Refactor to separate parse_numberdealers

This commit is contained in:
Midgard 2024-03-11 14:47:46 +01:00
parent c3459defed
commit a6df150b6f
Signed by: midgard
GPG key ID: 511C112F1331BBB4
3 changed files with 278 additions and 112 deletions

View file

@ -1,122 +1,54 @@
#!/usr/bin/env python3
import sys
import re
import json
import parse_numberdealers
try:
USERS = __import__("users").users()
except ImportError:
USERS = None
def link(link_text: str, message_obj: parse_numberdealers.Message):
    """Return a Markdown link labelled *link_text* that points at *message_obj*.

    The target is ``parse_numberdealers.URL_PREFIX`` followed by the post id.
    """
    url = f"{parse_numberdealers.URL_PREFIX}{message_obj.id}"
    return f"[{link_text}]({url})"
NUMBER_EMOJI = {
"zero": "0",
"one": "1",
"two": "2",
"three": "3",
"four": "4",
"five": "5",
"six": "6",
"seven": "7",
"eight": "8",
"nine": "9",
}
URL_PREFIX = "https://mattermost.zeus.gent/zeus/pl/"
second_last_number = None
second_last_line = {"id": None}
last_number = None
last_line = {"id": None}
messages = []
start_number = None
for line in sys.stdin:
line = json.loads(line)
# Ignore non-message posts (e.g. join/leave)
if line.get("type") is not None:
continue
if "user_id" in line and USERS is not None:
line["username"] = USERS.get(line["user_id"])
if "username" in line:
line["mention"] = f" (@{line['username']})"
def mention(message: parse_numberdealers.Message):
    """Return a `` (@username)`` suffix for the post's author.

    Yields the empty string when the post carries no username.
    """
    username = message.username
    return "" if username is None else f" (@{username})"
if "message" in line and line["message"] != "":
message = line["message"]
message = re.sub(r"^[#>]* ?|[*_`]*", "", message)
for emoji, numb in NUMBER_EMOJI.items():
message = re.sub(f" *:{emoji}: *", numb, message)
message = re.sub(" ?:(?:green)?num([0-9]+): ?", lambda m: m.group(1), message)
message = message.replace("\ufe0f", "").replace("\u20e3", "").replace("\u200b", "")
message = message.strip()
def str_from_error(err):
    """Render one ``parse_numberdealers`` error as a Markdown bullet line.

    Args:
        err: an instance of ``parse_numberdealers.NumberdealersError`` or one
            of its subclasses.

    Returns:
        A string starting with ``"- "`` that links to the offending post and
        ends with the author mention produced by ``mention``.
    """
    post = err.message
    # File-only or empty posts have no text; fall back to the attachment name
    # so the link label is never empty or the literal "None".
    post_text = post.message or post.first_filename or "post"

    if isinstance(err, parse_numberdealers.UnrecognizedNumber):
        if post.message:
            msg = f"- Unrecognized post {link(post.message, post)}"
        else:
            # Nothing to quote: make the description itself the link label.
            msg = f"- {link('Unrecognized post', post)}"
    elif isinstance(err, parse_numberdealers.EditedMessage):
        msg = f"- Edited post {link(post_text, post)}"
    elif isinstance(err, parse_numberdealers.NonNumberMessage):
        msg = f"- Non-number message {link(post_text, post)}"
    elif isinstance(err, parse_numberdealers.ShouldHaveBeen):
        msg = f"- {link(post.recognized_number, post)} should have been {err.expected_number}"
    elif isinstance(err, parse_numberdealers.Duplicate):
        msg = f"- Duplicate {link(post.recognized_number, post)}"
    elif isinstance(err, parse_numberdealers.Stray):
        msg = f"- Stray {link(post.recognized_number, post)}"
    elif isinstance(err, parse_numberdealers.Skipped):
        msg = f"- {link('Skipped', post)} {err.expected_number}"
    elif isinstance(err, parse_numberdealers.Jump):
        msg = f"- Going from {link(err.previous_message.recognized_number, err.previous_message)}" \
              f" to {link(post.recognized_number, post)}"
    else:
        # Unknown error kind: still report it instead of failing on an
        # unbound local.
        msg = f"- {type(err).__name__} {link(post_text, post)}"

    return msg + mention(post)
