mozaic-receptionist/mozaic_receptionist.py

246 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
import re
import json
import threading
import requests
import websocket
import time
def first(iterable, default=None):
    """Return the first item produced by `iterable`.

    @param iterable: any iterable; at most one item is consumed from it
    @param default: value returned when `iterable` yields nothing
    """
    return next(iter(iterable), default)
# WS data to server: https://github.com/iasoon/pw3-moz2/blob/prototype/src/websocket.rs#L27
# WS data from server: https://github.com/iasoon/pw3-moz2/blob/prototype/src/lobby_manager.rs#L244
# HTTP: https://github.com/iasoon/pw3-moz2/blob/prototype/src/main.rs#L451
class Lobby:
    """
    Client for a single MOZAIC lobby, talking HTTP and WebSocket.

    HTTP requests go to <scheme>://<url_base>/api/lobbies/<lobby_id> and the
    WebSocket connects to <ws scheme>://<url_base>/websocket.

    @param lobby_id: identifier of the lobby
    @param bot_name: display name sent when joining the lobby
    @param token: player token; sent in the join payload, as a Bearer token
                  on authenticated HTTP requests, and over the WebSocket
    @param url_base: domain name and optionally port and path;
                     e.g. example.org:8880/mozaic, mozaic.example.org
    @param tls: use https/wss when True, http/ws when False
    """

    def __init__(self, lobby_id: str, bot_name: str, token: str, url_base: str, tls: bool=True):
        assert not re.match(r"^[^:]+://", url_base), "url_base should not include protocol"
        assert url_base[-1] != "/", "url_base should not end with /"
        self.lobby_id = lobby_id
        self.bot_name = bot_name
        self.token = token
        self.url_base = url_base
        self.tls = tls
        # Set by open_websocket(): the WebSocketApp and the thread running it.
        self.ws = None
        self.thread = None
        # Set by join() from the server's response.
        self.own_player_id = None
        # Filled in from "lobbyState" messages and kept up to date by
        # "playerData" / "proposalData" messages; None until then.
        self.players = None
        self.proposals = None

    @property
    def websocket_url(self):
        """URL of the WebSocket endpoint (ws:// or wss:// depending on `tls`)."""
        return ("wss://" if self.tls else "ws://") + self.url_base + "/websocket"

    @property
    def http_url_base(self):
        """Base URL of this lobby's HTTP API, without a trailing slash."""
        return ("https://" if self.tls else "http://") + self.url_base + f"/api/lobbies/{self.lobby_id}"

    def get_player_by_name(self, name):
        """
        Return the id of the first known player called `name`.

        Returns None when no such player is known, including when no lobby
        state has been received over the WebSocket yet.
        """
        if self.players is None:
            return None
        return first(p["id"] for p in self.players.values() if p["name"] == name)

    def open_websocket(self, on_message, on_error, **kwargs):
        """
        Connect to the lobby WebSocket on a daemon thread.

        On open, subscribes to the lobby and authenticates with the token.
        Every incoming message is JSON-decoded, used to update `players` and
        `proposals`, and then passed on as `on_message(ws, decoded_message)`.

        @param on_message: callback receiving the socket and the decoded message
        @param on_error: error callback, handed to WebSocketApp unchanged
        @param kwargs: extra keyword arguments for websocket.WebSocketApp
        """
        def on_open(ws):
            ws.send(json.dumps({
                "type": "subscribeToLobby",
                "lobbyId": self.lobby_id,
            }))
            ws.send(json.dumps({
                "type": "authenticatePlayer",
                "lobbyId": self.lobby_id,
                "token": self.token,
            }))

        def internal_on_message(ws, msg):
            data = json.loads(msg)
            if data["type"] == "lobbyState":
                # Player ids arrive as JSON object keys (strings); store them
                # as ints so they can be looked up by numeric id.
                self.players = {
                    int(player_id): player
                    for player_id, player in data["data"]["players"].items()
                }
                self.proposals = data["data"]["proposals"]
            elif data["type"] == "playerData":
                self.players[data["data"]["id"]] = data["data"]
            elif data["type"] == "proposalData":
                self.proposals[data["data"]["id"]] = data["data"]
            # Local state is updated first so the callback sees a current view.
            on_message(ws, data)

        self.ws = websocket.WebSocketApp(
            self.websocket_url,
            on_open=on_open, on_message=internal_on_message, on_error=on_error, **kwargs
        )
        # Daemon thread: the socket must not keep the process alive on exit.
        self.thread = threading.Thread(
            target=self.ws.run_forever,
            daemon=True
        )
        self.thread.start()

    def _post(self, path, payload, auth=True):
        """
        POST `payload` as JSON to `path` below the lobby's HTTP base URL.

        @param path: path starting with "/", appended to `http_url_base`
        @param payload: JSON-serialisable body, or None
        @param auth: send the token as a Bearer authorization header
        @return: the requests response
        @raise requests.HTTPError: when the server answers with an error status
        """
        headers = {
            "user-agent": "mozaicreceptionist/0.0.1"
        }
        if auth:
            headers["authorization"] = f"Bearer {self.token}"
        r = requests.post(self.http_url_base + path, headers=headers, json=payload)
        r.raise_for_status()
        return r

    def join(self):
        """Join the lobby as `bot_name` and remember the returned player id."""
        own_player_data = self._post("/join", {
            "name": self.bot_name,
            "token": self.token,
        }, auth=False).json()
        self.own_player_id = own_player_data["id"]

    def create_proposal(self, map_name, max_turns, players):
        """
        Propose a match.

        @param map_name: name of the map to play on
        @param max_turns: turn limit for the match
        @param players: list of player ids taking part
        """
        self._post("/proposals", {
            "config": { "mapName": map_name, "maxTurns": max_turns },
            "players": players
        })

    def accept_proposal(self, proposal_id):
        """Accept the proposal with the given id."""
        self._post(f"/proposals/{proposal_id}/accept", {"status": "Accepted"})

    def start_proposal(self, proposal_id):
        """Start the match for the proposal with the given id."""
        self._post(f"/proposals/{proposal_id}/start", None)

    @staticmethod
    def from_lobby_url(url, bot_name, token) -> "Lobby":
        """
        Build a Lobby from the URL of its lobby page.

        @param url: e.g. https://mozaic.example.org/lobbies/0123456789abcdef0123456789abcdef
        @param bot_name: display name sent when joining the lobby
        @param token: player token
        @raise ValueError: when `url` is not an http(s) .../lobbies/<id> URL
        """
        m = re.fullmatch(r"http(s?)://(.+)/lobbies/([^/]+)", url)
        if not m:
            # Raise rather than return: callers use the result as a Lobby.
            raise ValueError("Unrecognized MOZAIC lobby URL")
        tls = m.group(1) == "s"
        url_base = m.group(2)
        lobby_id = m.group(3)
        return Lobby(lobby_id, bot_name, token, url_base, tls)
