frame-it/listener.py

65 lines
1.9 KiB
Python
Raw Normal View History

2020-01-21 03:27:35 +01:00
import datetime
import socket
import sys
from typing import Optional
from utils import mac_address, broadcast_address
class Frame:
def __init__(self,
destination: bytes,
protocol: bytes,
message: str,
source: bytes = mac_address):
self.destination: bytes = destination
self.source: bytes = source
self.protocol: bytes = protocol
self.message: str = message
def decode_packet(packet_bytes: bytes) -> Optional[Frame]:
protocol: bytes = packet_bytes[12:14]
if protocol != b"\x60\x00":
return None
try:
destination = packet_bytes[0:6]
source = packet_bytes[6:12]
message = packet_bytes[14:].decode("utf-8").rstrip("\x00")
return Frame(destination=destination,
source=source,
protocol=protocol,
message=message)
except:
# print("[Error] Could not decode message.")
return None
if __name__ == "__main__":
print("Starting listener...")
sys.stdout.flush()
connection = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3))
while True:
byte_string, addr = connection.recvfrom(65536)
frame = decode_packet(byte_string)
if frame is not None \
and (frame.destination == mac_address or frame.destination == broadcast_address):
dt_string = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
print("----")
print("Revcd from {} to {} @{}".format(frame.source, frame.destination, dt_string))
if frame.message == "ZeusWPI is de max!":
print("Congratulations, packet received correctly!")
else:
print("Wrong message, please check that you are sending \"ZeusWPI is de max!\"")
sys.stdout.flush()