Compare commits

..

1 commit

Author SHA1 Message Date
Hannes Klinckaert
33d8f6888d maybe added threads 2019-10-02 18:18:27 +02:00

132
server.py
View file

@ -7,18 +7,19 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO from io import BytesIO
import string import string
import base64 import base64
from queue import Queue import queue
from threading import Thread from threading import Thread
import subprocess
NULL_CHAR = chr(0) NULL_CHAR = chr(0)
file = open('/dev/hidg0', 'rb+')
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
class KeyboardHandler: message_queue = queue.SimpleQueue
def __init__(self, hidfile='/dev/hidg0'): thread = Thread(target = writer)
# Open keyboard device for binary appending thread.start()
self.file = open(hidfile, 'ba')
colors = { colors = {
"Z": "a", # Black "Z": "a", # Black
@ -72,110 +73,95 @@ class KeyboardHandler:
'~': [chr(32), chr(0x35)], '~': [chr(32), chr(0x35)],
} }
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase) def writer() {
while true:
if not message_queue.empty():
message = message_queue.get()
printstring(message)
}
def write_report(report):
file.write(report.encode())
def write_report(self, report): def release_keys():
self.file.write(report.encode()) write_report(NULL_CHAR * 8)
def release_keys(self): def printchar(c):
self.write_report(NULL_CHAR * 8) if c in alphabet_lower:
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
def printchar(self, c): elif c in alphabet_upper:
if c in self.alphabet_lower: write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
elif c in self.alphabet_upper:
self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
elif c.isdigit(): elif c.isdigit():
self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
elif c in self.special_chars: elif c in special_chars:
self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5) write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5)
else: else:
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys() release_keys()
def change_color(self, background, foreground): def change_color(background, foreground):
if background in self.colors and foreground in self.colors: if background in colors and foreground in colors:
# DELETE keypress # DELETE keypress
self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
self.release_keys() release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5)
self.release_keys() release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5)
else: else:
print('Malformed Color code: §' + background + foreground) print('Malformed Color code: §' + background + foreground)
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys() release_keys()
def printstring(self, s): def printstring(s):
last_c = '' last_c = ''
last_last_c = '' last_last_c = ''
# send Tab-key, to indicate this is sent by machine, not by human
self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5)
self.release_keys()
for c in s: for c in s:
if last_last_c == '§': if last_last_c == '§':
self.change_color(last_c, c) change_color(last_c, c)
elif c != '§' and last_c != '§': elif c != '§' and last_c != '§':
self.printchar(c) printchar(c)
last_c, last_last_c = c, last_c last_c, last_last_c = c, last_c
def reset_color(self): def reset_color():
self.change_color('Z', 'g') change_color('Z', 'g')
self.release_keys() release_keys()
def beep(self, code='fff'): def beep():
self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5) write_report(NULL_CHAR * 2 + chr(0x29) + 'ff' + NULL_CHAR * 3)
self.release_keys() release_keys()
for char in code: print('beep')
self.printchar(char)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.messagequeue = server.messagequeue
super().__init__(request, client_address, server)
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.wfile.write(b'TAp reporting for duty!') self.wfile.write(b'Hello, world!')
def do_POST(self): def do_POST(self):
print(self.headers)
message = base64.b64decode(self.headers['X-Messages']).decode('utf8') message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.messagequeue.put_nowait(message) response = BytesIO()
self.wfile.write(b'Received message\n') response.write(b'This is POST request. ')
response.write(b'Received: ')
def message_collector(messagequeue): self.wfile.write(response.getvalue())
keyboard = KeyboardHandler()
while True:
message = messagequeue.get()
if message == 'reset': if message == 'reset':
keyboard.printstring('\n' * 32) message_queue.put('\n' * 32)
else: else:
keyboard.printstring(message + '\n') message_queue.put(message + '\n')
args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20] # beep()
subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i']) reset_color()
time.sleep(int(len(args) * 1.5)) file.flush()
keyboard.reset_color()
keyboard.file.flush()
if messagequeue.empty():
keyboard.beep()
keyboard.file.flush()
shared_messagequeue = Queue()
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler) httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
httpd.messagequeue = shared_messagequeue
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
thread.start()
httpd.serve_forever() httpd.serve_forever()