def main():
    """Check the number sequence read from stdin and print a report.

    Feeds stdin to ``parse_numberdealers.parse`` and prints the checked range
    followed by either the list of errors or a success line.
    """
    import sys

    numbers, errors = parse_numberdealers.parse(sys.stdin)

    if not numbers and not errors:
        print("No input data")
        return

    if numbers:
        first, last = numbers[0], numbers[-1]
        print(f"Checked from {first.recognized_number} up to {last.recognized_number}")
    else:
        print("No valid number messages!")

    if not errors:
        print("No errors! 🎉")
        return

    print("🚨 Errors: 🚨")
    print("\n".join(str_from_error(error) for error in errors))
if line.get("edit_at") is not None:
messages.append(
f"- Edited message [{message}]({URL_PREFIX}{line['id']}){line['mention']}"
)
m = re.fullmatch(r"-?[1-9][0-9]*|0", message)
if not m:
messages.append(
f"- Non-number message [{message}]({URL_PREFIX}{line['id']}){line['mention']}"
)
else:
number = int(m.group(0))
if last_number is None:
start_number = number
last_number = number - 1
second_last_number = number - 2
if number != last_number + 1:
if number == second_last_number + 2 and last_number != second_last_number + 1:
messages.pop()
messages.append(
f"- [{last_number}]({URL_PREFIX}{last_line['id']}) should have been {number - 1}{last_line['mention']}"
)
elif number == last_number:
messages.append(
f"- Duplicate [{number}]({URL_PREFIX}{line['id']}){line['mention']}"
)
elif number == second_last_number + 1 and last_number != second_last_number + 1:
messages.pop()
messages.append(
f"- Stray [{last_number}]({URL_PREFIX}{last_line['id']}){last_line['mention']}"
)
elif number == last_number + 2:
messages.append(
f"- Skipped [{last_number + 1}]({URL_PREFIX}{line['id']}){line['mention']}"
)
else:
messages.append(
f"- Going from [{last_number}]({URL_PREFIX}{last_line['id']}) "
f"to [{number}]({URL_PREFIX}{line['id']}){line['mention']}"
)
second_last_number = last_number
second_last_line = last_line
last_number = number
last_line = line
if start_number is None and messages == []:
print("No input data")
else:
if start_number is None:
print("No valid number messages!")
else:
print(f"Checked from {start_number} up to {number}")
if messages:
print("🚨 Errors: 🚨")
print("\n".join(messages))
else:
print("No errors! 🎉")
if __name__ == "__main__":
main()

169
parse_numberdealers.py Executable file
View file

@ -0,0 +1,169 @@
#!/usr/bin/env python3
import re
import json
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class Message:
    """One post read from the input, plus the number recognized in it."""

    # Post id; callers append it to URL_PREFIX to build a link to the post.
    id: str
    # Author name, from the optional users directory or the post's own
    # "username" field; None when neither provides one.
    username: Optional[str]
    # Raw "message" field of the post (not normalized).
    message: Optional[str]
    # Name of the first attached file, or None when there is none.
    first_filename: Optional[str]
    # "create_at" field of the post; main() divides it by 1000 before
    # treating it as a POSIX timestamp.
    create_at: int
    # Filled in by parse() once the post is recognized as an integer.
    recognized_number: Optional[int]
@dataclass
class NumberdealersError:
    """Base record for a problem found by parse().

    This is a plain data record collected in a list, not an exception that
    gets raised.
    """

    # Post the problem is attributed to.
    message: Message
    # Post that preceded `message` in the number sequence; parse() passes
    # None here when no number had been recognized yet.
    previous_message: Optional[Message]
    # Number that was expected at this point, or None when no number had
    # been recognized yet.
    expected_number: Optional[int]
class UnrecognizedNumber(NumberdealersError):
    """The post has neither message text nor an attached file name."""


class EditedMessage(NumberdealersError):
    """The post has a non-null ``edit_at`` field."""


class NonNumberMessage(NumberdealersError):
    """The normalized post text is not an integer."""


class ShouldHaveBeen(NumberdealersError):
    """The previous number was off, and the current one continues as if it had been right."""


class Duplicate(NumberdealersError):
    """The number repeats the previous number."""


class Stray(NumberdealersError):
    """The previous number was off, and the current one continues from the number before it."""


class Skipped(NumberdealersError):
    """Exactly one number was skipped after an in-sequence number."""


class Jump(NumberdealersError):
    """Any other discontinuity between the previous number and this one."""
# Optional user directory: when a `users` module is importable, its users()
# result is used by parse() to look up usernames by user id.
try:
    USERS = __import__("users").users()
except ImportError:
    USERS = None

