diff --git a/python/debugserver.py b/python/debugserver.py index 0428e2a..ce01074 100644 --- a/python/debugserver.py +++ b/python/debugserver.py @@ -7,6 +7,7 @@ import serial import uuid from collections import deque import sys +from obus import Message app = Flask(__name__) @@ -23,91 +24,6 @@ max_message_cache = 200 shared_data = SharedData(deque(maxlen=max_message_cache), -1) - -@dataclass -class Message: - payload: bytes - received_from: int - received_at: datetime - internal_id: int - - def readable_time(self): - return self.received_at.strftime('%H:%M:%S') - - def priority_bit(self): - return (self.received_from >> 10) & 0b1 - - def sender_type(self): - return (self.received_from >> 8) & 0b11 - - def sender_id(self): - return (self.received_from >> 0) & 0b1111_1111 - - @staticmethod - def human_readable_type(sender_type, sender_id): - return [('controller' if sender_id == 0 else 'info'), 'puzzle', 'needy', 'RESERVED TYPE'][sender_type] - - def _parse_state_update(self): - timeleft = self.payload[1] << 0x18 | self.payload[2] << 0x10 | self.payload[3] << 0x08 | self.payload[4] - strikes = self.payload[5] - max_strikes = self.payload[6] - solved_puzzle_modules = self.payload[7] - - return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02} [{solved_puzzle_modules:02}]' - - def parse_message(self): - sender_type = self.sender_type() - message_type = self.payload[0] - try: - if sender_type == 0b00 and self.sender_id() == 0: # controller - if message_type == 0: - return f"ACK {Message.human_readable_type(self.payload[1], self.payload[2])} {self.payload[2]}" - elif message_type == 1: - return "HELLO" - elif message_type == 2: - return "START " + self._parse_state_update() - elif message_type == 3: - return "STATE " + self._parse_state_update() - elif message_type == 4: - return "SOLVED " + self._parse_state_update() - elif message_type == 5: - return "TIMEOUT " + self._parse_state_update() - elif message_type == 6: - return "STRIKEOUT " + self._parse_state_update() - elif message_type == 7: - return "INFO START" - elif sender_type == 0b01: # puzzle - if message_type == 0: - return "REGISTER" - elif message_type == 1: - return f"STRIKE {self.payload[1]}" - elif message_type == 2: - return f"SOLVED" - elif sender_type == 0b10: # needy - if message_type == 0: - return "REGISTER" - elif message_type == 1: - return f"STRIKE {self.payload[1]}" - elif sender_type == 0b00 and self.sender_id() != 0: # info - if message_type == 0: - return "FREE INFO MESSAGE" - - except: - print("Unexpected error: ", sys.exc_info()[0]) - return "PARSE ERROR" - - def serialize(self): - return { - 'time': self.readable_time(), - 'parsed': self.parse_message(), - 'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}', - 'raw_message': f"{self.payload.hex(' ')}", - 'human_readable_type': Message.human_readable_type(self.sender_type(), self.sender_id()), - 'sender_id': self.sender_id(), - 'internal_id': self.internal_id - } - - def serial_reader(shared_data): with serial.Serial('/dev/ttyUSB0', 115200, timeout=0.05) as ser: while True: @@ -121,7 +37,7 @@ def serial_reader(shared_data): sender = (int(line[0]) << 8) + int(line[1]) size = int(line[2]) message = line[3:3+size] - received = Message(message, sender, datetime.now(), len(shared_data.messages)) + received = Message(message, sender, datetime.now()) shared_data.messages.append(received.serialize()) shared_data.last_message_index += 1 print(shared_data.last_message_index) diff --git a/python/obus.py b/python/obus.py new file mode 100644 index 0000000..823dd4a --- /dev/null +++ b/python/obus.py @@ -0,0 +1,84 @@ +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class Message: + payload: bytes + received_from: int + received_at: datetime + + def readable_time(self): + return self.received_at.strftime('%H:%M:%S') + + def priority_bit(self): + return (self.received_from >> 10) & 0b1 + + def sender_type(self): + return (self.received_from >> 8) & 0b11 + + def sender_id(self): + return (self.received_from >> 0) & 0b1111_1111 + + @staticmethod + def human_readable_type(sender_type, sender_id): + return [('controller' if sender_id == 0 else 'info'), 'puzzle', 'needy', 'RESERVED TYPE'][sender_type] + + def _parse_state_update(self): + timeleft = self.payload[1] << 0x18 | self.payload[2] << 0x10 | self.payload[3] << 0x08 | self.payload[4] + strikes = self.payload[5] + max_strikes = self.payload[6] + solved_puzzle_modules = self.payload[7] + + return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02} [{solved_puzzle_modules:02}]' + + def parse_message(self): + sender_type = self.sender_type() + message_type = self.payload[0] + try: + if sender_type == 0b00 and self.sender_id() == 0: # controller + if message_type == 0: + return f"ACK {Message.human_readable_type(self.payload[1], self.payload[2])} {self.payload[2]}" + elif message_type == 1: + return "HELLO" + elif message_type == 2: + return "START " + self._parse_state_update() + elif message_type == 3: + return "STATE " + self._parse_state_update() + elif message_type == 4: + return "SOLVED " + self._parse_state_update() + elif message_type == 5: + return "TIMEOUT " + self._parse_state_update() + elif message_type == 6: + return "STRIKEOUT " + self._parse_state_update() + elif message_type == 7: + return "INFO START" + elif sender_type == 0b01: # puzzle + if message_type == 0: + return "REGISTER" + elif message_type == 1: + return f"STRIKE {self.payload[1]}" + elif message_type == 2: + return f"SOLVED" + elif sender_type == 0b10: # needy + if message_type == 0: + return "REGISTER" + elif message_type == 1: + return f"STRIKE {self.payload[1]}" + elif sender_type == 0b00 and self.sender_id() != 0: # info + if message_type == 0: + return "FREE INFO MESSAGE" + + except: + print("Unexpected error: ", sys.exc_info()[0]) + return "PARSE ERROR" + + def serialize(self): + return { + 'time': self.readable_time(), + 'parsed': self.parse_message(), + 'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}', + 'raw_message': f"{self.payload.hex(' ')}", + 'human_readable_type': Message.human_readable_type(self.sender_type(), self.sender_id()), + 'sender_id': self.sender_id() + }