import datetime import socket import sys from typing import Optional from utils import mac_address, broadcast_address class Frame: def __init__(self, destination: bytes, protocol: bytes, message: str, source: bytes = mac_address): self.destination: bytes = destination self.source: bytes = source self.protocol: bytes = protocol self.message: str = message def decode_packet(packet_bytes: bytes) -> Optional[Frame]: protocol: bytes = packet_bytes[12:14] if protocol != b"\x60\x00": return None try: destination = packet_bytes[0:6] source = packet_bytes[6:12] message = packet_bytes[14:].decode("utf-8").rstrip("\x00") return Frame(destination=destination, source=source, protocol=protocol, message=message) except: # print("[Error] Could not decode message.") return None if __name__ == "__main__": print("Starting listener...") sys.stdout.flush() connection = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3)) while True: byte_string, addr = connection.recvfrom(65536) frame = decode_packet(byte_string) if frame is not None \ and (frame.destination == mac_address or frame.destination == broadcast_address): dt_string = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S") print("----") print("Revcd from {} to {} @{}".format(frame.source, frame.destination, dt_string)) if frame.message == "ZeusWPI is de max!": print("Congratulations, packet received correctly!") else: print("Wrong message, please check that you are sending \"ZeusWPI is de max!\"") sys.stdout.flush()