pancakecounter/read_mattermost.py
2020-12-06 14:28:52 +01:00

289 lines
7.3 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import os
from collections import defaultdict
import datetime
import threading
from time import sleep
import json
from typing import Optional
import mattermost
import mattermost.ws
# Host name of the Mattermost server; used to build both the REST API URL and the websocket URL.
SERVER = "mattermost.zeus.gent"
# Name of the team in which the channel is looked up.
TEAM_NAME = "zeus"
# Name of the channel whose posts are scanned.
CHAN_NAME = "pannenkoeken"
# A reaction only counts as a verification when it uses this emoji.
EMOJI_NAME = "pancakes"
# Usernames whose reactions count as verifications.
TAGGERS = [
# Board
"flynn",
"bobby",
"pcassima",
"redfast00",
"francis.",
"hannes",
"arnhoudt",
"mel",
# Sneaky backdoor (actually just for testing and if you read this, nobody has removed it)
"midgard",
]
# Credentials are read from the environment. The token is used when set,
# otherwise the username/password pair is used to log in.
TOKEN = os.getenv("MM_ACCESS_TOKEN")
USER = os.getenv("MM_USERNAME")
PASSWORD = os.getenv("MM_PASSWORD")
# When truthy, confirmation reactions are placed on verified posts.
# The `--no-confirm` command line flag sets this to None.
CONFIRMATION_EMOJI = True
# --since <ISO 8601 timestamp>: posts created before this moment are ignored.
try:
    since_arg_i = sys.argv.index("--since")
except ValueError:
    # Flag not given: start counting from now and tell the user how to pin it.
    since_arg_i = None
    SINCE = datetime.datetime.now(tz=datetime.timezone.utc)
    print(f"Warning: no start time provided, using now. Use `--since {SINCE.isoformat(timespec='seconds')}` to pin the start time.", file=sys.stderr)
else:
    try:
        since_value = sys.argv[since_arg_i + 1]
    except IndexError:
        sys.exit("--since requires an ISO 8601 timestamp, e.g. --since 2020-12-01T00:00:00+01:00")
    SINCE = datetime.datetime.fromisoformat(since_value)
    if SINCE.tzinfo is None:
        # Post timestamps are parsed as timezone-aware values, and comparing a
        # naive datetime with an aware one raises TypeError. Interpret a naive
        # value as local time, which is also what `datetime.timestamp()` does.
        SINCE = SINCE.astimezone()

# --clean: only remove this bot's own reactions instead of counting.
clean = "--clean" in sys.argv[1:]
# --live: keep listening for reaction events after the backlog is processed.
live = "--live" in sys.argv[1:]
# --no-confirm: do not place confirmation reactions on verified posts.
if "--no-confirm" in sys.argv[1:]:
    CONFIRMATION_EMOJI = None

if not clean and sys.stdout.isatty():
    print("To use this data, redirect stdout to a file and use table.py on it.", file=sys.stderr)
def first(iterable, default=None):
    """Return the first item produced by *iterable*, or *default* when it produces none."""
    return next(iter(iterable), default)
##################################
# Log in
mm = mattermost.MMApi(f"https://{SERVER}/api")
if TOKEN:
    # An access token takes precedence over username/password.
    mm.login(bearer=TOKEN)
else:
    # Fail with a readable message instead of a bare assertion; asserts are
    # also stripped under `python -O`, which would pass None to login().
    if not USER or not PASSWORD:
        raise SystemExit(
            "No credentials: set MM_ACCESS_TOKEN, or both MM_USERNAME and MM_PASSWORD."
        )
    mm.login(USER, PASSWORD)
# Id of the account we are logged in as; used to recognise our own reactions.
our_user_id = mm._my_user_id
##################################
# Get channel
# Find the team by name, then the channel by name within that team.
team_data = first(team for team in mm.get_teams() if team["name"] == TEAM_NAME)
assert team_data, "Team should exist"
channel_data = first(
    chan
    for chan in mm.get_team_channels(team_data["id"])
    if chan["name"] == CHAN_NAME
)
assert channel_data, "Channel should exist"
# Only the channel id is used by the rest of the script.
channel = channel_data["id"]
##################################
# Get users
# People who are authorized to do verifications (user id -> username)
tagger_ids = {
    u["id"]: u["username"]
    for u in mm.get_users_by_usernames_list(TAGGERS)
}
# Users currently in the channel, keyed by user id
users = {
    u["id"]: u
    for u in mm.get_users(in_channel=channel)
}
# The output lines are space separated, so a username may not contain a space.
for user in users.values():
    assert " " not in user["username"], f"{user['username']} shouldn't have spaces in username"
def get_username(userid):
    """Return the username for *userid*, fetching and caching the user if unknown.

    Raises AssertionError when the username contains a space.
    """
    try:
        record = users[userid]
    except KeyError:
        # When someone joined later
        record = users[userid] = mm.get_user(userid)
    username = record["username"]
    assert " " not in username, f"{username} shouldn't have spaces in username"
    return username
