381 lines
12 KiB
Python
Executable file
381 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
import json
import os
import re
import sys
import threading
from time import sleep
from typing import Dict, Iterable, List, Optional, Tuple

import mattermost

from mmws import MMws
|
|
|
|
|
|
class NotFound(Exception):
    """
    Signals that a named entity (e.g. a team or a channel) could not be found.

    The kind of entity and the name that was looked up stay available on the
    exception as `type` and `name`.
    """

    def __init__(self, type_: str, name: str):
        self.type, self.name = type_, name
        message = f"{type_} {name} not found"
        super().__init__(message)
|
|
|
|
|
|
def first(iterable, default=None):
    """
    @return: the first item produced by `iterable`, or `default` if it produces none
    """
    return next(iter(iterable), default)
|
|
|
|
|
|
def yes_no(x):
    """
    @return: "yes" if `x` is truthy, "no" otherwise
    """
    if x:
        return "yes"
    return "no"
|
|
|
|
|
|
def http_to_ws(url):
    """
    Transform url from http to ws and https to wss

    @raises ValueError: if `url` starts with neither "http://" nor "https://".
    """
    # Raised explicitly instead of asserted: assertions are stripped under
    # `python -O`, which would silently produce a malformed URL.
    if not url.startswith(("http://", "https://")):
        raise ValueError(f"Expected an http:// or https:// URL, got {url!r}")
    # Only the leading "http" is replaced, so an "s" directly after it survives
    # and "https" turns into "wss".
    return "ws" + url[4:]
|
|
|
|
|
|
def get_posts_for_channel(self, channel_id: str, progress=lambda x: None, after=None, since=None, per_page: int = 200, **kwargs) -> Iterable[Dict]:
    """
    Generate the posts of a channel, oldest first.

    This is a generator: nothing is requested (and nothing is raised) before
    the first item is asked for.

    @param self: API object; only its `_get(path, params=...)` method is used.
    @param channel_id: ID of the channel whose posts are requested.
    @param progress: Called after every fetched page with the number of posts fetched so far.
    @param after: If given, only posts after the post with this ID are requested.
    @param since: Not supported; any truthy value raises NotImplementedError.
    @param per_page: Page size requested from the API. It is also what decides
        whether a page was the last one, so it is a real parameter instead of
        something smuggled in through `kwargs`.
    @param kwargs: Additional query parameters, passed on to the API unchanged.
    @raises ApiException: Passed on from lower layers.
    @raises NotImplementedError: If `since` is given.
    @raises ValueError: If `per_page` is smaller than 1.
    """
    if per_page < 1:
        # A page could never be "shorter than per_page", so the loops below would never end
        raise ValueError("per_page must be at least 1")

    # if after and since:
    #     raise ValueError("after and since cannot be used together")

    if since:
        # Fetching by timestamp used to be implemented here but was disabled: pagination of
        # the 'since' query is broken in the API (pages of up to 1000 posts were observed).
        raise NotImplementedError("'since' functionality is broken in the API and behaves non-deterministically. It cannot be meaningfully used.")

    endpoint = f"/v4/channels/{channel_id}/posts"
    page = 0

    if after:
        # Posts in channel after a given ID: API gives pages with OLDEST messages first, so we can
        # yield each page when it is fetched
        total = 0
        while True:
            data_page = self._get(endpoint, params={"page": page, "per_page": per_page, "after": after, **kwargs})
            order = data_page["order"]

            # Within a page the newest post comes first, hence the reversal
            yield from (
                data_page["posts"][post_id]
                for post_id in reversed(order)
            )
            total += len(order)
            progress(total)
            if len(order) < per_page:
                break
            page += 1
            sleep(0.1)

    else:
        # All posts in channel: API gives pages with NEWEST messages first, so reverse the order in
        # the end (and don't reverse the order of each page separately)
        posts = []
        while True:
            data_page = self._get(endpoint, params={"page": page, "per_page": per_page, **kwargs})
            order = data_page["order"]

            posts.extend(
                data_page["posts"][post_id]
                for post_id in order
            )
            progress(len(posts))
            if len(order) < per_page:
                break
            page += 1
            sleep(0.1)

        yield from reversed(posts)
