Replace single-threaded model with producer/consumer multithreaded model

This commit is contained in:
redfast00 2020-01-20 19:00:50 +01:00
parent 63dd730986
commit 49408b075b
No known key found for this signature in database
GPG Key ID: 5946E0E34FD0553C
1 changed files with 129 additions and 112 deletions

241
server.py
View File

@ -7,148 +7,165 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO from io import BytesIO
import string import string
import base64 import base64
from queue import Queue
from threading import Thread
NULL_CHAR = chr(0) NULL_CHAR = chr(0)
file = open('/dev/hidg0', 'rb+')
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
colors = {
"Z": "a", # Black
"B": "b", # Blue
"G": "c", # Green
"M": "d", # Magenta
"R": "e", # Red
"P": "f", # Pink
"Y": "g", # Yellow
"W": "h", # White
"z": "i", # Light/blink Black
"b": "j", # Light/blink Blue
"g": "k", # Light/blink Green
"m": "l", # Light/blink Magenta
"r": "m", # Light/blink Red
"p": "n", # Light/blink Pink
"y": "o", # Light/blink Yellow
"w": "p" # Light/blink White
}
special_chars = {
' ': [NULL_CHAR, chr(44)],
'\n': [NULL_CHAR, chr(40)],
'\\': [NULL_CHAR, chr(0x31)],
'.': [NULL_CHAR, chr(0x37)],
'/': [NULL_CHAR, chr(0x38)],
';': [NULL_CHAR, chr(0x33)],
'-': [NULL_CHAR, chr(0x2d)],
'=': [NULL_CHAR, chr(0x2e)],
'[': [NULL_CHAR, chr(0x2f)],
']': [NULL_CHAR, chr(0x30)],
'*': [NULL_CHAR, chr(0x55)],
',': [NULL_CHAR, chr(0x36)],
'!': [chr(32), chr(0x1e)],
'<': [chr(32), chr(0x36)],
'>': [chr(32), chr(0x37)],
'?': [chr(32), chr(0x38)],
':': [chr(32), chr(0x33)],
'(': [chr(32), chr(0x26)],
')': [chr(32), chr(0x27)],
'&': [chr(32), chr(0x24)],
'%': [chr(32), chr(0x22)],
'#': [chr(32), chr(0x20)],
'@': [chr(32), chr(0x1f)],
'$': [chr(32), chr(0x21)],
'_': [chr(32), chr(0x2d)],
'+': [chr(32), chr(0x2e)],
'{': [chr(32), chr(0x2f)],
'}': [chr(32), chr(0x30)],
'|': [chr(32), chr(0x31)],
'~': [chr(32), chr(0x35)],
}
def write_report(report): class KeyboardHandler:
file.write(report.encode())
def __init__(self, hidfile='/dev/hidg0'):
# Open keyboard device for binary appending
self.file = open(hidfile, 'ba')
colors = {
"Z": "a", # Black
"B": "b", # Blue
"G": "c", # Green
"M": "d", # Magenta
"R": "e", # Red
"P": "f", # Pink
"Y": "g", # Yellow
"W": "h", # White
"z": "i", # Light/blink Black
"b": "j", # Light/blink Blue
"g": "k", # Light/blink Green
"m": "l", # Light/blink Magenta
"r": "m", # Light/blink Red
"p": "n", # Light/blink Pink
"y": "o", # Light/blink Yellow
"w": "p" # Light/blink White
}
special_chars = {
' ': [NULL_CHAR, chr(44)],
'\n': [NULL_CHAR, chr(40)],
'\\': [NULL_CHAR, chr(0x31)],
'.': [NULL_CHAR, chr(0x37)],
'/': [NULL_CHAR, chr(0x38)],
';': [NULL_CHAR, chr(0x33)],
'-': [NULL_CHAR, chr(0x2d)],
'=': [NULL_CHAR, chr(0x2e)],
'[': [NULL_CHAR, chr(0x2f)],
']': [NULL_CHAR, chr(0x30)],
'*': [NULL_CHAR, chr(0x55)],
',': [NULL_CHAR, chr(0x36)],
'!': [chr(32), chr(0x1e)],
'<': [chr(32), chr(0x36)],
'>': [chr(32), chr(0x37)],
'?': [chr(32), chr(0x38)],
':': [chr(32), chr(0x33)],
'(': [chr(32), chr(0x26)],
')': [chr(32), chr(0x27)],
'&': [chr(32), chr(0x24)],
'%': [chr(32), chr(0x22)],
'#': [chr(32), chr(0x20)],
'@': [chr(32), chr(0x1f)],
'$': [chr(32), chr(0x21)],
'_': [chr(32), chr(0x2d)],
'+': [chr(32), chr(0x2e)],
'{': [chr(32), chr(0x2f)],
'}': [chr(32), chr(0x30)],
'|': [chr(32), chr(0x31)],
'~': [chr(32), chr(0x35)],
}
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
def release_keys(): def write_report(self, report):
write_report(NULL_CHAR * 8) self.file.write(report.encode())
def printchar(c): def release_keys(self):
if c in alphabet_lower: self.write_report(NULL_CHAR * 8)
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
elif c in alphabet_upper: def printchar(self, c):
write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5) if c in self.alphabet_lower:
elif c.isdigit(): self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5) elif c in self.alphabet_upper:
elif c in special_chars: self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5) elif c.isdigit():
else: self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) elif c in self.special_chars:
release_keys() self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5)
else:
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys()
def change_color(background, foreground): def change_color(self, background, foreground):
if background in colors and foreground in colors: if background in self.colors and foreground in self.colors:
# DELETE keypress # DELETE keypress
write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
release_keys() self.release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5)
release_keys() self.release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5)
else: else:
print('Malformed Color code: §' + background + foreground) print('Malformed Color code: §' + background + foreground)
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
release_keys() self.release_keys()
def printstring(s): def printstring(self, s):
last_c = '' last_c = ''
last_last_c = '' last_last_c = ''
for c in s: for c in s:
if last_last_c == '§': if last_last_c == '§':
change_color(last_c, c) self.change_color(last_c, c)
elif c != '§' and last_c != '§': elif c != '§' and last_c != '§':
printchar(c) self.printchar(c)
last_c, last_last_c = c, last_c last_c, last_last_c = c, last_c
def reset_color(): def reset_color(self):
change_color('Z', 'g') self.change_color('Z', 'g')
release_keys() self.release_keys()
def beep(code='fff'): def beep(self, code='fff'):
write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5)
release_keys() self.release_keys()
for char in code: for char in code:
printchar(char) self.printchar(char)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.messagequeue = server.messagequeue
super().__init__(request, client_address, server)
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.wfile.write(b'Hello, world!') self.wfile.write(b'TAp reporting for duty!')
def do_POST(self): def do_POST(self):
print(self.headers)
message = base64.b64decode(self.headers['X-Messages']).decode('utf8') message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
response = BytesIO() self.messagequeue.put_nowait(message)
response.write(b'This is POST request. ') self.wfile.write(b'Received message')
response.write(b'Received: ')
self.wfile.write(response.getvalue()) def message_collector(messagequeue):
keyboard = KeyboardHandler()
while True:
message = messagequeue.get()
if message == 'reset': if message == 'reset':
printstring('\n' * 32) keyboard.printstring('\n' * 32)
else: else:
printstring(message + '\n') keyboard.printstring(message + '\n')
beep() keyboard.beep()
reset_color() keyboard.reset_color()
file.flush() keyboard.file.flush()
shared_messagequeue = Queue()
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler) httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
httpd.messagequeue = shared_messagequeue
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
thread.start()
httpd.serve_forever() httpd.serve_forever()