Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
|
33d8f6888d |
1 changed files with 123 additions and 137 deletions
140
server.py
140
server.py
|
@ -7,20 +7,21 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
|
|||
from io import BytesIO
|
||||
import string
|
||||
import base64
|
||||
from queue import Queue
|
||||
import queue
|
||||
from threading import Thread
|
||||
import subprocess
|
||||
|
||||
NULL_CHAR = chr(0)
|
||||
file = open('/dev/hidg0', 'rb+')
|
||||
|
||||
alphabet_lower = set(string.ascii_lowercase)
|
||||
alphabet_upper = set(string.ascii_uppercase)
|
||||
|
||||
class KeyboardHandler:
|
||||
message_queue = queue.SimpleQueue
|
||||
|
||||
def __init__(self, hidfile='/dev/hidg0'):
|
||||
# Open keyboard device for binary appending
|
||||
self.file = open(hidfile, 'ba')
|
||||
thread = Thread(target = writer)
|
||||
thread.start()
|
||||
|
||||
colors = {
|
||||
colors = {
|
||||
"Z": "a", # Black
|
||||
"B": "b", # Blue
|
||||
"G": "c", # Green
|
||||
|
@ -37,9 +38,9 @@ class KeyboardHandler:
|
|||
"p": "n", # Light/blink Pink
|
||||
"y": "o", # Light/blink Yellow
|
||||
"w": "p" # Light/blink White
|
||||
}
|
||||
}
|
||||
|
||||
special_chars = {
|
||||
special_chars = {
|
||||
' ': [NULL_CHAR, chr(44)],
|
||||
'\n': [NULL_CHAR, chr(40)],
|
||||
'\\': [NULL_CHAR, chr(0x31)],
|
||||
|
@ -70,112 +71,97 @@ class KeyboardHandler:
|
|||
'}': [chr(32), chr(0x30)],
|
||||
'|': [chr(32), chr(0x31)],
|
||||
'~': [chr(32), chr(0x35)],
|
||||
}
|
||||
|
||||
alphabet_lower = set(string.ascii_lowercase)
|
||||
alphabet_upper = set(string.ascii_uppercase)
|
||||
}
|
||||
|
||||
|
||||
def write_report(self, report):
|
||||
self.file.write(report.encode())
|
||||
def writer() {
|
||||
while true:
|
||||
if not message_queue.empty():
|
||||
message = message_queue.get()
|
||||
printstring(message)
|
||||
}
|
||||
|
||||
def write_report(report):
|
||||
file.write(report.encode())
|
||||
|
||||
|
||||
def release_keys(self):
|
||||
self.write_report(NULL_CHAR * 8)
|
||||
def release_keys():
|
||||
write_report(NULL_CHAR * 8)
|
||||
|
||||
def printchar(self, c):
|
||||
if c in self.alphabet_lower:
|
||||
self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
|
||||
elif c in self.alphabet_upper:
|
||||
self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
|
||||
|
||||
def printchar(c):
|
||||
if c in alphabet_lower:
|
||||
write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5)
|
||||
elif c in alphabet_upper:
|
||||
write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5)
|
||||
elif c.isdigit():
|
||||
self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
|
||||
elif c in self.special_chars:
|
||||
self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5)
|
||||
write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5)
|
||||
elif c in special_chars:
|
||||
write_report(special_chars[c][0] + NULL_CHAR + special_chars[c][1] + NULL_CHAR * 5)
|
||||
else:
|
||||
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
|
||||
self.release_keys()
|
||||
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
|
||||
release_keys()
|
||||
|
||||
|
||||
def change_color(self, background, foreground):
|
||||
if background in self.colors and foreground in self.colors:
|
||||
def change_color(background, foreground):
|
||||
if background in colors and foreground in colors:
|
||||
# DELETE keypress
|
||||
self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
|
||||
self.release_keys()
|
||||
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5)
|
||||
self.release_keys()
|
||||
self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5)
|
||||
write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5)
|
||||
release_keys()
|
||||
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[background]) - ord('a')) + NULL_CHAR * 5)
|
||||
release_keys()
|
||||
write_report(NULL_CHAR * 2 + chr(4 + ord(colors[foreground]) - ord('a')) + NULL_CHAR * 5)
|
||||
else:
|
||||
print('Malformed Color code: §' + background + foreground)
|
||||
self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
|
||||
self.release_keys()
|
||||
write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5)
|
||||
release_keys()
|
||||
|
||||
|
||||
def printstring(self, s):
|
||||
def printstring(s):
|
||||
last_c = ''
|
||||
last_last_c = ''
|
||||
# send Tab-key, to indicate this is sent by machine, not by human
|
||||
self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5)
|
||||
self.release_keys()
|
||||
for c in s:
|
||||
if last_last_c == '§':
|
||||
self.change_color(last_c, c)
|
||||
change_color(last_c, c)
|
||||
elif c != '§' and last_c != '§':
|
||||
self.printchar(c)
|
||||
printchar(c)
|
||||
last_c, last_last_c = c, last_c
|
||||
|
||||
|
||||
def reset_color(self):
|
||||
self.change_color('Z', 'g')
|
||||
self.release_keys()
|
||||
def reset_color():
|
||||
change_color('Z', 'g')
|
||||
release_keys()
|
||||
|
||||
def beep(self, code='fff'):
|
||||
self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5)
|
||||
self.release_keys()
|
||||
for char in code:
|
||||
self.printchar(char)
|
||||
def beep():
|
||||
write_report(NULL_CHAR * 2 + chr(0x29) + 'ff' + NULL_CHAR * 3)
|
||||
release_keys()
|
||||
print('beep')
|
||||
|
||||
|
||||
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
|
||||
|
||||
def __init__(self, request, client_address, server):
|
||||
self.messagequeue = server.messagequeue
|
||||
super().__init__(request, client_address, server)
|
||||
|
||||
def do_GET(self):
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.wfile.write(b'TAp reporting for duty!')
|
||||
self.wfile.write(b'Hello, world!')
|
||||
|
||||
def do_POST(self):
|
||||
print(self.headers)
|
||||
message = base64.b64decode(self.headers['X-Messages']).decode('utf8')
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.messagequeue.put_nowait(message)
|
||||
self.wfile.write(b'Received message\n')
|
||||
|
||||
def message_collector(messagequeue):
|
||||
keyboard = KeyboardHandler()
|
||||
while True:
|
||||
message = messagequeue.get()
|
||||
response = BytesIO()
|
||||
response.write(b'This is POST request. ')
|
||||
response.write(b'Received: ')
|
||||
self.wfile.write(response.getvalue())
|
||||
if message == 'reset':
|
||||
keyboard.printstring('\n' * 32)
|
||||
message_queue.put('\n' * 32)
|
||||
else:
|
||||
keyboard.printstring(message + '\n')
|
||||
args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20]
|
||||
subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i'])
|
||||
time.sleep(int(len(args) * 1.5))
|
||||
keyboard.reset_color()
|
||||
keyboard.file.flush()
|
||||
if messagequeue.empty():
|
||||
keyboard.beep()
|
||||
keyboard.file.flush()
|
||||
message_queue.put(message + '\n')
|
||||
# beep()
|
||||
reset_color()
|
||||
file.flush()
|
||||
|
||||
|
||||
|
||||
shared_messagequeue = Queue()
|
||||
httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler)
|
||||
httpd.messagequeue = shared_messagequeue
|
||||
thread = Thread(target = message_collector, args = (shared_messagequeue, ))
|
||||
thread.start()
|
||||
httpd.serve_forever()
|
||||
|
|
Loading…
Reference in a new issue