#!/usr/bin/env python3
|
||
|
|
||
|
import re
|
||
|
import json
|
||
|
import threading
|
||
|
import requests
|
||
|
import websocket
|
||
|
import time
|
||
|
|
||
|
|
||
|
# WS data to server: https://github.com/iasoon/pw3-moz2/blob/prototype/src/websocket.rs#L27
|
||
|
# WS data from server: https://github.com/iasoon/pw3-moz2/blob/prototype/src/lobby_manager.rs#L244
|
||
|
# HTTP: https://github.com/iasoon/pw3-moz2/blob/prototype/src/main.rs#L451
|
||
|
|
||
|
class Lobby:
    """
    Client for one MOZAIC lobby, talking to the server over HTTP and a websocket.

    @param lobby_id: identifier of the lobby; used in the HTTP API path and in
                     the websocket subscribe/authenticate messages
    @param bot_name: player name sent to the server when joining the lobby
    @param token: player token; sent in the join payload, as HTTP bearer token
                  and in the websocket authentication message
    @param url_base: domain name and optionally port and path;
                     e.g. example.org:8880/mozaic, mozaic.example.org
    @param tls: use https:// and wss:// when true, http:// and ws:// otherwise
    """
    def __init__(self, lobby_id: str, bot_name: str, token: str, url_base: str, tls: bool=True):
        # The scheme is derived from `tls`, and paths are appended with a
        # leading "/", so url_base must carry neither.
        assert not re.match(r"^[^:]+://", url_base), "url_base should not include protocol"
        assert url_base[-1] != "/", "url_base should not end with /"

        self.lobby_id = lobby_id
        self.bot_name = bot_name
        self.token = token
        self.url_base = url_base
        self.tls = tls

        # Both are set by open_websocket().
        self.ws = None
        self.thread = None

        # Set by join_lobby().
        self.own_player_id = None
        # Set whenever a "lobbyState" websocket message arrives.
        self.players = None

    @property
    def websocket_url(self):
        """URL of the websocket endpoint below url_base."""
        scheme = "wss://" if self.tls else "ws://"
        return scheme + self.url_base + "/websocket"

    @property
    def http_url_base(self):
        """Base URL of the HTTP API of this lobby (no trailing slash)."""
        scheme = "https://" if self.tls else "http://"
        return scheme + self.url_base + f"/api/lobbies/{self.lobby_id}"

    def open_websocket(self, on_message, on_error, **kwargs):
        """
        Open the lobby websocket and run it on a new thread.

        @param on_message: called as on_message(ws, data) for every message,
                           where data is the message decoded from JSON
        @param on_error: handed to websocket.WebSocketApp unchanged
        @param kwargs: extra keyword arguments for websocket.WebSocketApp
        """

        def on_open(ws):
            # Subscribe to the lobby first, then authenticate as this player.
            ws.send(json.dumps({
                "type": "subscribeToLobby",
                "lobbyId": self.lobby_id,
            }))
            ws.send(json.dumps({
                "type": "authenticatePlayer",
                "lobbyId": self.lobby_id,
                "token": self.token,
            }))

        def internal_on_message(ws, msg):
            data = json.loads(msg)

            if data["type"] == "lobbyState":
                # Player ids arrive as JSON object keys (strings); store them
                # as ints before the caller's handler sees the message.
                self.players = {
                    int(player_id): player
                    for player_id, player in data["data"]["players"].items()
                }

            on_message(ws, data)

        self.ws = websocket.WebSocketApp(
            self.websocket_url,
            on_open=on_open, on_message=internal_on_message, on_error=on_error, **kwargs
        )

        self.thread = threading.Thread(
            target=self.ws.run_forever
        )
        self.thread.start()

    def _post(self, path, payload, auth=True):
        """
        POST a JSON payload to a path below http_url_base.

        @param path: path appended to http_url_base, starting with "/"
        @param payload: object sent as the JSON request body
        @param auth: when true, send the token as a bearer authorization header
        @return: the response object; raise_for_status() has already been
                 called on it, so error statuses raise instead of returning
        """
        headers = {
            "user-agent": "mozaicreceptionist/0.0.1"
        }
        if auth:
            headers["authorization"] = f"Bearer {self.token}"

        r = requests.post(self.http_url_base + path, headers=headers, json=payload)
        r.raise_for_status()
        return r

    def join_lobby(self):
        """Join the lobby under bot_name and remember the player id from the response."""
        # The join request carries the token in the payload, not in a header.
        own_player_data = self._post("/join", {
            "name": self.bot_name,
            "token": self.token,
        }, auth=False).json()

        self.own_player_id = own_player_data["id"]

    def accept_proposal(self, proposal_id):
        """Post an "Accepted" status for the proposal with the given id."""
        self._post(f"/proposals/{proposal_id}/accept", {"status": "Accepted"})

    @staticmethod
    def from_lobby_url(url, bot_name, token) -> "Lobby":
        """
        Build a Lobby from the URL of a lobby page.

        @param url: e.g. https://mozaic.example.org/lobbies/0123456789abcdef0123456789abcdef
        @param bot_name: forwarded to the constructor
        @param token: forwarded to the constructor
        @raise ValueError: if url is not of the form http(s)://<base>/lobbies/<id>
        """
        m = re.fullmatch(r"http(s?)://(.+)/lobbies/([^/]+)", url)
        if not m:
            # Raise rather than return: a returned exception instance would
            # only fail later, at the first attribute access by the caller.
            raise ValueError("Unrecognized MOZAIC lobby URL")

        tls = m.group(1) == "s"
        url_base = m.group(2)
        lobby_id = m.group(3)

        return Lobby(lobby_id, bot_name, token, url_base, tls)
