#!/usr/bin/env python3 import sys import os from collections import defaultdict import datetime from time import sleep import json from typing import Dict, Set import mattermost import mattermost.ws SERVER = "mattermost.zeus.gent" TEAM_NAME = "zeus" CHAN_NAME = "pannenkoeken" EMOJI_NAME = "pancakes" DOUBLE_EMOJI_NAME = "own_pancakes" TAGGERS = [ # Board "flynn", "bobby", "pcassima", "redfast00", "francis.", "hannes", "arnhoudt", "mel", # Sneaky backdoor (actually just for testing and if you read this, nobody has removed it) "midgard", ] TOKEN = os.getenv("MM_ACCESS_TOKEN") USER = os.getenv("MM_USERNAME") PASSWORD = os.getenv("MM_PASSWORD") # Set to False to disable reacting with an emoji for the count CONFIRMATION_EMOJI = True since_arg_i = None try: since_arg_i = sys.argv.index("--since") except ValueError: SINCE = datetime.datetime.now(tz=datetime.timezone.utc) print(f"Warning: no start time provided, using now. Use `--since " f"{SINCE.isoformat(timespec='seconds')}` to pin the start time.", file=sys.stderr) if since_arg_i: SINCE = datetime.datetime.fromisoformat(sys.argv[since_arg_i + 1]) clean = "--clean" in sys.argv[1:] live = "--live" in sys.argv[1:] if "--no-confirm" in sys.argv[1:]: CONFIRMATION_EMOJI = False if not clean and sys.stdout.isatty(): print("To use this data, redirect stdout to a file and use table.py on it.", file=sys.stderr) def first(iterable, default=None): for x in iterable: return x return default ################################## # Log in mm = mattermost.MMApi(f"https://{SERVER}/api") if TOKEN: mm.login(bearer=TOKEN) else: assert USER, "USER and PASSWORD or TOKEN envvars must be set" assert PASSWORD, "USER and PASSWORD or TOKEN envvars must be set" mm.login(USER, PASSWORD) # pylint: disable=protected-access # bad library design: can't fix this without extra API request our_user_id = mm._my_user_id ################################## # Get channel team_data = first(filter( lambda team: team["name"] == TEAM_NAME, mm.get_teams() )) assert team_data, "Team should exist" channel_data = first(filter( lambda chan: chan["name"] == CHAN_NAME, mm.get_team_channels(team_data["id"]) )) assert channel_data, "Channel should exist" channel = channel_data["id"] ################################## # Get users # People who are authorized to do verifications tagger_ids = {u["id"]: u["username"] for u in mm.get_users_by_usernames_list(TAGGERS)} users = {u["id"]: u for u in mm.get_users(in_channel=channel)} for user in users.values(): assert user["username"].find(" ") == -1, f"{user['username']} shouldn't have spaces in username" def get_username(userid): # When someone joined later if userid not in users: users[userid] = mm.get_user(userid) username = users[userid]["username"] assert username.find(" ") == -1, f"{username} shouldn't have spaces in username" return username ################################## # Get posts posts = {} def get_post(postid, force_fetch=False): if postid not in posts or force_fetch: posts[postid] = mm.get_post(postid) return posts[postid] def parse_mm_timestamp(mm_timestamp): return datetime.datetime.fromtimestamp(mm_timestamp / 1000, datetime.timezone.utc) def to_mm_timestamp(dt): # pylint: disable=invalid-name return int(dt.timestamp() * 1000) def reaction_qualifies(reaction): if reaction["user_id"] not in tagger_ids: return 0 if reaction["emoji_name"] == EMOJI_NAME: return 1 if reaction["emoji_name"] == DOUBLE_EMOJI_NAME: return 2 return 0 def post_score(awardee_id, post_id, awarder_id): return max(awarded[awardee_id][post_id][awarder_id], default=0) def emit_change_line(post, awardee_id, awarder_id, prev_score, score): if score == prev_score: return awardee = get_username(awardee_id) awarder = get_username(awarder_id) post_time = parse_mm_timestamp(post["create_at"]).isoformat(timespec="microseconds") if score == 0: message = f"{awarder} retracted their verification" elif prev_score == 0: message = f"{awarder} verified with score {score}" else: message = f"{awarder} updated their verification's score from {prev_score} to {score}" print(f"{awardee} {post['id']} at {post_time}: {message}", flush=True) # awarded[awardee][post_id][verifier]: set of values awarded: Dict[str, Dict[str, Dict[str, Set[int]]]] = \ defaultdict(lambda: defaultdict(lambda: defaultdict(set))) def process_change(reaction, action): value = reaction_qualifies(reaction) if value == 0: return post = get_post(reaction["post_id"]) if parse_mm_timestamp(post["create_at"]) < SINCE: return awardee_id = post["user_id"] awarder_id = reaction["user_id"] prev_score = post_score(awardee_id, post["id"], awarder_id) action(awarded[awardee_id][post["id"]][awarder_id], value) score = post_score(awardee_id, post["id"], awarder_id) emit_change_line(post, awardee_id, awarder_id, prev_score, score) update_confirmation(post["id"]) def award_if_appropriate(reaction): process_change( reaction, lambda values_set, value: values_set.add(value) ) def retract_if_appropriate(reaction): process_change( reaction, lambda values_set, value: values_set.discard(value) ) def get_posts_for_channel(mmapi, channel_id, since, **kwargs): after = None while True: data_page = mmapi._get("/v4/channels/"+channel_id+"/posts", params=( { "after": after } if after else { "since": to_mm_timestamp(since) } ), **kwargs) order = list(reversed(data_page["order"])) for post_id in order: yield data_page["posts"][post_id] if not order: return after = order[-1] CONFIRMATION_EMOJI_NAMES = [ *(f"num{i}" for i in range(1, 99 + 1)), "heavy_plus_sign" ] def confirmation_emoji_name(count): if count < 0: return "exclamation" try: return CONFIRMATION_EMOJI_NAMES[count - 1] except IndexError: return CONFIRMATION_EMOJI_NAMES[-1] def persevere(f, e_type=Exception, backoff=1): while True: try: f() return except e_type as exc: print(exc, file=sys.stderr) print(f"Trying again in {backoff} second(s)", file=sys.stderr) sleep(backoff) # awarded[awardee][post_id][verifier]: set of values def count_verifications(user_id): return sum( max( ( max(values, default=0) for values in post_verifications.values() ), default=0 ) for post_verifications in awarded[user_id].values() ) def update_confirmation(post_id): if not CONFIRMATION_EMOJI: return post = get_post(post_id, force_fetch=True) remove_reactions_from_post(post) new_count = count_verifications(post["user_id"]) if new_count > 0: persevere(lambda: mm.create_reaction(our_user_id, post_id, confirmation_emoji_name(new_count))) def remove_reactions_from_post(post): for reaction in post.get("metadata", {}).get("reactions", []): if reaction["user_id"] == our_user_id: # pylint: disable=cell-var-from-loop # persevere doesn't store lambda persevere(lambda: remove_reaction(post["id"], reaction["emoji_name"])) def remove_reaction(post_id, emoji_name): # pylint: disable=protected-access # library recommends this in docs mm._delete(f"/v4/users/me/posts/{post_id}/reactions/{emoji_name}") def handle_backlog(since): for post in get_posts_for_channel(mm, channel, since): remove_reactions_from_post(post) for reaction in post.get("metadata", {}).get("reactions", []): award_if_appropriate(reaction) def handle_live(): def ws_handler(_mmws, event_data): if event_data["broadcast"]["channel_id"] != channel: return if event_data["event"] == "reaction_added": award_if_appropriate(json.loads(event_data["data"]["reaction"])) elif event_data["event"] == "reaction_removed": retract_if_appropriate(json.loads(event_data["data"]["reaction"])) _ = mattermost.ws.MMws(ws_handler, mm, f"wss://{SERVER}/api/v4/websocket") while True: sleep(60 * 1000) if clean: for _post in get_posts_for_channel(mm, channel, SINCE): remove_reactions_from_post(_post) else: # Note: skipping this step and updating an existing file would be dangerous: # you would miss revocations that happened while not listening. handle_backlog(SINCE) if live: print("Now watching for live posts.", file=sys.stderr) handle_live() else: print("Use --live to keep watching new posts.", file=sys.stderr) # Logout if not TOKEN: mm.revoke_user_session()