|
|
|
|
|
|
|
|
ID_PREFIX = "id:"


def predicate_for_query(query: str):
    """
    Build a matcher for `query`.

    A query starting with ID_PREFIX is compared (without the prefix) to the
    "id" entry of the argument; any other query is compared to its "name" entry.

    @return: a function that returns whether `query` matches its argument
    """
    by_id = query.startswith(ID_PREFIX)
    field = "id" if by_id else "name"
    wanted = query[len(ID_PREFIX):] if by_id else query

    def matches(candidate):
        return candidate[field] == wanted

    return matches
|
|
|
|
|
|
def resolve_team(mm_api: mattermost.MMApi, query: str) -> Optional[Dict]:
    """
    @return: the first team from `mm_api.get_teams()` matching `query`, or None if there is none
    """
    matches = predicate_for_query(query)
    return first(team for team in mm_api.get_teams() if matches(team))
|
|
|
|
|
|
def resolve_channel(mm_api: mattermost.MMApi, team_id: str, query: str) -> Optional[Dict]:
    """
    @return: the first channel from `mm_api.get_team_channels(team_id)` matching `query`,
             or None if there is none
    """
    matches = predicate_for_query(query)
    return first(channel for channel in mm_api.get_team_channels(team_id) if matches(channel))
|
|
|
|
|
|
def resolve_team_channel(mm_api: mattermost.MMApi, query: str) -> Tuple[Dict, Dict]:
    """
    Resolve a '<team>/<channel>' query to the team and the channel it names.

    Each of the two parts is matched by `resolve_team` / `resolve_channel`.

    @return: tuple of (team, channel)
    @raises ValueError: if `query` does not consist of exactly two parts separated by "/".
    @raises NotFound: if the team or the channel cannot be found.
    """
    query_parts = query.split("/")
    if len(query_parts) != 2:
        raise ValueError("Team/channel ID should be '<team>/<channel>'")
    team_query, channel_query = query_parts

    team = resolve_team(mm_api, team_query)
    if not team:
        raise NotFound("team", team_query)

    channel = resolve_channel(mm_api, team["id"], channel_query)
    if not channel:
        # Must be raised, not returned: callers unpack the result into (team, channel)
        raise NotFound("channel", channel_query)

    return team, channel
|
|
|
|
|
|
def login(mm_api, cmdline_args):
    """
    Log in with the credentials from the command line and print the access token.

    A summary of what is used for logging in goes to stderr (the password and
    TOTP token themselves are not printed, only whether they were provided).
    The token (`mm_api._bearer`) goes to stdout, as a JSON object with key
    "token" for format "json" or as a bare line for format "tsv".

    @param mm_api: API object; `login(...)` is called on it and `_bearer` is read afterwards.
    @param cmdline_args: parsed arguments; reads `login_id`, `password`, `totp` and `format`.
    @raises ValueError: if `cmdline_args.format` is neither "json" nor "tsv".
    """
    # The argument parser stores the positional as `login_id`. `user` is only
    # read as a fallback for namespaces that were built by other means.
    login_id = getattr(cmdline_args, "login_id", None)
    if login_id is None:
        login_id = getattr(cmdline_args, "user", None)

    # Checked before talking to the server, and raised rather than asserted so
    # that the check also holds under `python -O`.
    if cmdline_args.format not in ("json", "tsv"):
        raise ValueError(f"Unsupported output format: {cmdline_args.format!r}")

    print(
        f"Logging in as {login_id}; password provided: {yes_no(cmdline_args.password)}; "
        f"TOTP token provided: {yes_no(cmdline_args.totp)}",
        file=sys.stderr)
    mm_api.login(login_id, cmdline_args.password, cmdline_args.totp)

    if cmdline_args.format == "json":
        print(json.dumps({"token": mm_api._bearer}))
    else:
        print(mm_api._bearer)