|
||
|
|
||
|
def on_error(ws, error):
    """Websocket error callback: pretty-print the error; the ws argument is unused."""
    import pprint

    pprint.pprint(error)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Usage: <script> LOBBY_URL BOT_TOKEN BOT_NAME...
    # Joins the lobby and accepts every proposal the bot can still answer,
    # provided the proposal's owner takes part in it.
    import sys

    lobby_url = sys.argv[1]
    bot_token = sys.argv[2]
    # All remaining arguments form the bot name, joined with spaces.
    bot_name = " ".join(sys.argv[3:])

    lobby = Lobby.from_lobby_url(lobby_url, bot_name, bot_token)
    lobby.join_lobby()

    def can_answer(proposal):
        """True if the proposal is pending and this bot has not answered it yet."""
        if proposal["status"] != "pending":
            return False
        for participant in proposal["players"]:
            if participant["player_id"] == lobby.own_player_id and participant["status"] == "Unanswered":
                return True
        return False

    def owner_participates(proposal):
        """True if the owner is this bot, or is listed as a player with status "Accepted"."""
        owner_id = proposal["owner_id"]
        if owner_id == lobby.own_player_id:
            return True
        for participant in proposal["players"]:
            if participant["player_id"] == owner_id and participant["status"] == "Accepted":
                return True
        return False

    def accept_if_possible_and_owner_participates(proposal):
        """Accept the proposal when can_answer() and owner_participates() both hold."""
        if not (can_answer(proposal) and owner_participates(proposal)):
            return
        proposal_id = proposal["id"]
        owner = lobby.players[proposal["owner_id"]]
        owner_name = owner["name"]
        print(f"Accepting proposal from {owner_name}")
        lobby.accept_proposal(proposal_id)

    def on_message(ws, msg):
        """Handle a decoded websocket message: one proposal, or a full lobby state."""
        kind = msg["type"]
        if kind == "proposalData":
            accept_if_possible_and_owner_participates(msg["data"])
            return
        if kind == "lobbyState":
            # A lobby state carries all proposals; check each of them.
            for proposal in msg["data"]["proposals"].values():
                accept_if_possible_and_owner_participates(proposal)

    lobby.open_websocket(on_message, on_error)
|