Skip to content

Commit 3268c62

Browse files
committed
vnc: magic alt-alt key
1 parent 21c83e6 commit 3268c62

File tree

1 file changed

+127
-25
lines changed

1 file changed

+127
-25
lines changed

kvmd/apps/vnc/server.py

+127-25
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
import socket
2626
import dataclasses
2727
import contextlib
28+
import time
2829

2930
import aiohttp
3031
import async_lru
3132

33+
from evdev import ecodes
34+
3235
from ...logging import get_logger
3336

3437
from ...keyboard.keysym import SymmapModifiers
@@ -68,6 +71,10 @@ class _SharedParams:
6871

6972

7073
class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
74+
__MAGIC_KEY = ecodes.KEY_LEFTALT
75+
__MAGIC_TIMEOUT = 2
76+
__MAGIC_TRIGGER = 2
77+
7178
def __init__( # pylint: disable=too-many-arguments,too-many-locals
7279
self,
7380
reader: asyncio.StreamReader,
@@ -131,9 +138,18 @@ def __init__( # pylint: disable=too-many-arguments,too-many-locals
131138
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
132139
self.__mouse_buttons: dict[str, (bool | None)] = dict.fromkeys(MOUSE_TO_EVDEV, None)
133140
self.__mouse_move = (-1, -1) # (X, Y)
134-
135141
self.__modifiers = 0
136142

143+
self.__clipboard = ""
144+
145+
self.__magic_taps = 0
146+
self.__magic_ts = 0.0
147+
self.__magic_codes: list[int] = []
148+
149+
self.__info_host = ""
150+
self.__info_switch_units = 0
151+
self.__info_switch_active = ""
152+
137153
# =====
138154

139155
async def run(self) -> None:
@@ -178,16 +194,22 @@ async def __kvmd_task_loop(self) -> None:
178194
async def __process_ws_event(self, event_type: str, event: dict) -> None:
179195
if event_type == "info":
180196
if "meta" in event:
197+
host = ""
181198
try:
182-
host = event["meta"]["server"]["host"]
199+
if isinstance(event["meta"]["server"]["host"], str):
200+
host = event["meta"]["server"]["host"].strip()
183201
except Exception:
184-
host = None
185-
else:
186-
if isinstance(host, str):
187-
name = f"PiKVM: {host}"
188-
if self._encodings.has_rename:
189-
await self._send_rename(name)
190-
self.__shared_params.name = name
202+
pass
203+
self.__info_host = host
204+
await self.__update_info()
205+
206+
elif event_type == "switch":
207+
if "model" in event:
208+
self.__info_switch_units = len(event["model"]["units"])
209+
if "summary" in event:
210+
self.__info_switch_active = event["summary"]["active_id"]
211+
if "model" in event or "summary" in event:
212+
await self.__update_info()
191213

192214
elif event_type == "hid":
193215
if (
@@ -197,6 +219,17 @@ async def __process_ws_event(self, event_type: str, event: dict) -> None:
197219
):
198220
await self._send_leds_state(**event["keyboard"]["leds"])
199221

222+
async def __update_info(self) -> None:
223+
info: list[str] = []
224+
if self.__info_switch_units > 0:
225+
info.append("Port " + (self.__info_switch_active or "not selected"))
226+
if self.__info_host:
227+
info.append(self.__info_host)
228+
info.append("PiKVM")
229+
self.__shared_params.name = " | ".join(info)
230+
if self._encodings.has_rename:
231+
await self._send_rename(self.__shared_params.name)
232+
200233
# =====
201234

202235
async def __streamer_task_loop(self) -> None:
@@ -329,6 +362,8 @@ async def _authorize_none(self) -> bool:
329362
# =====
330363

331364
async def _on_key_event(self, code: int, state: bool) -> None:
365+
assert self.__stage1_authorized.is_passed()
366+
332367
is_modifier = self.__switch_modifiers_x11(code, state)
333368
variants = self.__symmap.get(code)
334369
fake_shift = False
@@ -347,19 +382,19 @@ async def _on_key_event(self, code: int, state: bool) -> None:
347382
key = variants[SymmapModifiers.SHIFT]
348383
fake_shift = True
349384

350-
if key and self.__kvmd_ws:
385+
if key:
351386
if fake_shift:
352-
await self.__kvmd_ws.send_key_event(EvdevModifiers.SHIFT_LEFT, True)
353-
await self.__kvmd_ws.send_key_event(key, state)
387+
await self.__handle_key(ecodes.KEY_LEFTSHIFT, True)
388+
await self.__handle_key(key, state)
354389
if fake_shift:
355-
await self.__kvmd_ws.send_key_event(EvdevModifiers.SHIFT_LEFT, False)
390+
await self.__handle_key(ecodes.KEY_LEFTSHIFT, False)
356391

357392
async def _on_ext_key_event(self, code: int, state: bool) -> None:
393+
assert self.__stage1_authorized.is_passed()
358394
key = AT1_TO_EVDEV.get(code, 0)
359395
if key:
360396
self.__switch_modifiers_evdev(key, state) # Предполагаем, что модификаторы всегда известны
361-
if self.__kvmd_ws:
362-
await self.__kvmd_ws.send_key_event(key, state)
397+
await self.__handle_key(key, state)
363398

364399
def __switch_modifiers_x11(self, key: int, state: bool) -> bool:
365400
mod = 0
@@ -393,7 +428,83 @@ def __switch_modifiers_evdev(self, key: int, state: bool) -> bool:
393428
self.__modifiers &= ~mod
394429
return True
395430

431+
async def __handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches
432+
if self.__magic_ts + self.__MAGIC_TIMEOUT < time.monotonic():
433+
self.__magic_taps = 0
434+
self.__magic_ts = 0
435+
self.__magic_codes = []
436+
437+
if key == self.__MAGIC_KEY:
438+
if not state:
439+
self.__magic_taps += 1
440+
self.__magic_ts = time.monotonic()
441+
elif state:
442+
taps = self.__magic_taps
443+
codes = self.__magic_codes
444+
self.__magic_taps = 0
445+
self.__magic_ts = 0
446+
self.__magic_codes = []
447+
if taps >= self.__MAGIC_TRIGGER:
448+
if key == ecodes.KEY_P:
449+
await self.__handle_magic_clipboard_print()
450+
return
451+
elif key in [ecodes.KEY_UP, ecodes.KEY_LEFT]:
452+
await self.__handle_magic_switch_prev()
453+
return
454+
elif key in [ecodes.KEY_DOWN, ecodes.KEY_RIGHT]:
455+
await self.__handle_magic_switch_next()
456+
return
457+
elif ecodes.KEY_1 <= key <= ecodes.KEY_8:
458+
if 1 <= self.__info_switch_units <= 2:
459+
await self.__handle_magic_switch_port(key - ecodes.KEY_1)
460+
elif self.__info_switch_units > 2:
461+
codes.append(key - ecodes.KEY_1 + 1)
462+
if len(codes) == 1:
463+
self.__magic_taps = taps
464+
self.__magic_ts = time.monotonic()
465+
self.__magic_codes = codes
466+
elif len(codes) >= 2:
467+
await self.__handle_magic_switch_port(codes[0] + codes[1] / 10)
468+
return
469+
470+
if self.__kvmd_ws:
471+
await self.__kvmd_ws.send_key_event(key, state)
472+
473+
async def __handle_magic_switch_prev(self) -> None:
474+
assert self.__kvmd_session
475+
if self.__info_switch_units > 0:
476+
get_logger(0).info("%s [main]: Switching port to the previous one ...", self._remote)
477+
await self.__kvmd_session.switch.set_active_prev()
478+
479+
async def __handle_magic_switch_next(self) -> None:
480+
assert self.__kvmd_session
481+
if self.__info_switch_units > 0:
482+
get_logger(0).info("%s [main]: Switching port to the next one ...", self._remote)
483+
await self.__kvmd_session.switch.set_active_next()
484+
485+
async def __handle_magic_switch_port(self, port: float) -> None:
486+
assert self.__kvmd_session
487+
if self.__info_switch_units > 0:
488+
get_logger(0).info("%s [main]: Switching port to %s ...", self._remote, port)
489+
await self.__kvmd_session.switch.set_active(port)
490+
491+
async def __handle_magic_clipboard_print(self) -> None:
492+
assert self.__kvmd_session
493+
if self.__clipboard:
494+
logger = get_logger(0)
495+
logger.info("%s [main]: Printing %d characters ...", self._remote, len(self.__clipboard))
496+
try:
497+
(keymap_name, available) = await self.__kvmd_session.hid.get_keymaps()
498+
if self.__keymap_name in available:
499+
keymap_name = self.__keymap_name
500+
await self.__kvmd_session.hid.print(self.__clipboard, 0, keymap_name)
501+
except Exception:
502+
logger.exception("%s [main]: Can't print characters", self._remote)
503+
504+
# =====
505+
396506
async def _on_pointer_event(self, buttons: dict[str, bool], wheel: tuple[int, int], move: tuple[int, int]) -> None:
507+
assert self.__stage1_authorized.is_passed()
397508
if self.__kvmd_ws:
398509
if wheel[0] or wheel[1]:
399510
await self.__kvmd_ws.send_mouse_wheel_event(*wheel)
@@ -409,16 +520,7 @@ async def _on_pointer_event(self, buttons: dict[str, bool], wheel: tuple[int, in
409520

410521
async def _on_cut_event(self, text: str) -> None:
411522
assert self.__stage1_authorized.is_passed()
412-
assert self.__kvmd_session
413-
logger = get_logger(0)
414-
logger.info("%s [main]: Printing %d characters ...", self._remote, len(text))
415-
try:
416-
(keymap_name, available) = await self.__kvmd_session.hid.get_keymaps()
417-
if self.__keymap_name in available:
418-
keymap_name = self.__keymap_name
419-
await self.__kvmd_session.hid.print(text, 0, keymap_name)
420-
except Exception:
421-
logger.exception("%s [main]: Can't print characters", self._remote)
523+
self.__clipboard = text
422524

423525
async def _on_set_encodings(self) -> None:
424526
assert self.__stage1_authorized.is_passed()

0 commit comments

Comments
 (0)