|
|
|
|
|
|
def cat(mm_api: mattermost.MMApi, cmdline_args):
    """
    Print the posts of one channel, optionally followed by posts as they come in.

    Without `cmdline_args.ids`, the "channel_id" and "user_id" attributes of a
    post are replaced by "channel" (channel name) and "username" before the
    post is formatted by `str_for_post`.

    With `cmdline_args.follow`, a websocket is opened before the existing posts
    are fetched. Posts arriving on it in the meantime are collected in a backlog
    that is printed right after the existing posts; later posts are printed
    directly.

    @param mm_api: API object used for all requests.
    @param cmdline_args: parsed arguments; reads `channel`, `ids`, `after`, `since`, `follow`
        (and whatever `str_for_post` reads).
    """
    # TODO support multiple channels
    _team, channel = resolve_team_channel(mm_api, cmdline_args.channel)

    users = list(mm_api.get_users())
    if not cmdline_args.ids:
        # Built once, so that each post costs a dict lookup instead of a scan over all users.
        # setdefault keeps the first entry should an ID occur more than once.
        usernames = {}
        for user in users:
            usernames.setdefault(user["id"], user["username"])

        def attribute(key_value):
            key, value = key_value
            if key == "channel_id":
                assert value == channel["id"]
                return "channel", channel["name"]
            if key == "user_id":
                # None for an unknown user
                return "username", usernames.get(value)
            return key_value
    else:
        def attribute(key_value):
            return key_value

    # In a list to allow overwriting from within print_initial_messages without using global
    backlog = [[]]
    backlog_lock = threading.Lock()

    def print_initial_messages():
        posts = get_posts_for_channel(mm_api, channel["id"], after=cmdline_args.after, since=cmdline_args.since)
        for post in posts:
            print(str_for_post(attribute, post, cmdline_args))

        with backlog_lock:
            for post in backlog[0]:
                print(str_for_post(attribute, post, cmdline_args))
            # None tells the websocket callback that the backlog has been dealt with
            backlog[0] = None

    if not cmdline_args.follow:
        print_initial_messages()
        return

    def simple_websocket_callback(_mmws, event_data):
        if event_data.get("event") == "posted":
            post = json.loads(event_data["data"]["post"])
            if post["channel_id"] != channel["id"]:
                return
            print(str_for_post(attribute, post, cmdline_args), flush=True)

    def initial_websocket_callback(mmws: MMws, event_data):
        if event_data.get("event") == "posted":
            post = json.loads(event_data["data"]["post"])
            if post["channel_id"] != channel["id"]:
                return
            with backlog_lock:
                if backlog[0] is not None:
                    # Existing posts are still being printed: keep this one for later
                    backlog[0].append(post)
                    return
                else:
                    # Backlog has been printed: from now on posts are printed directly
                    mmws.ws_handler = simple_websocket_callback
                    simple_websocket_callback(mmws, event_data)

    ws_url = http_to_ws(mm_api._url) + "/v4/websocket"
    mmws = MMws(initial_websocket_callback, mm_api.access_token, ws_url)

    # daemon is passed to the constructor; Thread.setDaemon() is deprecated since Python 3.10
    thread = threading.Thread(target=print_initial_messages, daemon=True)
    thread.start()

    mmws.run_websocket()
