Extract message class
This commit is contained in:
parent
394b84bdd8
commit
801575954c
2 changed files with 86 additions and 86 deletions
|
@ -7,6 +7,7 @@ import serial
|
|||
import uuid
|
||||
from collections import deque
|
||||
import sys
|
||||
from obus import Message
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
@ -23,91 +24,6 @@ max_message_cache = 200
|
|||
shared_data = SharedData(deque(maxlen=max_message_cache), -1)
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
payload: bytes
|
||||
received_from: int
|
||||
received_at: datetime
|
||||
internal_id: int
|
||||
|
||||
def readable_time(self):
|
||||
return self.received_at.strftime('%H:%M:%S')
|
||||
|
||||
def priority_bit(self):
|
||||
return (self.received_from >> 10) & 0b1
|
||||
|
||||
def sender_type(self):
|
||||
return (self.received_from >> 8) & 0b11
|
||||
|
||||
def sender_id(self):
|
||||
return (self.received_from >> 0) & 0b1111_1111
|
||||
|
||||
@staticmethod
|
||||
def human_readable_type(sender_type, sender_id):
|
||||
return [('controller' if sender_id == 0 else 'info'), 'puzzle', 'needy', 'RESERVED TYPE'][sender_type]
|
||||
|
||||
def _parse_state_update(self):
|
||||
timeleft = self.payload[1] << 0x18 | self.payload[2] << 0x10 | self.payload[3] << 0x08 | self.payload[4]
|
||||
strikes = self.payload[5]
|
||||
max_strikes = self.payload[6]
|
||||
solved_puzzle_modules = self.payload[7]
|
||||
|
||||
return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02} [{solved_puzzle_modules:02}]'
|
||||
|
||||
def parse_message(self):
|
||||
sender_type = self.sender_type()
|
||||
message_type = self.payload[0]
|
||||
try:
|
||||
if sender_type == 0b00 and self.sender_id() == 0: # controller
|
||||
if message_type == 0:
|
||||
return f"ACK {Message.human_readable_type(self.payload[1], self.payload[2])} {self.payload[2]}"
|
||||
elif message_type == 1:
|
||||
return "HELLO"
|
||||
elif message_type == 2:
|
||||
return "START " + self._parse_state_update()
|
||||
elif message_type == 3:
|
||||
return "STATE " + self._parse_state_update()
|
||||
elif message_type == 4:
|
||||
return "SOLVED " + self._parse_state_update()
|
||||
elif message_type == 5:
|
||||
return "TIMEOUT " + self._parse_state_update()
|
||||
elif message_type == 6:
|
||||
return "STRIKEOUT " + self._parse_state_update()
|
||||
elif message_type == 7:
|
||||
return "INFO START"
|
||||
elif sender_type == 0b01: # puzzle
|
||||
if message_type == 0:
|
||||
return "REGISTER"
|
||||
elif message_type == 1:
|
||||
return f"STRIKE {self.payload[1]}"
|
||||
elif message_type == 2:
|
||||
return f"SOLVED"
|
||||
elif sender_type == 0b10: # needy
|
||||
if message_type == 0:
|
||||
return "REGISTER"
|
||||
elif message_type == 1:
|
||||
return f"STRIKE {self.payload[1]}"
|
||||
elif sender_type == 0b00 and self.sender_id() != 0: # info
|
||||
if message_type == 0:
|
||||
return "FREE INFO MESSAGE"
|
||||
|
||||
except:
|
||||
print("Unexpected error: ", sys.exc_info()[0])
|
||||
return "PARSE ERROR"
|
||||
|
||||
def serialize(self):
|
||||
return {
|
||||
'time': self.readable_time(),
|
||||
'parsed': self.parse_message(),
|
||||
'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}',
|
||||
'raw_message': f"{self.payload.hex(' ')}",
|
||||
'human_readable_type': Message.human_readable_type(self.sender_type(), self.sender_id()),
|
||||
'sender_id': self.sender_id(),
|
||||
'internal_id': self.internal_id
|
||||
}
|
||||
|
||||
|
||||
def serial_reader(shared_data):
|
||||
with serial.Serial('/dev/ttyUSB0', 115200, timeout=0.05) as ser:
|
||||
while True:
|
||||
|
@ -121,7 +37,7 @@ def serial_reader(shared_data):
|
|||
sender = (int(line[0]) << 8) + int(line[1])
|
||||
size = int(line[2])
|
||||
message = line[3:3+size]
|
||||
received = Message(message, sender, datetime.now(), len(shared_data.messages))
|
||||
received = Message(message, sender, datetime.now())
|
||||
shared_data.messages.append(received.serialize())
|
||||
shared_data.last_message_index += 1
|
||||
print(shared_data.last_message_index)
|
||||
|
|
84
python/obus.py
Normal file
84
python/obus.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class Message:
|
||||
payload: bytes
|
||||
received_from: int
|
||||
received_at: datetime
|
||||
|
||||
def readable_time(self):
|
||||
return self.received_at.strftime('%H:%M:%S')
|
||||
|
||||
def priority_bit(self):
|
||||
return (self.received_from >> 10) & 0b1
|
||||
|
||||
def sender_type(self):
|
||||
return (self.received_from >> 8) & 0b11
|
||||
|
||||
def sender_id(self):
|
||||
return (self.received_from >> 0) & 0b1111_1111
|
||||
|
||||
@staticmethod
|
||||
def human_readable_type(sender_type, sender_id):
|
||||
return [('controller' if sender_id == 0 else 'info'), 'puzzle', 'needy', 'RESERVED TYPE'][sender_type]
|
||||
|
||||
def _parse_state_update(self):
|
||||
timeleft = self.payload[1] << 0x18 | self.payload[2] << 0x10 | self.payload[3] << 0x08 | self.payload[4]
|
||||
strikes = self.payload[5]
|
||||
max_strikes = self.payload[6]
|
||||
solved_puzzle_modules = self.payload[7]
|
||||
|
||||
return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02} [{solved_puzzle_modules:02}]'
|
||||
|
||||
def parse_message(self):
|
||||
sender_type = self.sender_type()
|
||||
message_type = self.payload[0]
|
||||
try:
|
||||
if sender_type == 0b00 and self.sender_id() == 0: # controller
|
||||
if message_type == 0:
|
||||
return f"ACK {Message.human_readable_type(self.payload[1], self.payload[2])} {self.payload[2]}"
|
||||
elif message_type == 1:
|
||||
return "HELLO"
|
||||
elif message_type == 2:
|
||||
return "START " + self._parse_state_update()
|
||||
elif message_type == 3:
|
||||
return "STATE " + self._parse_state_update()
|
||||
elif message_type == 4:
|
||||
return "SOLVED " + self._parse_state_update()
|
||||
elif message_type == 5:
|
||||
return "TIMEOUT " + self._parse_state_update()
|
||||
elif message_type == 6:
|
||||
return "STRIKEOUT " + self._parse_state_update()
|
||||
elif message_type == 7:
|
||||
return "INFO START"
|
||||
elif sender_type == 0b01: # puzzle
|
||||
if message_type == 0:
|
||||
return "REGISTER"
|
||||
elif message_type == 1:
|
||||
return f"STRIKE {self.payload[1]}"
|
||||
elif message_type == 2:
|
||||
return f"SOLVED"
|
||||
elif sender_type == 0b10: # needy
|
||||
if message_type == 0:
|
||||
return "REGISTER"
|
||||
elif message_type == 1:
|
||||
return f"STRIKE {self.payload[1]}"
|
||||
elif sender_type == 0b00 and self.sender_id() != 0: # info
|
||||
if message_type == 0:
|
||||
return "FREE INFO MESSAGE"
|
||||
|
||||
except:
|
||||
print("Unexpected error: ", sys.exc_info()[0])
|
||||
return "PARSE ERROR"
|
||||
|
||||
def serialize(self):
|
||||
return {
|
||||
'time': self.readable_time(),
|
||||
'parsed': self.parse_message(),
|
||||
'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}',
|
||||
'raw_message': f"{self.payload.hex(' ')}",
|
||||
'human_readable_type': Message.human_readable_type(self.sender_type(), self.sender_id()),
|
||||
'sender_id': self.sender_id()
|
||||
}
|
Loading…
Reference in a new issue