174 lines
6.5 KiB
Python
174 lines
6.5 KiB
Python
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import struct
|
|
import enum
|
|
from typing import NamedTuple
|
|
|
|
|
|
@enum.unique
|
|
class ControllerMessageType(enum.Enum):
|
|
ACK = 0
|
|
HELLO = 1
|
|
GAMESTART = 2
|
|
STATE = 3
|
|
SOLVED = 4
|
|
TIMEOUT = 5
|
|
STRIKEOUT = 6
|
|
INFOSTART = 7
|
|
|
|
|
|
class ModuleAddress(NamedTuple):
|
|
module_type: int
|
|
identifier: int
|
|
|
|
def is_puzzle(self):
|
|
return self.module_type == 1
|
|
|
|
def as_binary(self):
|
|
return (self.module_type << 8) | self.identifier
|
|
|
|
|
|
@dataclass
|
|
class Message:
|
|
payload: bytes
|
|
received_from: int
|
|
received_at: datetime
|
|
|
|
@classmethod
|
|
def create_controller_message(cls, controller_message_type, content):
|
|
return cls(bytes([controller_message_type.value]) + content, 0, datetime.now())
|
|
|
|
@classmethod
|
|
def create_controller_infostart(cls, seed):
|
|
assert 0 <= seed <= 0xFFFFFFFF
|
|
return cls.create_controller_message(ControllerMessageType.INFOSTART, struct.pack('>L', seed))
|
|
|
|
@classmethod
|
|
def create_controller_ack(cls, module_address):
|
|
return cls.create_controller_message(ControllerMessageType.ACK, struct.pack('>H', module_address.as_binary()))
|
|
|
|
@classmethod
|
|
def create_controller_hello(cls):
|
|
return cls.create_controller_message(ControllerMessageType.HELLO, b'')
|
|
|
|
@classmethod
|
|
def create_controller_generic_stateupdate(cls, controller_message_type, timeleft, current_strikes, max_strikes, amount_puzzle_modules_left):
|
|
time_left_ms = int(timeleft.total_seconds() * 1000)
|
|
assert 0 <= time_left_ms <= 0xFFFFFFFF
|
|
return cls.create_controller_message(controller_message_type, struct.pack('>LBBB', time_left_ms, current_strikes, max_strikes, amount_puzzle_modules_left))
|
|
|
|
@classmethod
|
|
def create_controller_gamestart(cls, *args):
|
|
return cls.create_controller_generic_stateupdate(ControllerMessageType.GAMESTART, *args)
|
|
|
|
@classmethod
|
|
def create_controller_state(cls, *args):
|
|
return cls.create_controller_generic_stateupdate(ControllerMessageType.STATE, *args)
|
|
|
|
@classmethod
|
|
def create_controller_solved(cls, *args):
|
|
return cls.create_controller_generic_stateupdate(ControllerMessageType.SOLVED, *args)
|
|
|
|
@classmethod
|
|
def create_controller_timeout(cls, *args):
|
|
return cls.create_controller_generic_stateupdate(ControllerMessageType.TIMEOUT, *args)
|
|
|
|
@classmethod
|
|
def create_controller_strikeout(cls, *args):
|
|
return cls.create_controller_generic_stateupdate(ControllerMessageType.STRIKEOUT, *args)
|
|
|
|
def get_puzzle_register(self):
|
|
'''Returns the address of the puzzle if this is a register puzzle message, None otherwise'''
|
|
if self.sender_type() == 1 and self.payload[0] == 0:
|
|
return self.module_address()
|
|
return None
|
|
|
|
def get_puzzle_strike_details(self):
|
|
'''Returns the address and amount of strikes of the puzzle if this is a puzzle strike message, None otherwise'''
|
|
if self.sender_type() == 1 and self.payload[0] == 1:
|
|
return (self.module_address(), self.payload[1])
|
|
return None
|
|
|
|
def get_puzzle_solved(self):
|
|
'''Returns the address of the puzzle if this is a solved puzzle message, None otherwise'''
|
|
if self.sender_type() == 1 and self.payload[0] == 2:
|
|
return self.module_address()
|
|
return None
|
|
|
|
def readable_time(self):
|
|
return self.received_at.strftime('%H:%M:%S')
|
|
|
|
def priority_bit(self):
|
|
return (self.received_from >> 10) & 0b1
|
|
|
|
def sender_type(self):
|
|
return (self.received_from >> 8) & 0b11
|
|
|
|
def sender_id(self):
|
|
return (self.received_from >> 0) & 0b1111_1111
|
|
|
|
def module_address(self):
|
|
return ModuleAddress(self.sender_type(), self.sender_id())
|
|
|
|
@staticmethod
|
|
def human_readable_type(sender_type, sender_id):
|
|
return [('controller' if sender_id == 0 else 'info'), 'puzzle', 'needy', 'RESERVED TYPE'][sender_type]
|
|
|
|
def _parse_state_update(self):
|
|
timeleft = self.payload[1] << 0x18 | self.payload[2] << 0x10 | self.payload[3] << 0x08 | self.payload[4]
|
|
strikes = self.payload[5]
|
|
max_strikes = self.payload[6]
|
|
solved_puzzle_modules = self.payload[7]
|
|
|
|
return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02} [{solved_puzzle_modules:02}]'
|
|
|
|
def parse_message(self):
|
|
sender_type = self.sender_type()
|
|
message_type = self.payload[0]
|
|
try:
|
|
if sender_type == 0b00 and self.sender_id() == 0: # controller
|
|
if message_type == 0:
|
|
return f"ACK {Message.human_readable_type(self.payload[1], self.payload[2])} {self.payload[2]}"
|
|
elif message_type == 1:
|
|
return "HELLO"
|
|
elif message_type == 2:
|
|
return "START " + self._parse_state_update()
|
|
elif message_type == 3:
|
|
return "STATE " + self._parse_state_update()
|
|
elif message_type == 4:
|
|
return "SOLVED " + self._parse_state_update()
|
|
elif message_type == 5:
|
|
return "TIMEOUT " + self._parse_state_update()
|
|
elif message_type == 6:
|
|
return "STRIKEOUT " + self._parse_state_update()
|
|
elif message_type == 7:
|
|
return "INFO START"
|
|
elif sender_type == 0b01: # puzzle
|
|
if message_type == 0:
|
|
return "REGISTER"
|
|
elif message_type == 1:
|
|
return f"STRIKE {self.payload[1]}"
|
|
elif message_type == 2:
|
|
return f"SOLVED"
|
|
elif sender_type == 0b10: # needy
|
|
if message_type == 0:
|
|
return "REGISTER"
|
|
elif message_type == 1:
|
|
return f"STRIKE {self.payload[1]}"
|
|
elif sender_type == 0b00 and self.sender_id() != 0: # info
|
|
if message_type == 0:
|
|
return "FREE INFO MESSAGE"
|
|
|
|
except:
|
|
print("Unexpected error: ", sys.exc_info()[0])
|
|
return "PARSE ERROR"
|
|
|
|
def serialize(self):
|
|
return {
|
|
'time': self.readable_time(),
|
|
'parsed': self.parse_message(),
|
|
'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}',
|
|
'raw_message': f"{self.payload.hex(' ')}",
|
|
'human_readable_type': Message.human_readable_type(self.sender_type(), self.sender_id()),
|
|
'sender_id': self.sender_id()
|
|
}
|