obus/debugging_tool/server.py

135 lines
4.6 KiB
Python
Raw Normal View History

2020-08-16 02:16:36 +00:00
from threading import Thread
from flask import Flask, jsonify, send_file
from time import sleep
from dataclasses import dataclass
from datetime import datetime
2020-08-19 00:10:30 +00:00
import serial
import uuid
from collections import deque
2020-08-16 02:16:36 +00:00
# Flask application object; the @app.route decorators in this module register
# their view functions on it.
app = Flask(__name__)
2020-09-09 17:52:23 +00:00
# Random identifier generated once at import time; the api view returns it to
# clients under the "server_id" key.
server_id = uuid.uuid4()
print("Server ID: ", server_id)
@dataclass
class SharedData:
    """Mutable state handed to the serial reader and read by the HTTP views."""

    # Cache of already-serialized messages, appended to by the serial reader.
    messages: deque
    # Running index of the most recently stored message.
    last_message_index: int
# Maximum number of serialized messages kept in the server-side cache.
# Keep this the same as max_messages on the client!
max_message_cache = 200
2020-09-09 17:52:23 +00:00
# Module-level state: an empty deque bounded to max_message_cache entries and
# an index of -1, which is incremented each time a message is stored.
shared_data = SharedData(deque(maxlen=max_message_cache), -1)
2020-08-16 02:16:36 +00:00
@dataclass
class Message:
    """One bus message read from the serial port, with decoding helpers.

    ``received_from`` is treated as a packed identifier: bit 10 is the
    priority bit, bits 9-8 are the sender type and bits 7-0 the sender id.
    ``payload[0]`` is the message type; the remaining bytes depend on it.
    """

    payload: bytes
    received_from: int
    received_at: datetime
    internal_id: int

    # Controller message types that carry no further data.
    _CONTROLLER_PLAIN = {0: "ACK", 1: "HELLO"}
    # Controller message types followed by a game-state update.
    _CONTROLLER_STATE = {
        2: "START",
        3: "STATE",
        4: "SOLVED",
        5: "TIMEOUT",
        6: "STRIKEOUT",
    }

    def readable_time(self):
        """Return the reception time formatted as ``HH:MM:SS``."""
        return self.received_at.strftime('%H:%M:%S')

    def priority_bit(self):
        """Return bit 10 of ``received_from`` (0 or 1)."""
        return (self.received_from >> 10) & 0b1

    def sender_type(self):
        """Return bits 9-8 of ``received_from`` (0..3)."""
        return (self.received_from >> 8) & 0b11

    def sender_id(self):
        """Return the low 8 bits of ``received_from`` (0..255)."""
        return (self.received_from >> 0) & 0b1111_1111

    def human_readable_type(self):
        """Return the sender type as a display name."""
        return ['controller', 'puzzle', 'needy', 'RESERVED TYPE'][self.sender_type()]

    def _parse_state_update(self):
        """Format the state update stored in payload bytes 1..6.

        Bytes 1-4 are a big-endian time value that is divided by 1000 for
        display, byte 5 is the strike count and byte 6 the maximum strikes.

        Raises:
            IndexError: if the payload has fewer than 7 bytes.
        """
        # Index the last byte first so a truncated payload fails loudly
        # instead of producing a time built from a short slice.
        max_strikes = self.payload[6]
        strikes = self.payload[5]
        timeleft = int.from_bytes(self.payload[1:5], 'big')
        return f'{timeleft/1000:3.2f} {strikes:02}/{max_strikes:02}'

    def parse_message(self):
        """Return a human-readable description of the payload.

        Only controller (type 0b00) and puzzle (type 0b01) messages have
        decoders. Anything else (an unknown message type, another sender
        type, or an empty or truncated payload) yields ``"PARSE ERROR"``
        rather than raising, so a bad frame cannot break serialization.
        """
        try:
            # Read inside the try block: an empty payload raises IndexError.
            message_type = self.payload[0]
            sender_type = self.sender_type()

            if sender_type == 0b00:  # controller
                if message_type in self._CONTROLLER_PLAIN:
                    return self._CONTROLLER_PLAIN[message_type]
                if message_type in self._CONTROLLER_STATE:
                    label = self._CONTROLLER_STATE[message_type]
                    return label + " " + self._parse_state_update()
            elif sender_type == 0b01:  # puzzle
                if message_type == 0:
                    return "REGISTER"
                if message_type == 1:
                    return f"STRIKE {self.payload[1]}"
                if message_type == 2:
                    return "SOLVED"
        except (IndexError, TypeError) as err:
            # Report the exception class without needing the sys module.
            print("Unexpected error: ", type(err))

        return "PARSE ERROR"

    def serialize(self):
        """Return a JSON-friendly dict describing this message."""
        return {
            'time': self.readable_time(),
            'parsed': self.parse_message(),
            'pretty_raw_sender_id': f'{self.priority_bit():01b} {self.sender_type():02b} {self.sender_id():08b}',
            'raw_message': f"{self.payload.hex(' ')}",
            'human_readable_type': self.human_readable_type(),
            'sender_id': self.sender_id(),
            'internal_id': self.internal_id,
        }