# Emoji short names (as written between colons) mapped to their digit.
NUMBER_EMOJI = {
    name: str(digit)
    for digit, name in enumerate(
        (
            "zero",
            "one",
            "two",
            "three",
            "four",
            "five",
            "six",
            "seven",
            "eight",
            "nine",
        )
    )
}

# A post id appended to this prefix forms the link to that post.
URL_PREFIX = "https://mattermost.zeus.gent/zeus/pl/"
def _normalize_number_text(text):
    """Strip Markdown decoration and turn digit emoji into plain digits."""
    # Leading heading/quote markers, and emphasis/code markers anywhere.
    text = re.sub(r"^[#>]* ?|[*_`]*", "", text)
    for emoji, numb in NUMBER_EMOJI.items():
        text = re.sub(f" *:{emoji}: *", numb, text)
    # Emoji of the form :num12: / :greennum12: carry the digits in their name.
    text = re.sub(" ?:(?:green)?num([0-9]+): ?", lambda m: m.group(1), text)
    # Drop variation selector-16, combining keycap and zero-width space.
    text = text.replace("\ufe0f", "").replace("\u20e3", "").replace("\u200b", "")
    return text.strip()


def _retract_error(errors, error):
    """Remove exactly `error` (matched by identity) from `errors`, if present."""
    if error is None:
        return
    for index in range(len(errors) - 1, -1, -1):
        if errors[index] is error:
            del errors[index]
            return


def parse(message_json_lines):
    """Read posts from JSON lines and check that their numbers count up by one.

    Args:
        message_json_lines: iterable of strings, each a JSON object describing
            one post. The keys read are "id" and "create_at" (required) and
            "type", "user_id", "username", "message", "edit_at" and
            "metadata" (optional).

    Returns:
        A ``(numbers, errors)`` tuple: ``numbers`` lists, in input order, the
        ``Message`` objects in which an integer was recognized; ``errors``
        lists the ``NumberdealersError`` records found.

    Raises:
        KeyError: if a post lacks "id" or "create_at".
        Whatever ``json.loads`` raises for a malformed line.
    """
    numbers = []
    errors = []

    last_number = None
    last_message = None
    second_last_number = None
    second_last_message = None
    # Sequence error recorded while handling the previous number. Some
    # branches below replace it once the next number shows what went wrong.
    last_sequence_error = None

    for raw_line in message_json_lines:
        line = json.loads(raw_line)

        # Ignore non-message posts (e.g. join/leave)
        if line.get("type") is not None:
            continue

        username = None
        if "user_id" in line and USERS is not None and line["user_id"] in USERS:
            username = USERS[line["user_id"]].get("username")
        if username is None:
            username = line.get("username")

        try:
            first_filename = line["metadata"]["files"][0]["name"]
        except (KeyError, IndexError):
            first_filename = None

        message_obj = Message(
            id=line["id"],
            username=username,
            message=line.get("message"),
            first_filename=first_filename,
            create_at=line["create_at"],
            recognized_number=None,
        )

        # Expected next number, for errors that are not about the sequence.
        expected = last_number + 1 if last_number is not None else None

        if "message" in line and line["message"] != "":
            message = _normalize_number_text(line["message"])
        elif first_filename is not None:
            # File-only post: the number is the file name up to its first dot.
            message = first_filename.split(".")[0]
        else:
            errors.append(UnrecognizedNumber(message_obj, last_message, expected))
            continue

        if line.get("edit_at") is not None:
            errors.append(EditedMessage(message_obj, last_message, expected))

        m = re.fullmatch(r"-?[1-9][0-9]*|0", message)
        if not m:
            errors.append(NonNumberMessage(message_obj, last_message, expected))
            continue

        number = int(m.group(0))
        message_obj.recognized_number = number
        numbers.append(message_obj)

        if last_number is None:
            # First number seen: pretend the two numbers before it were right.
            last_number = number - 1
            second_last_number = number - 2

        sequence_error = None
        if number != last_number + 1:
            previous_was_off = last_number != second_last_number + 1
            if number == second_last_number + 2 and previous_was_off:
                # The previous number was a typo; replace the error recorded
                # for it (and only that one) with a more precise one.
                _retract_error(errors, last_sequence_error)
                sequence_error = ShouldHaveBeen(last_message, second_last_message, number - 1)
            elif number == last_number:
                sequence_error = Duplicate(message_obj, last_message, last_number + 1)
            elif number == second_last_number + 1 and previous_was_off:
                # The previous number did not belong in the sequence at all.
                _retract_error(errors, last_sequence_error)
                sequence_error = Stray(last_message, second_last_message, last_number + 1)
            elif not previous_was_off and number == last_number + 2:
                # The previous number was in sequence, so it left no error
                # behind and there is nothing to retract here.
                # NOTE(review): this is attached to the previous post, unlike
                # Duplicate and Jump - confirm that is intended.
                sequence_error = Skipped(last_message, second_last_message, number - 1)
            else:
                sequence_error = Jump(message_obj, last_message, last_number + 1)
            errors.append(sequence_error)

        last_sequence_error = sequence_error
        second_last_number = last_number
        second_last_message = last_message
        last_number = number
        last_message = message_obj

    return numbers, errors