def main():
    """
    Command-line entry point.

    Reads <lobby_url> <bot_token> <bot_name> from sys.argv, joins the lobby
    under the name "🚗 <bot_name>" and then, driven by WebSocket messages:

    - accepts pending proposals that are still unanswered by this player and
      whose owner takes part (owner is this player or has accepted);
    - starts pending proposals owned by this player once every participant
      has accepted and has a connected client.

    In the foreground it repeatedly prompts for a map and an opponent and
    creates a proposal. On exit it re-joins under the plain bot name.
    """
    import sys

    if len(sys.argv) < 4 or sys.argv[1] in ("--help", "-h"):
        print("MOZAIC receptionist -- manage MOZAIC matches from the comfort of your terminal",
              file=sys.stderr)
        print(f"Usage: {sys.argv[0]} <lobby_url> <bot_token> <bot_name>", file=sys.stderr)
        return

    lobby_url = sys.argv[1]
    bot_token = sys.argv[2]
    # Everything after the token is the name, so it may contain spaces.
    bot_name = " ".join(sys.argv[3:])

    lobby = Lobby.from_lobby_url(
        lobby_url,
        "🚗 " + bot_name,
        bot_token
    )
    lobby.join()

    def can_answer(proposal):
        """True if the proposal is pending and we have not answered it yet."""
        return proposal["status"] == "pending" and any(
            p["player_id"] == lobby.own_player_id and p["status"] == "Unanswered"
            for p in proposal["players"]
        )

    def owner_participates(proposal):
        """True if the owner is us, or is a participant who has accepted."""
        owner_id = proposal["owner_id"]
        return owner_id == lobby.own_player_id or any(
            (p["player_id"] == owner_id and p["status"] == "Accepted")
            for p in proposal["players"]
        )

    def accept_if_possible_and_owner_participates(proposal):
        """Accept the proposal when both checks above hold."""
        if can_answer(proposal) and owner_participates(proposal):
            proposal_id = proposal["id"]
            owner_name = lobby.players[proposal["owner_id"]]["name"]
            lobby.accept_proposal(proposal_id)
            print(f"[Accepted proposal from {owner_name}]")

    def start_match_if_possible(proposal):
        """Start our own pending proposal once everybody is ready."""
        if lobby.own_player_id == proposal["owner_id"] and proposal["status"] == "pending" and all(
            (
                p["status"] == "Accepted" and
                # Everyone's bot must be connected
                lobby.players[p["player_id"]]["client_connected"]
            ) for p in proposal["players"]
        ):
            lobby.start_proposal(proposal["id"])
            print("[Started match]")

    def on_message(ws, msg):
        """React to a decoded WebSocket message (lobby state is already updated)."""
        if msg["type"] == "proposalData":
            accept_if_possible_and_owner_participates(msg["data"])
            start_match_if_possible(msg["data"])
        if msg["type"] == "lobbyState":
            for proposal in lobby.proposals.values():
                accept_if_possible_and_owner_participates(proposal)
        # Be prepared to start a match as soon as the last bot comes online
        if msg["type"] in ("lobbyState", "playerData"):
            for proposal in lobby.proposals.values():
                start_match_if_possible(proposal)

    lobby.open_websocket(on_message, print)

    try:
        print("Note: auto-accepting and auto-starting in the background.")
        while True:
            print("To create new game, press enter.")
            input()
            # The lobby state arrives asynchronously over the WebSocket; do
            # not index into it before our own player data has been received.
            if lobby.players is None or lobby.own_player_id not in lobby.players:
                print("Lobby state not received yet, please try again.")
                continue
            print(" NEW GAME")
            print()
            map_name = input(" Map> ")
            print(" P1> " + lobby.players[lobby.own_player_id]["name"])
            p2 = lobby.get_player_by_name(input(" P2> "))
            # Compare against None: a player id of 0 is falsy but valid.
            while p2 is None:
                print("User not found")
                p2 = lobby.get_player_by_name(input(" P2> "))
            lobby.create_proposal(map_name, 500, [lobby.own_player_id, p2])
    except (KeyboardInterrupt, EOFError):
        print(" Quitting")
    finally:
        # Remove auto prefix from name
        Lobby.from_lobby_url(lobby_url, bot_name, bot_token).join()
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()