mirror of
https://github.com/rustdesk/rustdesk.git
synced 2024-12-18 13:37:52 +08:00
125 lines
3.9 KiB
Python
125 lines
3.9 KiB
Python
from pynput.keyboard import Key, Controller
|
|
from pynput.keyboard._xorg import KeyCode
|
|
from pynput._util.xorg import display_manager
|
|
import Xlib
|
|
import os
|
|
import sys
|
|
import socket
|
|
|
|
KeyCode._from_symbol("\0") # test
|
|
|
|
class MyController(Controller):
|
|
def _handle(self, key, is_press):
|
|
"""Resolves a key identifier and sends a keyboard event.
|
|
:param event: The *X* keyboard event.
|
|
:param int keysym: The keysym to handle.
|
|
"""
|
|
event = Xlib.display.event.KeyPress if is_press \
|
|
else Xlib.display.event.KeyRelease
|
|
keysym = self._keysym(key)
|
|
|
|
# Make sure to verify that the key was resolved
|
|
if keysym is None:
|
|
raise self.InvalidKeyException(key)
|
|
|
|
# If the key has a virtual key code, use that immediately with
|
|
# fake_input; fake input,being an X server extension, has access to
|
|
# more internal state that we do
|
|
if key.vk is not None:
|
|
with display_manager(self._display) as dm:
|
|
Xlib.ext.xtest.fake_input(
|
|
dm,
|
|
Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease,
|
|
dm.keysym_to_keycode(key.vk))
|
|
|
|
# Otherwise use XSendEvent; we need to use this in the general case to
|
|
# work around problems with keyboard layouts
|
|
else:
|
|
try:
|
|
keycode, shift_state = self.keyboard_mapping[keysym]
|
|
with self.modifiers as modifiers:
|
|
alt_gr = Key.alt_gr in modifiers
|
|
if alt_gr:
|
|
self._send_key(event, keycode, shift_state)
|
|
else:
|
|
with display_manager(self._display) as dm:
|
|
Xlib.ext.xtest.fake_input(
|
|
dm,
|
|
Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease,
|
|
keycode)
|
|
|
|
except KeyError:
|
|
with self._borrow_lock:
|
|
keycode, index, count = self._borrows[keysym]
|
|
self._send_key(
|
|
event,
|
|
keycode,
|
|
index_to_shift(self._display, index))
|
|
count += 1 if is_press else -1
|
|
self._borrows[keysym] = (keycode, index, count)
|
|
|
|
# Notify any running listeners
|
|
self._emit('_on_fake_event', key, is_press)
|
|
|
|
keyboard = MyController()
|
|
|
|
server_address = sys.argv[1]
|
|
if not os.path.exists(os.path.dirname(server_address)):
|
|
os.makedirs(os.path.dirname(server_address))
|
|
|
|
try:
|
|
os.unlink(server_address)
|
|
except OSError:
|
|
if os.path.exists(server_address):
|
|
raise
|
|
|
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
server.bind(server_address)
|
|
server.listen(1)
|
|
clientsocket, address = server.accept()
|
|
os.system('chmod a+rw %s'%server_address)
|
|
print("Got pynput connection")
|
|
|
|
|
|
def loop():
|
|
global keyboard
|
|
buf = []
|
|
while True:
|
|
data = clientsocket.recv(1024)
|
|
if not data:
|
|
print("Connection broken")
|
|
break
|
|
buf.extend(data)
|
|
while buf:
|
|
n = buf[0]
|
|
n = n + 1
|
|
if len(buf) < n:
|
|
break
|
|
msg = bytearray(buf[1:n]).decode("utf-8")
|
|
buf = buf[n:]
|
|
if len(msg) < 2:
|
|
continue
|
|
if msg[1] == "\0":
|
|
keyboard = MyController()
|
|
print("Keyboard reset")
|
|
continue
|
|
if len(msg) == 2:
|
|
name = msg[1]
|
|
else:
|
|
name = KeyCode._from_symbol(msg[1:])
|
|
if str(name) == "<0>":
|
|
continue
|
|
try:
|
|
if msg[0] == "p":
|
|
keyboard.press(name)
|
|
else:
|
|
keyboard.release(name)
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
loop()
|
|
clientsocket.close()
|
|
server.close()
|
|
|