Compare commits

..

6 commits

Author SHA1 Message Date
redfast00
f904752fbe
Add Tab before every message 2021-11-08 23:47:06 +01:00
redfast00
ce24054458
YEET 2020-03-10 03:22:44 +01:00
redfast00
c0931a8087
Flush at better times 2020-01-22 20:19:34 +01:00
redfast00
49408b075b
Replace single-threaded model with producer/consumer multithreaded model 2020-01-20 19:00:50 +01:00
redfast00
63dd730986
Change to enabled duration 2019-10-01 01:23:59 +02:00
redfast00
54c84edde2
Enable beep again 2019-10-01 00:33:09 +02:00

132
server.py
View file

@ -7,19 +7,18 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO
import string
import base64
import queue
from queue import Queue
from threading import Thread
import subprocess
NULL_CHAR = chr(0)
file = open('/dev/hidg0', 'rb+')
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
message_queue = queue.SimpleQueue
class KeyboardHandler:
thread = Thread(target = writer)
thread.start()
def __init__(self, hidfile='/dev/hidg0'):
# Open keyboard device for binary appending
self.file = open(hidfile, 'ba')
colors = {
"Z": "a", # Black
@ -73,95 +72,110 @@ special_chars = {
'~': [chr(32), chr(0x35)],
}
def writer() {
while true:
if not message_queue.empty():
message = message_queue.get()
printstring(message)
}
def write_report(report):
file.write(report.encode())
alphabet_lower = set(string.ascii_lowercase)
alphabet_upper = set(string.ascii_uppercase)
def release_keys():
write_report(NULL_CHAR * 8)
def write_report(self, report):
self.file.write(report.encode())
def printchar(c):
if c in alphabet_lower:
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
elif c in alphabet_upper:
write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
def release_keys(self):
self.write_report(NULL_CHAR * 8)
def printchar(self, c):
if c in self.alphabet_lower:
self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
elif c in self.alphabet_upper:
self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
elif c.isdigit():
write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
elif c in special_chars:
write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5)
self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
elif c in self.special_chars:
self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5)
else:
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
release_keys()
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys()
def change_color(background, foreground):
if background in colors and foreground in colors:
def change_color(self, background, foreground):
if background in self.colors and foreground in self.colors:
# DELETE keypress
write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5)
release_keys()
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5)
self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
self.release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5)
self.release_keys()
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5)
else:
print('Malformed Color code: §' + background + foreground)
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
release_keys()
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
self.release_keys()
def printstring(s):
def printstring(self, s):
last_c = ''
last_last_c = ''
# send Tab-key, to indicate this is sent by machine, not by human
self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5)
self.release_keys()
for c in s:
if last_last_c == '§':
change_color(last_c, c)
self.change_color(last_c, c)
elif c != '§' and last_c != '§':
printchar(c)
self.printchar(c)
last_c, last_last_c = c, last_c
def reset_color():
change_color('Z', 'g')
release_keys()
def reset_color(self):
self.change_color('Z', 'g')
self.release_keys()
def beep():
write_report(NULL_CHAR * 2 + chr(0x29) + 'ff' + NULL_CHAR * 3)
release_keys()
print('beep')
def beep(self, code='fff'):
self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5)
self.release_keys()
for char in code:
self.printchar(char)
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.messagequeue = server.messagequeue
super().__init__(request, client_address, server)
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(b'Hello, world!')
self.wfile.write(b'TAp reporting for duty!')
def do_POST(self):
print(self.headers)
message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
self.send_response(200)
self.end_headers()
response = BytesIO()
response.write(b'This is POST request. ')
response.write(b'Received: ')
self.wfile.write(response.getvalue())
self.messagequeue.put_nowait(message)
self.wfile.write(b'Received message\n')
def message_collector(messagequeue):
keyboard = KeyboardHandler()
while True:
message = messagequeue.get()
if message == 'reset':
message_queue.put('\n' * 32)
keyboard.printstring('\n' * 32)
else:
message_queue.put(message + '\n')
# beep()
reset_color()
file.flush()
keyboard.printstring(message + '\n')
args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20]
subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i'])
time.sleep(int(len(args) * 1.5))
keyboard.reset_color()
keyboard.file.flush()
if messagequeue.empty():
keyboard.beep()
keyboard.file.flush()
shared_messagequeue = Queue()
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
httpd.messagequeue = shared_messagequeue
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
thread.start()
httpd.serve_forever()