2020-09-09 17:52:23 +00:00
def serial_reader(shared_data):
    """Read bus messages from the serial port into ``shared_data`` forever.

    Every line read is echoed to stdout. Lines starting with ``message`` are
    expected to look like ``message <sender> <byte> <byte> ...`` with all
    numbers in decimal. Each one is wrapped in a ``Message``, serialized and
    appended to ``shared_data.messages``, after which
    ``shared_data.last_message_index`` is incremented.

    Malformed or non-ASCII lines are reported and skipped so that line noise
    cannot terminate the reader thread.

    Args:
        shared_data: object with a ``messages`` deque and an integer
            ``last_message_index``.
    """
    with serial.Serial('/dev/ttyACM0', 115200, timeout=10) as ser:
        while True:
            line = ser.readline()
            # errors='replace' keeps stray non-ASCII bytes from raising
            # UnicodeDecodeError and killing the thread.
            text = line.decode('ascii', errors='replace')
            print(text)
            if not line.startswith(b"message"):
                continue

            parts = text.strip().split(' ')
            try:
                sender = int(parts[1])
                message = bytes(int(p) for p in parts[2:])
            except (IndexError, ValueError) as err:
                print("Skipping malformed message line: ", err)
                continue

            # Number messages by their running index. len(messages) stops
            # growing once the bounded deque is full, which would give every
            # later message the same id.
            internal_id = shared_data.last_message_index + 1
            received = Message(message, sender, datetime.now(), internal_id)
            shared_data.messages.append(received.serialize())
            shared_data.last_message_index += 1
            print(shared_data.last_message_index)
            print("READER = ", shared_data)
2020-08-16 02:16:36 +00:00
@app.route('/')
def index():
    """Serve ``static/index.html`` at the site root."""
    page = 'static/index.html'
    return send_file(page)
@app.route('/<last_received>/api.json')
def api(last_received):
    """Return the cached messages newer than ``last_received`` as JSON.

    Args:
        last_received: index of the newest message the client already has,
            taken from the URL path as a string.

    Returns:
        A JSON object with ``server_id``, ``newest_msg`` (the index of the
        newest stored message) and ``messages``. ``messages`` holds the
        cached entries the client is missing: the whole cache when the client
        is further behind than the cache reaches, and an empty list when it
        is up to date or ahead. A non-integer ``last_received`` yields a 400
        response with an ``error`` field.
    """
    try:
        last_received = int(last_received)
    except ValueError:
        return jsonify({"error": "last_received must be an integer"}), 400

    print("REQUEST = ", shared_data)

    newest = shared_data.last_message_index
    cached = list(shared_data.messages)
    missing = newest - last_received
    # Slice from the end so this also works while the cache is not yet full.
    # A negative start beyond the list length simply yields the whole list.
    # The guard matters because cached[-0:] would return everything.
    new_messages = cached[-missing:] if missing > 0 else []

    return jsonify({"server_id": server_id, "newest_msg": newest, "messages": new_messages})
2020-08-16 02:16:36 +00:00
@app.route('/max_messages.json')
def get_max_messages():
    """Return the message cache size wrapped in a one-element JSON array."""
    body = [max_message_cache]
    return jsonify(body)
2020-08-16 02:16:36 +00:00
if __name__ == '__main__':
    # daemon=True: serial_reader loops forever, so a non-daemon thread would
    # keep the interpreter alive after the Flask server has stopped.
    thread = Thread(target=serial_reader, args=(shared_data, ), daemon=True)
    thread.start()
    # Listen on all interfaces with Flask's debug mode off.
    app.run(debug=False, host='0.0.0.0')