def main():
    """Print every number recognized on stdin as a tab-separated line.

    Each line holds the post's UTC creation time (without the "+00:00"
    suffix), the username and the recognized number.
    """
    import sys
    from datetime import datetime, timezone

    recognized, _ = parse(sys.stdin)
    for post in recognized:
        posted_at = datetime.fromtimestamp(post.create_at / 1000, timezone.utc)
        stamp = str(posted_at).replace("+00:00", "")
        print(f"{stamp}\t{post.username}\t{post.recognized_number}")
if __name__ == "__main__":
main()

65
plot_numberdealers.py Executable file
View file

@ -0,0 +1,65 @@
#!/usr/bin/env python3
from datetime import timezone, datetime
import parse_numberdealers
import matplotlib.pyplot as plt
import numpy as np
def xy(messages):
    """Split *messages* into plot coordinates.

    Returns a pair of lists: the creation times as timezone-aware UTC
    datetimes (``create_at`` divided by 1000) and the recognized numbers.
    """
    posts = list(messages)
    x = [datetime.fromtimestamp(post.create_at / 1000, timezone.utc) for post in posts]
    y = [post.recognized_number for post in posts]
    return x, y
def find(xs, condition):
    """Return the index of the first item of *xs* satisfying *condition*.

    Returns None when no item matches.
    """
    return next((index for index, item in enumerate(xs) if condition(item)), None)
def abline(slope, intercept):
    """Plot a line from slope and intercept"""
    # Span the current axes' x-limits so the dashed line crosses the whole plot.
    x_vals = np.array(plt.gca().get_xlim())
    plt.plot(x_vals, slope * x_vals + intercept, '--')
def main():
    """Plot the recognized numbers of two message dumps against time.

    Expects two command-line arguments: paths to UTF-8 files holding the JSON
    lines understood by ``parse_numberdealers.parse``. Both series are drawn
    on the same axes. Exits with a usage message when a path is missing.
    """
    import sys

    if len(sys.argv) < 3:
        # Explain what is needed instead of failing with a bare IndexError.
        sys.exit("usage: plot_numberdealers.py FIRST_MESSAGES_FILE SECOND_MESSAGES_FILE")

    with open(sys.argv[1], encoding="utf-8") as fh:
        numbers_og, _errors = parse_numberdealers.parse(fh)

    with open(sys.argv[2], encoding="utf-8") as fh:
        numbers_ng, _errors = parse_numberdealers.parse(fh)

    # Disabled trend-line experiment, kept for reference.
    # NOTE(review): the slopes use create_at/1000 but the intercepts use the
    # raw create_at - reconcile the units before re-enabling.
    # start_of_current_slope = find(numbers_og, lambda msg: msg.recognized_number <= 10464)
    # slope_og = (
    #     numbers_og[-1].recognized_number - numbers_og[start_of_current_slope].recognized_number
    # ) / (
    #     numbers_og[-1].create_at/1000 - numbers_og[start_of_current_slope].create_at/1000
    # )
    # # b = y - a x
    # intercept_og = numbers_og[-1].recognized_number - slope_og * numbers_og[-1].create_at
    # slope_ng = (
    #     numbers_ng[-1].recognized_number - numbers_ng[0].recognized_number
    # ) / (
    #     numbers_ng[-1].create_at/1000 - numbers_ng[0].create_at/1000
    # )
    # intercept_ng = numbers_ng[-1].recognized_number - slope_ng * numbers_ng[-1].create_at

    _fig, ax = plt.subplots()
    ax.plot(*xy(numbers_og))
    ax.plot(*xy(numbers_ng))
    # abline(slope_og, -120000)
    # abline(slope_ng, -500000)

    plt.show()
if __name__ == "__main__":
main()