##################################
# Get posts
# Cache of fetched posts, keyed by post id
posts = {}


def get_post(postid, force_fetch=False):
    """Return the post with id *postid*, using the cache unless told otherwise.

    The post is requested from the server when it is not cached yet or when
    *force_fetch* is truthy; the fetched post replaces the cache entry.
    """
    needs_fetch = force_fetch or postid not in posts
    if needs_fetch:
        posts[postid] = mm.get_post(postid)
    return posts[postid]
def parse_mm_timestamp(mm_timestamp):
    """Convert a millisecond Unix timestamp into a timezone-aware UTC datetime."""
    seconds = mm_timestamp / 1000
    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
def to_mm_timestamp(dt: datetime.datetime) -> int:
    """Convert a datetime into a millisecond Unix timestamp.

    Fractions of a millisecond are dropped by the integer conversion.
    """
    milliseconds = dt.timestamp() * 1000
    return int(milliseconds)
def reaction_qualifies(reaction):
    """Tell whether *reaction* uses the verification emoji and was placed by a tagger."""
    if reaction["emoji_name"] != EMOJI_NAME:
        return False
    return reaction["user_id"] in tagger_ids
awarded = {}  # awarded[awardee][post_id]: set of verifiers


def award_if_appropriate(reaction):
    """Record a verification for the post that *reaction* was placed on.

    Nothing happens when the reaction does not qualify, when the post was
    created before SINCE, or when this verifier was already recorded for the
    post. Otherwise the verification is stored in `awarded`, one line is
    printed to stdout and the confirmation reaction on the post is refreshed.
    """
    if not reaction_qualifies(reaction):
        return

    post = get_post(reaction["post_id"])
    if parse_mm_timestamp(post["create_at"]) < SINCE:
        return

    awardee_id = post["user_id"]
    awarder_id = reaction["user_id"]

    verifiers = awarded.setdefault(awardee_id, {}).setdefault(post["id"], set())
    if awarder_id in verifiers:
        # We already knew that this user verified this post
        return
    verifiers.add(awarder_id)

    reaction_time = parse_mm_timestamp(reaction["create_at"]).isoformat(timespec="microseconds")
    # The post time must come from the post itself; it used to be derived from
    # the reaction, which made both printed timestamps identical.
    post_time = parse_mm_timestamp(post["create_at"]).isoformat(timespec="microseconds")
    awardee = get_username(awardee_id)
    awarder = get_username(awarder_id)
    print(f"{awardee} {post['id']} at {post_time} verified by {awarder} at {reaction_time}", flush=True)

    update_confirmation(post["id"])
def retract_if_appropriate(reaction):
    """Undo the verification that the removed *reaction* stood for.

    Nothing happens when the reaction does not qualify, when the post was
    created before SINCE, or when no verification is recorded for the post.
    Otherwise the verifier is dropped from `awarded` (empty entries are
    deleted), one line is printed to stdout and the confirmation reaction on
    the post is refreshed.
    """
    if not reaction_qualifies(reaction):
        return

    post = get_post(reaction["post_id"])
    if parse_mm_timestamp(post["create_at"]) < SINCE:
        return

    awardee_id = post["user_id"]
    awarder_id = reaction["user_id"]

    verifiers = awarded.get(awardee_id, {}).get(post["id"])
    if verifiers is None:
        # Nothing was recorded for this post, so there is nothing to retract.
        # Indexing `awarded` directly would raise KeyError here.
        return

    verifiers.discard(awarder_id)
    if not verifiers:
        del awarded[awardee_id][post["id"]]
        if not awarded[awardee_id]:
            del awarded[awardee_id]

    awardee = get_username(awardee_id)
    awarder = get_username(awarder_id)
    print(f"{awardee} {post['id']} verification removed by {awarder}", flush=True)

    update_confirmation(post["id"])
def get_posts_for_channel(mmapi, channel_id, since, **kwargs):
    """Yield the posts of a channel, page by page.

    The first request asks for posts by `since` timestamp; every following
    request asks for the posts `after` the last post id of the previous page.
    Iteration ends at the first page with an empty `order` list. Extra keyword
    arguments are passed on to `mmapi._get`.
    """
    endpoint = "/v4/channels/" + channel_id + "/posts"
    last_post_id = None
    while True:
        if last_post_id:
            query = {"after": last_post_id}
        else:
            query = {"since": to_mm_timestamp(since)}
        page = mmapi._get(endpoint, params=query, **kwargs)

        # Posts are yielded in the reverse of the page's `order` list
        # (presumably the API lists newest first; TODO confirm).
        post_ids = page["order"][::-1]
        yield from (page["posts"][post_id] for post_id in post_ids)

        if not post_ids:
            return
        last_post_id = post_ids[-1]
# Emoji names for the counts 1 through 10; the last entry stands for every higher count.
CONFIRMATION_EMOJI_NAMES = "one,two,three,four,five,six,seven,eight,nine,keycap_ten,asterisk".split(",")


