#!/usr/bin/env python3 # -*- coding: utf-8 -*- import datetime import time import string from http.server import HTTPServer, BaseHTTPRequestHandler from io import BytesIO import string import base64 from queue import Queue from threading import Thread import subprocess NULL_CHAR = chr(0) class KeyboardHandler: def __init__(self, hidfile='/dev/hidg0'): # Open keyboard device for binary appending self.file = open(hidfile, 'ba') colors = { "Z": "a", # Black "B": "b", # Blue "G": "c", # Green "M": "d", # Magenta "R": "e", # Red "P": "f", # Pink "Y": "g", # Yellow "W": "h", # White "z": "i", # Light/blink Black "b": "j", # Light/blink Blue "g": "k", # Light/blink Green "m": "l", # Light/blink Magenta "r": "m", # Light/blink Red "p": "n", # Light/blink Pink "y": "o", # Light/blink Yellow "w": "p" # Light/blink White } special_chars = { ' ': [NULL_CHAR, chr(44)], '\n': [NULL_CHAR, chr(40)], '\\': [NULL_CHAR, chr(0x31)], '.': [NULL_CHAR, chr(0x37)], '/': [NULL_CHAR, chr(0x38)], ';': [NULL_CHAR, chr(0x33)], '-': [NULL_CHAR, chr(0x2d)], '=': [NULL_CHAR, chr(0x2e)], '[': [NULL_CHAR, chr(0x2f)], ']': [NULL_CHAR, chr(0x30)], '*': [NULL_CHAR, chr(0x55)], ',': [NULL_CHAR, chr(0x36)], '!': [chr(32), chr(0x1e)], '<': [chr(32), chr(0x36)], '>': [chr(32), chr(0x37)], '?': [chr(32), chr(0x38)], ':': [chr(32), chr(0x33)], '(': [chr(32), chr(0x26)], ')': [chr(32), chr(0x27)], '&': [chr(32), chr(0x24)], '%': [chr(32), chr(0x22)], '#': [chr(32), chr(0x20)], '@': [chr(32), chr(0x1f)], '$': [chr(32), chr(0x21)], '_': [chr(32), chr(0x2d)], '+': [chr(32), chr(0x2e)], '{': [chr(32), chr(0x2f)], '}': [chr(32), chr(0x30)], '|': [chr(32), chr(0x31)], '~': [chr(32), chr(0x35)], } alphabet_lower = set(string.ascii_lowercase) alphabet_upper = set(string.ascii_uppercase) def write_report(self, report): self.file.write(report.encode()) def release_keys(self): self.write_report(NULL_CHAR * 8) def printchar(self, c): if c in self.alphabet_lower: self.write_report(NULL_CHAR * 2 + chr(4 + ord(c) - ord('a')) + NULL_CHAR * 5) elif c in self.alphabet_upper: self.write_report(chr(32) + NULL_CHAR + chr(4 + ord(c) - ord('A')) + NULL_CHAR * 5) elif c.isdigit(): self.write_report(NULL_CHAR * 2 + chr(0x1e + ((ord(c) - ord('0') - 1) % 10)) + NULL_CHAR * 5) elif c in self.special_chars: self.write_report(self.special_chars[c][0] + NULL_CHAR + self.special_chars[c][1] + NULL_CHAR * 5) else: self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) self.release_keys() def change_color(self, background, foreground): if background in self.colors and foreground in self.colors: # DELETE keypress self.write_report(NULL_CHAR * 2 + chr(0x2a) + NULL_CHAR * 5) self.release_keys() self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[background]) - ord('a')) + NULL_CHAR * 5) self.release_keys() self.write_report(NULL_CHAR * 2 + chr(4 + ord(self.colors[foreground]) - ord('a')) + NULL_CHAR * 5) else: print('Malformed Color code: §' + background + foreground) self.write_report(chr(32) + NULL_CHAR + chr(0x38) + NULL_CHAR * 5) self.release_keys() def printstring(self, s): last_c = '' last_last_c = '' # send Tab-key, to indicate this is sent by machine, not by human self.write_report(NULL_CHAR * 2 + chr(0x2b) + NULL_CHAR*5) self.release_keys() for c in s: if last_last_c == '§': self.change_color(last_c, c) elif c != '§' and last_c != '§': self.printchar(c) last_c, last_last_c = c, last_c def reset_color(self): self.change_color('Z', 'g') self.release_keys() def beep(self, code='fff'): self.write_report(NULL_CHAR * 2 + chr(0x29) + NULL_CHAR * 5) self.release_keys() for char in code: self.printchar(char) class SimpleHTTPRequestHandler(BaseHTTPRequestHandler): def __init__(self, request, client_address, server): self.messagequeue = server.messagequeue super().__init__(request, client_address, server) def do_GET(self): self.send_response(200) self.end_headers() self.wfile.write(b'TAp reporting for duty!') def do_POST(self): message = base64.b64decode(self.headers['X-Messages']).decode('utf8') self.send_response(200) self.end_headers() self.messagequeue.put_nowait(message) self.wfile.write(b'Received message\n') def message_collector(messagequeue): keyboard = KeyboardHandler() while True: message = messagequeue.get() if message == 'reset': keyboard.printstring('\n' * 32) else: keyboard.printstring(message + '\n') args = [str(ord(s)) for s in message if 0 < ord(s) < 255][:20] subprocess.run(["i2cset", "-y", "1", "8", "0"] + args + ['i']) time.sleep(int(len(args) * 1.5)) keyboard.reset_color() keyboard.file.flush() if messagequeue.empty(): keyboard.beep() keyboard.file.flush() shared_messagequeue = Queue() httpd = HTTPServer(('0.0.0.0', 8000), SimpleHTTPRequestHandler) httpd.messagequeue = shared_messagequeue thread = Thread(target = message_collector, args = (shared_messagequeue, )) thread.start() httpd.serve_forever()