mirror of
https://github.com/rustdesk/rustdesk.git
synced 2025-01-18 07:43:01 +08:00
linux_wayland_support: pynput_service.py
Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
parent
05b264c968
commit
8e121eacd9
236
pynput_service.py
Normal file
236
pynput_service.py
Normal file
@ -0,0 +1,236 @@
|
||||
from pynput.keyboard import Key, Controller
|
||||
from pynput.keyboard._xorg import KeyCode
|
||||
from pynput._util.xorg import display_manager
|
||||
import Xlib
|
||||
from pynput._util.xorg import *
|
||||
import Xlib
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
|
||||
KeyCode._from_symbol("\0") # test
|
||||
|
||||
DEAD_KEYS = {
|
||||
'`': 65104,
|
||||
'´': 65105,
|
||||
'^': 65106,
|
||||
'~': 65107,
|
||||
'¯': 65108,
|
||||
'˘': 65109,
|
||||
'˙': 65110,
|
||||
'¨': 65111,
|
||||
'˚': 65112,
|
||||
'˝': 65113,
|
||||
'ˇ': 65114,
|
||||
'¸': 65115,
|
||||
'˛': 65116,
|
||||
'℩': 65117, # ?
|
||||
'゛': 65118, # ?
|
||||
'゚ ': 65119,
|
||||
'ٜ': 65120,
|
||||
'↪': 65121,
|
||||
' ̛': 65122,
|
||||
}
|
||||
|
||||
|
||||
|
||||
def my_keyboard_mapping(display):
|
||||
"""Generates a mapping from *keysyms* to *key codes* and required
|
||||
modifier shift states.
|
||||
|
||||
:param Xlib.display.Display display: The display for which to retrieve the
|
||||
keyboard mapping.
|
||||
|
||||
:return: the keyboard mapping
|
||||
"""
|
||||
mapping = {}
|
||||
|
||||
shift_mask = 1 << 0
|
||||
group_mask = alt_gr_mask(display)
|
||||
|
||||
# Iterate over all keysym lists in the keyboard mapping
|
||||
min_keycode = display.display.info.min_keycode
|
||||
keycode_count = display.display.info.max_keycode - min_keycode + 1
|
||||
for index, keysyms in enumerate(display.get_keyboard_mapping(
|
||||
min_keycode, keycode_count)):
|
||||
key_code = index + min_keycode
|
||||
|
||||
# Normalise the keysym list to yield a tuple containing the two groups
|
||||
normalized = keysym_normalize(keysyms)
|
||||
if not normalized:
|
||||
continue
|
||||
|
||||
# Iterate over the groups to extract the shift and modifier state
|
||||
for groups, group in zip(normalized, (False, True)):
|
||||
for keysym, shift in zip(groups, (False, True)):
|
||||
|
||||
if not keysym:
|
||||
continue
|
||||
shift_state = 0 \
|
||||
| (shift_mask if shift else 0) \
|
||||
| (group_mask if group else 0)
|
||||
|
||||
# !!!: Save all keycode combinations of keysym
|
||||
if keysym in mapping:
|
||||
mapping[keysym].append((key_code, shift_state))
|
||||
else:
|
||||
mapping[keysym] = [(key_code, shift_state)]
|
||||
return mapping
|
||||
|
||||
|
||||
class MyController(Controller):
|
||||
def _update_keyboard_mapping(self):
|
||||
"""Updates the keyboard mapping.
|
||||
"""
|
||||
with display_manager(self._display) as dm:
|
||||
self._keyboard_mapping = my_keyboard_mapping(dm)
|
||||
|
||||
def send_event(self, event, keycode, shift_state):
|
||||
with display_manager(self._display) as dm, self.modifiers as modifiers:
|
||||
# Under certain cimcumstances, such as when running under Xephyr,
|
||||
# the value returned by dm.get_input_focus is an int
|
||||
window = dm.get_input_focus().focus
|
||||
send_event = getattr(
|
||||
window,
|
||||
'send_event',
|
||||
lambda event: dm.send_event(window, event))
|
||||
send_event(event(
|
||||
detail=keycode,
|
||||
state=shift_state | self._shift_mask(modifiers),
|
||||
time=0,
|
||||
root=dm.screen().root,
|
||||
window=window,
|
||||
same_screen=0,
|
||||
child=Xlib.X.NONE,
|
||||
root_x=0, root_y=0, event_x=0, event_y=0))
|
||||
|
||||
def fake_input(self, keycode, is_press):
|
||||
with display_manager(self._display) as dm:
|
||||
Xlib.ext.xtest.fake_input(
|
||||
dm,
|
||||
Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease,
|
||||
keycode)
|
||||
|
||||
def _handle(self, key, is_press):
|
||||
"""Resolves a key identifier and sends a keyboard event.
|
||||
:param event: The *X* keyboard event.
|
||||
:param int keysym: The keysym to handle.
|
||||
"""
|
||||
event = Xlib.display.event.KeyPress if is_press \
|
||||
else Xlib.display.event.KeyRelease
|
||||
keysym = self._keysym(key)
|
||||
|
||||
if key.vk is not None:
|
||||
keycode = self._display.keysym_to_keycode(key.vk)
|
||||
self.fake_input(keycode, is_press)
|
||||
# Otherwise use XSendEvent; we need to use this in the general case to
|
||||
# work around problems with keyboard layouts
|
||||
self._emit('_on_fake_event', key, is_press)
|
||||
return
|
||||
|
||||
# Make sure to verify that the key was resolved
|
||||
if keysym is None:
|
||||
raise self.InvalidKeyException(key)
|
||||
|
||||
# There may be multiple keycodes for keysym in keyboard_mapping
|
||||
keycode_flag = len(self.keyboard_mapping[keysym]) == 1
|
||||
if keycode_flag:
|
||||
keycode, shift_state = self.keyboard_mapping[keysym][0]
|
||||
else:
|
||||
keycode, shift_state = self._display.keysym_to_keycode(keysym), 0
|
||||
|
||||
keycode_set = set(map(lambda x: x[0], self.keyboard_mapping[keysym]))
|
||||
# The keycode of the dead key is inconsistent, The keysym has multiple combinations of a keycode.
|
||||
if keycode != self._display.keysym_to_keycode(keysym) \
|
||||
or (keycode_flag == False and keycode == list(keycode_set)[0] and len(keycode_set) == 1):
|
||||
deakkey_chr = str(key).replace("'", '')
|
||||
keysym = DEAD_KEYS[deakkey_chr]
|
||||
keycode, shift_state = self.keyboard_mapping[keysym][0]
|
||||
|
||||
# If the key has a virtual key code, use that immediately with
|
||||
# fake_input; fake input,being an X server extension, has access to
|
||||
# more internal state that we do
|
||||
|
||||
try:
|
||||
with self.modifiers as modifiers:
|
||||
alt_gr = Key.alt_gr in modifiers
|
||||
# !!!: Send_event can't support lock screen, this condition cann't be modified
|
||||
if alt_gr:
|
||||
self.send_event(
|
||||
event, keycode, shift_state)
|
||||
else:
|
||||
self.fake_input(keycode, is_press)
|
||||
except KeyError:
|
||||
with self._borrow_lock:
|
||||
keycode, index, count = self._borrows[keysym]
|
||||
self._send_key(
|
||||
event,
|
||||
keycode,
|
||||
index_to_shift(self._display, index))
|
||||
count += 1 if is_press else -1
|
||||
self._borrows[keysym] = (keycode, index, count)
|
||||
|
||||
# Notify any running listeners
|
||||
self._emit('_on_fake_event', key, is_press)
|
||||
|
||||
|
||||
keyboard = MyController()
|
||||
|
||||
server_address = sys.argv[1]
|
||||
if not os.path.exists(os.path.dirname(server_address)):
|
||||
os.makedirs(os.path.dirname(server_address))
|
||||
|
||||
try:
|
||||
os.unlink(server_address)
|
||||
except OSError:
|
||||
if os.path.exists(server_address):
|
||||
raise
|
||||
|
||||
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
server.bind(server_address)
|
||||
server.listen(1)
|
||||
clientsocket, address = server.accept()
|
||||
os.system('chmod a+rw %s' % server_address)
|
||||
print("Got pynput connection")
|
||||
|
||||
|
||||
def loop():
|
||||
global keyboard
|
||||
buf = []
|
||||
while True:
|
||||
data = clientsocket.recv(1024)
|
||||
if not data:
|
||||
print("Connection broken")
|
||||
break
|
||||
buf.extend(data)
|
||||
while buf:
|
||||
n = buf[0]
|
||||
n = n + 1
|
||||
if len(buf) < n:
|
||||
break
|
||||
msg = bytearray(buf[1:n]).decode("utf-8")
|
||||
buf = buf[n:]
|
||||
if len(msg) < 2:
|
||||
continue
|
||||
if msg[1] == "\0":
|
||||
keyboard = MyController()
|
||||
print("Keyboard reset")
|
||||
continue
|
||||
if len(msg) == 2:
|
||||
name = msg[1]
|
||||
else:
|
||||
name = KeyCode._from_symbol(msg[1:])
|
||||
if str(name) == "<0>":
|
||||
continue
|
||||
try:
|
||||
if msg[0] == "p":
|
||||
keyboard.press(name)
|
||||
else:
|
||||
keyboard.release(name)
|
||||
except Exception as e:
|
||||
print('[x] error key',e)
|
||||
|
||||
|
||||
loop()
|
||||
clientsocket.close()
|
||||
server.close()
|
Loading…
Reference in New Issue
Block a user