def confirmation_emoji_name(count):
    """Return the name of the emoji that represents *count*.

    Counts 1 to 10 map to their own emoji and anything higher maps to the last
    entry of CONFIRMATION_EMOJI_NAMES. Zero and negative counts have no emoji
    of their own and yield "exclamation". Zero used to wrap around to index -1
    and return the "more than ten" marker.
    """
    if count <= 0:
        return "exclamation"
    # Clamp to the last entry instead of relying on IndexError.
    index = min(count, len(CONFIRMATION_EMOJI_NAMES)) - 1
    return CONFIRMATION_EMOJI_NAMES[index]
def persevere(f, backoff=1):
    """Call *f* until it completes without raising an Exception.

    Every failure is reported on stderr, followed by a pause of *backoff*
    seconds before the next attempt. The pause does not grow between attempts
    and the number of attempts is unlimited. The return value of *f* is
    discarded.
    """
    succeeded = False
    while not succeeded:
        try:
            f()
        except Exception as error:
            print(error, file=sys.stderr)
            print(f"Trying again in {backoff} second(s)", file=sys.stderr)
            sleep(backoff)
        else:
            succeeded = True
def update_confirmation(post_id):
    """Replace our own reactions on a post with the current count emoji.

    Does nothing when CONFIRMATION_EMOJI is falsy. Otherwise the post is
    fetched again, our reactions on it are removed, and if the author of the
    post has at least one entry in `awarded` a reaction for that number of
    entries is added.
    """
    if CONFIRMATION_EMOJI:
        # Fetch again so that the list of reactions is current.
        fresh_post = get_post(post_id, force_fetch=True)
        remove_reactions_from_post(fresh_post)

        # NOTE(review): this counts all verified posts of the author, not the
        # verifiers of this post, so a post whose verification was retracted
        # still gets the author's remaining count. Confirm this is intended.
        new_count = len(awarded.get(fresh_post["user_id"], []))
        if new_count > 0:
            def place_count_reaction():
                return mm.create_reaction(our_user_id, post_id, confirmation_emoji_name(new_count))

            persevere(place_count_reaction)
def remove_reactions_from_post(post):
    """Remove every reaction on *post* that was placed by our own account.

    The reactions are read from the post's `metadata`; a post without
    metadata or without reactions is left alone.
    """
    metadata = post.get("metadata", {})
    for reaction in metadata.get("reactions", []):
        if reaction["user_id"] != our_user_id:
            continue

        def delete_own_reaction(reaction=reaction):
            remove_reaction(post["id"], reaction["emoji_name"])

        persevere(delete_own_reaction)
def remove_reaction(post_id, emoji_name):
    """Delete the logged-in user's *emoji_name* reaction from the post *post_id*."""
    endpoint = f"/v4/users/me/posts/{post_id}/reactions/{emoji_name}"
    mm._delete(endpoint)
def handle_backlog(since):
    """Process every existing post of the channel, starting from *since*.

    For each post our own reactions are removed first, after which every
    reaction listed in the post's metadata is offered for awarding.
    """
    for post in get_posts_for_channel(mm, channel, since):
        remove_reactions_from_post(post)
        existing_reactions = post.get("metadata", {}).get("reactions", [])
        for reaction in existing_reactions:
            award_if_appropriate(reaction)
def handle_live():
    """Listen on the websocket for reaction events in the channel; never returns."""

    def ws_handler(mmws, event_data):
        # Events from other channels are of no interest.
        if event_data["broadcast"]["channel_id"] != channel:
            return

        event = event_data["event"]
        if event not in ("reaction_added", "reaction_removed"):
            return

        # The reaction arrives as a JSON string inside the event data.
        reaction = json.loads(event_data["data"]["reaction"])
        if event == "reaction_added":
            award_if_appropriate(reaction)
        else:
            retract_if_appropriate(reaction)

    # Presumably the client delivers events to ws_handler on its own thread
    # while this thread idles below. TODO confirm against mattermost.ws.
    ws = mattermost.ws.MMws(ws_handler, mm, f"wss://{SERVER}/api/v4/websocket")

    # Idle forever; each sleep lasts 60 * 1000 seconds.
    while True:
        sleep(60 * 1000)
# With --clean, only remove our own reactions from every post since SINCE.
if clean:
for post in get_posts_for_channel(mm, channel, SINCE):
remove_reactions_from_post(post)
else:
# Note: skipping this step and updating an existing file would be dangerous: you would miss revocations that happened while not listening.
handle_backlog(SINCE)
# With --live, keep processing reaction events as they arrive (handle_live loops forever);
# without it, only point the user at the flag.
if live:
print("Now watching for live posts.", file=sys.stderr)
handle_live()
else:
print("Use --live to keep watching new posts.", file=sys.stderr)
# Logout
# The session is only revoked when we logged in with username and password, not with a token.
if not TOKEN:
mm.revoke_user_session()