#!/usr/bin/env python3 import sys import os from collections import defaultdict import datetime import threading from time import sleep import json import mattermost import mattermost.ws SERVER = "mattermost.zeus.gent" TEAM_NAME = "zeus" CHAN_NAME = "pannenkoeken" EMOJI_NAME = "pancakes" TAGGERS = [ # Board "flynn", "bobby", "pcassima", "redfast00", "francis.", "hannes", "arnhoudt", "mel", # Sneaky backdoor (actually just for testing and if you read this, nobody has removed it) "midgard", ] TOKEN = os.getenv("MM_ACCESS_TOKEN") USER = os.getenv("MM_USERNAME") PASSWORD = os.getenv("MM_PASSWORD") try: since_arg_i = sys.argv.index("--since") except ValueError: SINCE = datetime.datetime.now(tz=datetime.timezone.utc) print(f"Warning: no start time provided, using now. Use `--since {SINCE.isoformat(timespec='seconds')}` to pin the start time.", file=sys.stderr) if since_arg_i: SINCE = datetime.datetime.fromisoformat(sys.argv[since_arg_i + 1]) if sys.stdout.isatty(): print("To use this data, redirect stdout to a file and use table.py on it.", file=sys.stderr) def first(iterable, default=None): for x in iterable: return x return default ################################## # Log in mm = mattermost.MMApi(f"https://{SERVER}/api") if TOKEN: mm.login(bearer=TOKEN) else: assert USER assert PASSWORD mm.login(USER, PASSWORD) ################################## # Get channel team_data = first(filter(lambda team: team["name"] == TEAM_NAME, mm.get_teams())) assert team_data, "Team should exist" channel_data = first(filter(lambda chan: chan["name"] == CHAN_NAME, mm.get_team_channels(team_data["id"]))) assert channel_data, "Channel should exist" channel = channel_data["id"] ################################## # Get users # People who are authorized to grant pancakes tagger_ids = {u["id"]: u["username"] for u in mm.get_users_by_usernames_list(TAGGERS)} users = {u["id"]: u for u in mm.get_users(in_channel=channel)} for user in users.values(): assert user["username"].find(" ") == -1, f"{user['username']} shouldn't have spaces in username" def get_username(userid): # When someone joined later if userid not in users: users[userid] = mm.get_user(userid) return users[userid]["username"] ################################## # Get posts posts = {} def get_post(postid): if postid not in posts: posts[postid] = mm.get_post(postid) return posts[postid] def parse_mm_timestamp(mm_timestamp): return datetime.datetime.fromtimestamp(mm_timestamp / 1000, datetime.timezone.utc) def to_mm_timestamp(dt): return int(dt.timestamp() * 1000) def reaction_qualifies(reaction): return reaction["emoji_name"] == EMOJI_NAME and reaction["user_id"] in tagger_ids awarded = defaultdict(set) def award_if_appropriate(reaction): if not reaction_qualifies(reaction): return post = get_post(reaction["post_id"]) if parse_mm_timestamp(post["create_at"]) < SINCE: return awarder_id = reaction["user_id"] if awarder_id in awarded[post["id"]]: return awarded[post["id"]].add(awarder_id) reaction_time = parse_mm_timestamp(reaction["create_at"]).isoformat(timespec="microseconds") post_time = parse_mm_timestamp(reaction["create_at"]).isoformat(timespec="microseconds") awardee = get_username(post["user_id"]) awarder = get_username(awarder_id) print(f"{awardee} {post['id']} at {post_time} verified by {awarder} at {reaction_time}", flush=True) def retract_if_appropriate(reaction): if not reaction_qualifies(reaction): return post = get_post(reaction["post_id"]) if parse_mm_timestamp(post["create_at"]) < SINCE: return awarder_id = reaction["user_id"] awarded[post["id"]].discard(awarder_id) awardee = get_username(post["user_id"]) awarder = get_username(awarder_id) print(f"{awardee} {post['id']} verification removed by {awarder}", flush=True) def get_posts_for_channel(mmapi, channel_id, since, **kwargs): after = None while True: data_page = mmapi._get("/v4/channels/"+channel_id+"/posts", params=( { "after": after } if after else { "since": to_mm_timestamp(since) } ), **kwargs) order = list(reversed(data_page["order"])) for post_id in order: yield data_page["posts"][post_id] if not order: return after = order[-1] def handle_backlog(since): for post in get_posts_for_channel(mm, channel, since): for reaction in post.get("metadata", {}).get("reactions", []): award_if_appropriate(reaction) def handle_live(): def ws_handler(mmws, event_data): if event_data["broadcast"]["channel_id"] != channel: return if event_data["event"] == "reaction_added": award_if_appropriate(json.loads(event_data["data"]["reaction"])) elif event_data["event"] == "reaction_removed": retract_if_appropriate(json.loads(event_data["data"]["reaction"])) ws = mattermost.ws.MMws(ws_handler, mm, f"wss://{SERVER}/api/v4/websocket") while True: sleep(60 * 1000) live = "--live" in sys.argv[1:] # Note: skipping this step and updating an existing file would be dangerous: you would miss revocations that happened while not listening. handle_backlog(SINCE) if live: print("Now watching for live posts.", file=sys.stderr) handle_live() else: print("Use --live to keep watching new posts.", file=sys.stderr) # Logout if not TOKEN: mm.revoke_user_session()