Compare commits

..

1 commit

Author SHA1 Message Date
Hannes Klinckaert
33d8f6888d maybe added threads 2019-10-02 18:18:27 +02:00

260
server.py
View file

@ -7,175 +7,161 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO from io import BytesIO
import string import string
import base64 import base64
from queue import Queue import queue
from threading import Thread from threading import Thread
import subprocess
NULL_CHAR = chr(0) NULL_CHAR = chr(0)
file = open('/dev/hidg0', 'rb+')
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
message_queue = queue.SimpleQueue
thread = Thread(target = writer)
thread.start()
colors = {
"Z": "a", # Black
"B": "b", # Blue
"G": "c", # Green
"M": "d", # Magenta
"R": "e", # Red
"P": "f", # Pink
"Y": "g", # Yellow
"W": "h", # White
"z": "i", # Light/blink Black
"b": "j", # Light/blink Blue
"g": "k", # Light/blink Green
"m": "l", # Light/blink Magenta
"r": "m", # Light/blink Red
"p": "n", # Light/blink Pink
"y": "o", # Light/blink Yellow
"w": "p" # Light/blink White
}
special_chars = {
' ': [NULL_CHAR, chr(44)],
'\n': [NULL_CHAR, chr(40)],
'\\': [NULL_CHAR, chr(0x31)],
'.': [NULL_CHAR, chr(0x37)],
'/': [NULL_CHAR, chr(0x38)],
';': [NULL_CHAR, chr(0x33)],
'-': [NULL_CHAR, chr(0x2d)],
'=': [NULL_CHAR, chr(0x2e)],
'[': [NULL_CHAR, chr(0x2f)],
']': [NULL_CHAR, chr(0x30)],
'*': [NULL_CHAR, chr(0x55)],
',': [NULL_CHAR, chr(0x36)],
'!': [chr(32), chr(0x1e)],
'<': [chr(32), chr(0x36)],
'>': [chr(32), chr(0x37)],
'?': [chr(32), chr(0x38)],
':': [chr(32), chr(0x33)],
'(': [chr(32), chr(0x26)],
')': [chr(32), chr(0x27)],
'&': [chr(32), chr(0x24)],
'%': [chr(32), chr(0x22)],
'#': [chr(32), chr(0x20)],
'@': [chr(32), chr(0x1f)],
'$': [chr(32), chr(0x21)],
'_': [chr(32), chr(0x2d)],
'+': [chr(32), chr(0x2e)],
'{': [chr(32), chr(0x2f)],
'}': [chr(32), chr(0x30)],
'|': [chr(32), chr(0x31)],
'~': [chr(32), chr(0x35)],
}
class KeyboardHandler: def writer() {
while true:
if not message_queue.empty():
message = message_queue.get()
printstring(message)
}
def __init__(self, hidfile='/dev/hidg0'): def write_report(report):
# Open keyboard device for binary appending file.write(report.encode())
self.file = open(hidfile, 'ba')
colors = {
"Z": "a", # Black
"B": "b", # Blue
"G": "c", # Green
"M": "d", # Magenta
"R": "e", # Red
"P": "f", # Pink
"Y": "g", # Yellow
"W": "h", # White
"z": "i", # Light/blink Black
"b": "j", # Light/blink Blue
"g": "k", # Light/blink Green
"m": "l", # Light/blink Magenta
"r": "m", # Light/blink Red
"p": "n", # Light/blink Pink
"y": "o", # Light/blink Yellow
"w": "p" # Light/blink White
}
special_chars = {
' ': [NULL_CHAR, chr(44)],
'\n': [NULL_CHAR, chr(40)],
'\\': [NULL_CHAR, chr(0x31)],
'.': [NULL_CHAR, chr(0x37)],
'/': [NULL_CHAR, chr(0x38)],
';': [NULL_CHAR, chr(0x33)],
'-': [NULL_CHAR, chr(0x2d)],
'=': [NULL_CHAR, chr(0x2e)],
'[': [NULL_CHAR, chr(0x2f)],
']': [NULL_CHAR, chr(0x30)],
'*': [NULL_CHAR, chr(0x55)],
',': [NULL_CHAR, chr(0x36)],
'!': [chr(32), chr(0x1e)],
'<': [chr(32), chr(0x36)],
'>': [chr(32), chr(0x37)],
'?': [chr(32), chr(0x38)],
':': [chr(32), chr(0x33)],
'(': [chr(32), chr(0x26)],
')': [chr(32), chr(0x27)],
'&': [chr(32), chr(0x24)],
'%': [chr(32), chr(0x22)],
'#': [chr(32), chr(0x20)],
'@': [chr(32), chr(0x1f)],
'$': [chr(32), chr(0x21)],
'_': [chr(32), chr(0x2d)],
'+': [chr(32), chr(0x2e)],
'{': [chr(32), chr(0x2f)],
'}': [chr(32), chr(0x30)],
'|': [chr(32), chr(0x31)],
'~': [chr(32), chr(0x35)],
}
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
def write_report(self, report): def release_keys():
self.file.write(report.encode()) write_report(NULL_CHAR * 8)
def release_keys(self): def printchar(c):
self.write_report(NULL_CHAR * 8) if c in alphabet_lower:
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
def printchar(self, c): elif c in alphabet_upper:
if c in self.alphabet_lower: write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5) elif c.isdigit():
elif c in self.alphabet_upper: write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5) elif c in special_chars:
elif c.isdigit(): write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5)
self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5) else:
elif c in self.special_chars: write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5) release_keys()
else:
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys()
def change_color(self, background, foreground): def change_color(background, foreground):
if background in self.colors and foreground in self.colors: if background in colors and foreground in colors:
# DELETE keypress # DELETE keypress
self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
self.release_keys() release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5)
self.release_keys() release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5)
else: else:
print('Malformed Color code: §' + background + foreground) print('Malformed Color code: §' + background + foreground)
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys() release_keys()
def printstring(self, s): def printstring(s):
last_c = '' last_c = ''
last_last_c = '' last_last_c = ''
# send Tab-key, to indicate this is sent by machine, not by human for c in s:
self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5) if last_last_c == '§':
self.release_keys() change_color(last_c, c)
for c in s: elif c != '§' and last_c != '§':
if last_last_c == '§': printchar(c)
self.change_color(last_c, c) last_c, last_last_c = c, last_c
elif c != '§' and last_c != '§':
self.printchar(c)
last_c, last_last_c = c, last_c
def reset_color(self): def reset_color():
self.change_color('Z', 'g') change_color('Z', 'g')
self.release_keys() release_keys()
def beep(self, code='fff'): def beep():
self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(0x29) + 'ff' + NULL_CHAR * 3)
self.release_keys() release_keys()
for char in code: print('beep')
self.printchar(char)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.messagequeue = server.messagequeue
super().__init__(request, client_address, server)
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.wfile.write(b'TAp reporting for duty!') self.wfile.write(b'Hello, world!')
def do_POST(self): def do_POST(self):
print(self.headers)
message = base64.b64decode(self.headers['X-Messages']).decode('utf8') message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.messagequeue.put_nowait(message) response = BytesIO()
self.wfile.write(b'Received message\n') response.write(b'This is POST request. ')
response.write(b'Received: ')
def message_collector(messagequeue): self.wfile.write(response.getvalue())
keyboard = KeyboardHandler()
while True:
message = messagequeue.get()
if message == 'reset': if message == 'reset':
keyboard.printstring('\n' * 32) message_queue.put('\n' * 32)
else: else:
keyboard.printstring(message + '\n') message_queue.put(message + '\n')
args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20] # beep()
subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i']) reset_color()
time.sleep(int(len(args) * 1.5)) file.flush()
keyboard.reset_color()
keyboard.file.flush()
if messagequeue.empty():
keyboard.beep()
keyboard.file.flush()
shared_messagequeue = Queue()
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler) httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
httpd.messagequeue = shared_messagequeue
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
thread.start()
httpd.serve_forever() httpd.serve_forever()