|
|
|
|
|
|
def send(mm_api: mattermost.MMApi, cmdline_args):
    """
    Send one message given on the command line, or messages read from stdin.

    Messages are read from stdin unless both `cmdline_args.message` and
    `cmdline_args.channel` are given. On stdin, every non-blank line is one JSON
    object with key "message" and optionally "attachments". Its channel is taken
    from its "channel_id", else from its "channel" ('<team>/<channel>'), else
    from `cmdline_args.channel`.

    Whatever `mm_api.create_post` returns is printed for every message sent.

    @param mm_api: API object used for all requests.
    @param cmdline_args: parsed arguments; reads `message`, `channel` and `attach`.
    @raises ValueError: if a message from stdin has no channel and none was given on the command line.
    """
    read_stdin = cmdline_args.message is None or cmdline_args.channel is None

    if cmdline_args.channel is not None:
        _team, channel = resolve_team_channel(mm_api, cmdline_args.channel)
    else:
        channel = None

    if not read_stdin:
        sent = mm_api.create_post(channel["id"], cmdline_args.message, props={"from_mmcli": "true"}, filepaths=cmdline_args.attach)
        print(sent)
        return

    if sys.stdin.isatty():
        print("Reading from tty. (You can type the message objects below. Or maybe you meant to redirect something to stdin.)", file=sys.stderr)

    for line in sys.stdin:
        if not line.strip():
            # Blank lines (e.g. a trailing newline) are not messages and would not parse as JSON
            continue

        msg = json.loads(line)

        if "channel_id" in msg:
            channel_id = msg["channel_id"]
        elif "channel" in msg:
            _, local_channel = resolve_team_channel(mm_api, msg["channel"])
            channel_id = local_channel["id"]
        elif channel is not None:
            channel_id = channel["id"]
        else:
            print(f"Illegal message, missing channel: {line.strip()}", file=sys.stderr)
            raise ValueError("Illegal message, missing channel")

        sent = mm_api.create_post(channel_id, msg["message"], props={"from_mmcli": "true"}, filepaths=msg.get("attachments"))
        print(sent)
|
|
|
|
|
|
def tsv_escape(text):
    """
    Escape backslashes, tabs and newlines in `text`, so that it fits into a
    single TSV field.
    """
    replacements = str.maketrans({"\\": "\\\\", "\t": r"\t", "\n": r"\n"})
    return text.translate(replacements)
|
|
|
|
|
|
def str_for_post(attribute, post, cmdline_args):
    """
    Format one post for output.

    Every (key, value) item of `post` is passed through `attribute`. Items with
    a falsy value are left out, except for "message". "update_at" is left out
    too if it equals "create_at".

    @param attribute: function mapping a (key, value) pair of the post to the pair to output.
    @param post: the post, as a dict.
    @param cmdline_args: parsed arguments; reads `format`.
    @return: for format "json", the remaining items as a JSON object; for format "tsv",
        a line of ID, creation timestamp, author and escaped message, separated by tabs.
    @raises ValueError: if `cmdline_args.format` is neither "json" nor "tsv".
    """
    obj = {
        k: v
        for k, v in map(attribute, post.items())
        if (v or k == "message") and (k != "update_at" or post["update_at"] != post["create_at"])
    }

    if cmdline_args.format == "json":
        return json.dumps(obj)
    if cmdline_args.format == "tsv":
        # "message" is kept even when falsy, so it may be None here
        msg = tsv_escape(obj.get("message") or "")
        # `attribute` may have replaced "user_id" by a "username" that turned out to be
        # unknown; then neither is in obj and the raw ID from the post is used instead
        author = obj.get("username") or obj.get("user_id") or post.get("user_id", "")
        return f"{obj['id']}\t{obj['create_at']}\t{author}\t{msg}"
    # Raised rather than asserted, so that the check also holds under `python -O`
    raise ValueError(f"Unsupported output format: {cmdline_args.format!r}")
|
|
|
|
|
|
# Subcommands of the CLI. "function" is called with the API object and the parsed arguments.
# "accesstoken_required" defaults to True where it is not given (see its lookup in main).
ACTIONS = {
    "login": {"function": login, "accesstoken_required": False},
    "cat": {"function": cat},
    "send": {"function": send},
}

# Values accepted for --format
FORMATTERS = { "json", "tsv" }

