Compare commits

..

6 commits

Author SHA1 Message Date
redfast00
f904752fbe
Add Tab before every message 2021-11-08 23:47:06 +01:00
redfast00
ce24054458
YEET 2020-03-10 03:22:44 +01:00
redfast00
c0931a8087
Flush at better times 2020-01-22 20:19:34 +01:00
redfast00
49408b075b
Replace single-threaded model with producer/consumer multithreaded model 2020-01-20 19:00:50 +01:00
redfast00
63dd730986
Change to enabled duration 2019-10-01 01:23:59 +02:00
redfast00
54c84edde2
Enable beep again 2019-10-01 00:33:09 +02:00

140
server.py
View file

@ -7,21 +7,20 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO from io import BytesIO
import string import string
import base64 import base64
import queue from queue import Queue
from threading import Thread from threading import Thread
import subprocess
NULL_CHAR = chr(0) NULL_CHAR = chr(0)
file = open('/dev/hidg0', 'rb+')
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
message_queue = queue.SimpleQueue class KeyboardHandler:
thread = Thread(target = writer) def __init__(self, hidfile='/dev/hidg0'):
thread.start() # Open keyboard device for binary appending
self.file = open(hidfile, 'ba')
colors = { colors = {
"Z": "a", # Black "Z": "a", # Black
"B": "b", # Blue "B": "b", # Blue
"G": "c", # Green "G": "c", # Green
@ -38,9 +37,9 @@ colors = {
"p": "n", # Light/blink Pink "p": "n", # Light/blink Pink
"y": "o", # Light/blink Yellow "y": "o", # Light/blink Yellow
"w": "p" # Light/blink White "w": "p" # Light/blink White
} }
special_chars = { special_chars = {
' ': [NULL_CHAR, chr(44)], ' ': [NULL_CHAR, chr(44)],
'\n': [NULL_CHAR, chr(40)], '\n': [NULL_CHAR, chr(40)],
'\\': [NULL_CHAR, chr(0x31)], '\\': [NULL_CHAR, chr(0x31)],
@ -71,97 +70,112 @@ special_chars = {
'}': [chr(32), chr(0x30)], '}': [chr(32), chr(0x30)],
'|': [chr(32), chr(0x31)], '|': [chr(32), chr(0x31)],
'~': [chr(32), chr(0x35)], '~': [chr(32), chr(0x35)],
} }
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
def writer() { def write_report(self, report):
while true: self.file.write(report.encode())
if not message_queue.empty():
message = message_queue.get()
printstring(message)
}
def write_report(report):
file.write(report.encode())
def release_keys(): def release_keys(self):
write_report(NULL_CHAR * 8) self.write_report(NULL_CHAR * 8)
def printchar(self, c):
def printchar(c): if c in self.alphabet_lower:
if c in alphabet_lower: self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5) elif c in self.alphabet_upper:
elif c in alphabet_upper: self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
elif c.isdigit(): elif c.isdigit():
write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
elif c in special_chars: elif c in self.special_chars:
write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5) self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5)
else: else:
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
release_keys() self.release_keys()
def change_color(background, foreground): def change_color(self, background, foreground):
if background in colors and foreground in colors: if background in self.colors and foreground in self.colors:
# DELETE keypress # DELETE keypress
write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
release_keys() self.release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5)
release_keys() self.release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5) self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5)
else: else:
print('Malformed Color code: §' + background + foreground) print('Malformed Color code: §' + background + foreground)
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
release_keys() self.release_keys()
def printstring(s): def printstring(self, s):
last_c = '' last_c = ''
last_last_c = '' last_last_c = ''
# send Tab-key, to indicate this is sent by machine, not by human
self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5)
self.release_keys()
for c in s: for c in s:
if last_last_c == '§': if last_last_c == '§':
change_color(last_c, c) self.change_color(last_c, c)
elif c != '§' and last_c != '§': elif c != '§' and last_c != '§':
printchar(c) self.printchar(c)
last_c, last_last_c = c, last_c last_c, last_last_c = c, last_c
def reset_color(): def reset_color(self):
change_color('Z', 'g') self.change_color('Z', 'g')
release_keys() self.release_keys()
def beep(): def beep(self, code='fff'):
write_report(NULL_CHAR * 2 + chr(0x29) + 'ff' + NULL_CHAR * 3) self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5)
release_keys() self.release_keys()
print('beep') for char in code:
self.printchar(char)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.messagequeue = server.messagequeue
super().__init__(request, client_address, server)
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
self.wfile.write(b'Hello, world!') self.wfile.write(b'TAp reporting for duty!')
def do_POST(self): def do_POST(self):
print(self.headers)
message = base64.b64decode(self.headers['X-Messages']).decode('utf8') message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
self.send_response(200) self.send_response(200)
self.end_headers() self.end_headers()
response = BytesIO() self.messagequeue.put_nowait(message)
response.write(b'This is POST request. ') self.wfile.write(b'Received message\n')
response.write(b'Received: ')
self.wfile.write(response.getvalue()) def message_collector(messagequeue):
keyboard = KeyboardHandler()
while True:
message = messagequeue.get()
if message == 'reset': if message == 'reset':
message_queue.put('\n' * 32) keyboard.printstring('\n' * 32)
else: else:
message_queue.put(message + '\n') keyboard.printstring(message + '\n')
# beep() args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20]
reset_color() subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i'])
file.flush() time.sleep(int(len(args) * 1.5))
keyboard.reset_color()
keyboard.file.flush()
if messagequeue.empty():
keyboard.beep()
keyboard.file.flush()
shared_messagequeue = Queue()
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler) httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
httpd.messagequeue = shared_messagequeue
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
thread.start()
httpd.serve_forever() httpd.serve_forever()