pancakecounter/read_mattermost.py
2020-12-09 14:47:10 +01:00

315 lines
7.8 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import os
from collections import defaultdict
import datetime
import threading
from time import sleep
import json
from typing import Optional, Mapping, Set
import mattermost
import mattermost.ws
SERVER = "mattermost.zeus.gent"
TEAM_NAME = "zeus"
CHAN_NAME = "pannenkoeken"
EMOJI_NAME = "pancakes"
DOUBLE_EMOJI_NAME = "own_pancakes"
TAGGERS = [
# Board
"flynn",
"bobby",
"pcassima",
"redfast00",
"francis.",
"hannes",
"arnhoudt",
"mel",
# Sneaky backdoor (actually just for testing and if you read this, nobody has removed it)
"midgard",
]
TOKEN = os.getenv("MM_ACCESS_TOKEN")
USER = os.getenv("MM_USERNAME")
PASSWORD = os.getenv("MM_PASSWORD")
# Set to False to disable reacting with an emoji for the count
CONFIRMATION_EMOJI = True
try:
since_arg_i = sys.argv.index("--since")
except ValueError:
SINCE = datetime.datetime.now(tz=datetime.timezone.utc)
print(f"Warning: no start time provided, using now. Use `--since {SINCE.isoformat(timespec='seconds')}` to pin the start time.", file=sys.stderr)
if since_arg_i:
SINCE = datetime.datetime.fromisoformat(sys.argv[since_arg_i + 1])
clean = "--clean" in sys.argv[1:]
live = "--live" in sys.argv[1:]
if "--no-confirm" in sys.argv[1:]:
CONFIRMATION_EMOJI = False
if not clean and sys.stdout.isatty():
print("To use this data, redirect stdout to a file and use table.py on it.", file=sys.stderr)
def first(iterable, default=None):
for x in iterable:
return x
return default
##################################
# Log in
mm = mattermost.MMApi(f"https://{SERVER}/api")
if TOKEN:
mm.login(bearer=TOKEN)
else:
assert USER
assert PASSWORD
mm.login(USER, PASSWORD)
our_user_id = mm._my_user_id
##################################
# Get channel
team_data = first(filter(lambda team: team["name"] == TEAM_NAME, mm.get_teams()))
assert team_data, "Team should exist"
channel_data = first(filter(lambda chan: chan["name"] == CHAN_NAME, mm.get_team_channels(team_data["id"])))
assert channel_data, "Channel should exist"
channel = channel_data["id"]
##################################
# Get users
# People who are authorized to do verifications
tagger_ids = {u["id"]: u["username"] for u in mm.get_users_by_usernames_list(TAGGERS)}
users = {u["id"]: u for u in mm.get_users(in_channel=channel)}
for user in users.values():
assert user["username"].find(" ") == -1, f"{user['username']} shouldn't have spaces in username"
def get_username(userid):
# When someone joined later
if userid not in users:
users[userid] = mm.get_user(userid)
username = users[userid]["username"]
assert username.find(" ") == -1, f"{username} shouldn't have spaces in username"
return username
##################################
# Get posts
posts = {}
def get_post(postid, force_fetch=False):
if postid not in posts or force_fetch:
posts[postid] = mm.get_post(postid)
return posts[postid]
def parse_mm_timestamp(mm_timestamp):
return datetime.datetime.fromtimestamp(mm_timestamp / 1000, datetime.timezone.utc)
def to_mm_timestamp(dt):
return int(dt.timestamp() * 1000)
def reaction_qualifies(reaction):
if reaction["user_id"] not in tagger_ids:
return 0
if reaction["emoji_name"] == EMOJI_NAME:
return 1
if reaction["emoji_name"] == DOUBLE_EMOJI_NAME:
return 2
return 0
def post_score(awardee_id, post_id, awarder_id):
return max(awarded[awardee_id][post_id][awarder_id], default=0)
def emit_change_line(post, awardee_id, awarder_id, prev_score, score):
if score == prev_score:
return
awardee = get_username(awardee_id)
awarder = get_username(awarder_id)
post_time = parse_mm_timestamp(post["create_at"]).isoformat(timespec="microseconds")
if score == 0:
message = f"{awarder} retracted their verification"
elif prev_score == 0:
message = f"{awarder} verified with score {score}"
else:
message = f"{awarder} updated their verification's score from {prev_score} to {score}"
print(f"{awardee} {post['id']} at {post_time}: {message}", flush=True)
# awarded[awardee][post_id][verifier]: set of values
awarded: Mapping[str, Mapping[str, Mapping[str, Set[int]]]] = \
defaultdict(lambda: defaultdict(lambda: defaultdict(set)))
def process_change(reaction, action):
value = reaction_qualifies(reaction)
if value == 0:
return
post = get_post(reaction["post_id"])
if parse_mm_timestamp(post["create_at"]) < SINCE:
return
awardee_id = post["user_id"]
awarder_id = reaction["user_id"]
prev_score = post_score(awardee_id, post["id"], awarder_id)
action(awarded[awardee_id][post["id"]][awarder_id], value)
score = post_score(awardee_id, post["id"], awarder_id)
emit_change_line(post, awardee_id, awarder_id, prev_score, score)
update_confirmation(post["id"])
def award_if_appropriate(reaction):
process_change(
reaction,
lambda values_set, value: values_set.add(value)
)
def retract_if_appropriate(reaction):
process_change(
reaction,
lambda values_set, value: values_set.discard(value)
)
def get_posts_for_channel(mmapi, channel_id, since, **kwargs):
after = None
while True:
data_page = mmapi._get("/v4/channels/"+channel_id+"/posts", params=(
{ "after": after }
if after else
{ "since": to_mm_timestamp(since) }
), **kwargs)
order = list(reversed(data_page["order"]))
for post_id in order:
yield data_page["posts"][post_id]
if not order:
return
after = order[-1]
CONFIRMATION_EMOJI_NAMES = "one,two,three,four,five,six,seven,eight,nine,keycap_ten,asterisk".split(",")
def confirmation_emoji_name(count):
if count < 0:
return "exclamation"
try:
return CONFIRMATION_EMOJI_NAMES[count - 1]
except IndexError:
return CONFIRMATION_EMOJI_NAMES[-1]
def persevere(f, backoff=1):
while True:
try:
f()
return
except Exception as e:
print(e, file=sys.stderr)
print(f"Trying again in {backoff} second(s)", file=sys.stderr)
sleep(backoff)
# awarded[awardee][post_id][verifier]: set of values
def count_verifications(user_id):
return sum(
max(
(
max(values, default=0) for values in post_verifications.values()
),
default=0
)
for post_verifications in awarded[user_id].values()
)
def update_confirmation(post_id):
if not CONFIRMATION_EMOJI:
return
post = get_post(post_id, force_fetch=True)
remove_reactions_from_post(post)
new_count = count_verifications(post["user_id"])
if new_count > 0:
persevere(lambda: mm.create_reaction(our_user_id, post_id, confirmation_emoji_name(new_count)))
def remove_reactions_from_post(post):
for reaction in post.get("metadata", {}).get("reactions", []):
if reaction["user_id"] == our_user_id:
persevere(lambda: remove_reaction(post["id"], reaction["emoji_name"]))
def remove_reaction(post_id, emoji_name):
mm._delete(f"/v4/users/me/posts/{post_id}/reactions/{emoji_name}")
def handle_backlog(since):
for post in get_posts_for_channel(mm, channel, since):
remove_reactions_from_post(post)
for reaction in post.get("metadata", {}).get("reactions", []):
award_if_appropriate(reaction)
def handle_live():
def ws_handler(mmws, event_data):
if event_data["broadcast"]["channel_id"] != channel:
return
if event_data["event"] == "reaction_added":
award_if_appropriate(json.loads(event_data["data"]["reaction"]))
elif event_data["event"] == "reaction_removed":
retract_if_appropriate(json.loads(event_data["data"]["reaction"]))
ws = mattermost.ws.MMws(ws_handler, mm, f"wss://{SERVER}/api/v4/websocket")
while True:
sleep(60 * 1000)
if clean:
for _post in get_posts_for_channel(mm, channel, SINCE):
remove_reactions_from_post(_post)
else:
# Note: skipping this step and updating an existing file would be dangerous: you would miss revocations that happened while not listening.
handle_backlog(SINCE)
if live:
print("Now watching for live posts.", file=sys.stderr)
handle_live()
else:
print("Use --live to keep watching new posts.", file=sys.stderr)
# Logout
if not TOKEN:
mm.revoke_user_session()