# Names of the environment variables that are read as defaults / credentials
ENVVAR_SERVER = "MM_SERVER"
ENVVAR_USERNAME = "MM_USERNAME"
ENVVAR_PASSWORD = "MM_PASSWORD"
ENVVAR_TOTP = "MM_TOTP"
ENVVAR_ACCESSTOKEN = "MM_ACCESSTOKEN"
|
|
|
|
def main():
    """
    Entry point: parse the command line, set up the API object and run the chosen action.

    Server, login ID, password and TOTP token fall back to environment
    variables (ENVVAR_*). The access token is only read from the environment
    (ENVVAR_ACCESSTOKEN) and is required by every action that does not set
    "accesstoken_required" to False in ACTIONS.
    """
    prog_name = os.path.basename(sys.argv[0])
    description = "Interact with Mattermost on the CLI"
    epilog = f"""
For further help, use `{prog_name} <action> -h`.

Where a "URL name" is required, "id:" plus an ID can also be used instead. So these could both be valid:
town-square
id:123abc456def789ghi012jkl34

Hint: JSON output can be filtered on the command line with jq(1).
""".strip()

    argparser = argparse.ArgumentParser(
        prog_name, description=description, epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )
    argparser.add_argument("-i", "--ids", help="use IDs instead of names", action="store_true")
    # Sorted, because FORMATTERS is a set and the help output should list the choices in a stable order
    argparser.add_argument(
        "--format", help="output format; only json has all fields; default: json", choices=sorted(FORMATTERS), default="json")

    argparser.add_argument(
        "--server",
        help=f"e.g.: mattermost.example.org; example.org/mattermost; envvar: {ENVVAR_SERVER}",
        default=os.getenv(ENVVAR_SERVER))

    subparsers = argparser.add_subparsers(title="actions", dest="action", required=True)

    parser_login = subparsers.add_parser("login", help="retrieve an access token")
    # nargs="?" is what allows the default to apply: without it the positional is always required
    parser_login.add_argument(
        "login_id", nargs="?", help=f"username or email; envvar: {ENVVAR_USERNAME}",
        default=os.getenv(ENVVAR_USERNAME))
    parser_login.add_argument("--password", default=os.getenv(ENVVAR_PASSWORD))
    parser_login.add_argument("--totp", default=os.getenv(ENVVAR_TOTP))

    # TODO support multiple channels
    # parser_cat = subparsers.add_parser("cat", help="list messages in channel(s)")
    # parser_cat.add_argument(
    #     "channels", nargs="+", help="URL names of team and channel: '<team>/<channel>'")
    parser_cat = subparsers.add_parser("cat", help="list messages in channel")
    parser_cat.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")
    # ---
    parser_cat.add_argument("--after", help="all after post with ID")
    parser_cat.add_argument("--since", help="all after timestamp")
    parser_cat.add_argument("-f", "--follow", action="store_true", help="keep running, printing new posts as they come in")

    parser_send = subparsers.add_parser("send", help="send message(s)")
    parser_send.add_argument(
        "--channel", help="URL names of team and channel: '<team>/<channel>'; if not provided, "
        "messages must be provided on stdin and each must specify channel")
    parser_send.add_argument(
        "--message", help="message; if not provided, messages will be expected on stdin")
    parser_send.add_argument(
        "--attach", nargs="+", help="filename of file to attach")

    parsed = argparser.parse_args()

    if not parsed.server:
        argparser.error(
            f"server is required; use argument --server or environment variable {ENVVAR_SERVER}")

    if parsed.action == "login" and not parsed.login_id:
        argparser.error(
            f"login_id is required; pass it as argument or use environment variable {ENVVAR_USERNAME}")

    access_token = os.getenv(ENVVAR_ACCESSTOKEN)
    if ACTIONS[parsed.action].get("accesstoken_required", True) and not access_token:
        argparser.error(
            f"`{prog_name} {parsed.action}` requires access token; get one with `{prog_name} login` "
            f"and set environment variable {ENVVAR_ACCESSTOKEN}")

    # A server given without scheme is assumed to be reachable over https
    server = parsed.server if re.match(r"^[a-z]+://", parsed.server) else f"https://{parsed.server}"
    mm_api = mattermost.MMApi(f"{server}/api")
    # Stored on the API object because cat reads it from there for the websocket
    mm_api.access_token = access_token
    if access_token:
        mm_api._headers.update({"Authorization": f"Bearer {access_token}"})

    ACTIONS[parsed.action]["function"](mm_api, parsed)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()
|