mmcli/mmcli.py

311 lines
9.1 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import argparse
import os
from collections import defaultdict
import datetime
from time import sleep
import time
import json
from typing import Dict, Set, Generator, Optional, NamedTuple
import mattermost
import re
from threading import Lock
from mmws import MMws
class NotFound(Exception):
def __init__(self, type_: str, name: str):
super().__init__(f"{type_} {name} not found")
self.type = type_
self.name = name
def first(iterable, default=None):
for x in iterable:
return x
return default
def yes_no(x):
return "yes" if x else "no"
def http_to_ws(url):
"""
Transform url from http to ws and https to wss
"""
assert url.startswith("http://") or url.startswith("https://")
return "ws" + url[4:]
def get_posts_for_channel(self, channel_id: str, progress=lambda x: None, **kwargs) -> Generator[Dict, None, None]:
"""
@raises ApiException: Passed on from lower layers.
"""
page = 0
posts = []
while True:
data_page = self._get(f"/v4/channels/{channel_id}/posts", params={"page":str(page), "per_page":200, **kwargs})
if data_page["order"] == []:
break
page += 1
posts.extend(data_page["posts"][order] for order in data_page["order"])
progress(len(posts))
sleep(0.1)
# Mattermost gives newest first, so reverse order
posts.reverse()
return posts
ID_PREFIX = "id:"
def predicate_for_query(query: str):
"""
@return: a function that returns whether `query` matches its argument
"""
if query.startswith(ID_PREFIX):
id_ = query[len(ID_PREFIX):]
return lambda x: x["id"] == id_
else:
return lambda x: x["name"] == query
def resolve_team(mm_api: mattermost.MMApi, query: str) -> Optional[Dict]:
return first(filter(
predicate_for_query(query),
mm_api.get_teams()
))
def resolve_channel(mm_api: mattermost.MMApi, team_id: str, query: str) -> Optional[Dict]:
return first(filter(
predicate_for_query(query),
mm_api.get_team_channels(team_id)
))
def resolve_team_channel(mm_api: mattermost.MMApi, query: str) -> Dict:
query_parts = query.split("/")
del query
if len(query_parts) != 2:
raise ValueError("Team/channel ID should be '<team>/<channel>'")
team = resolve_team(mm_api, query_parts[0])
if not team:
raise NotFound("team", query_parts[0])
channel = resolve_channel(mm_api, team["id"], query_parts[1])
if not channel:
return NotFound("channel", query_parts[1])
return team, channel
def login(mm_api, parsed):
print(
f"Logging in as {parsed.user}; password provided: {yes_no(parsed.password)}; "
f"TOTP token provided: {yes_no(parsed.totp)}",
file=sys.stderr)
mm_api.login(parsed.user, parsed.password, parsed.totp)
if parsed.format == "json":
print(json.dumps({"token": mm_api._bearer}))
elif parsed.format == "tsv":
print(mm_api._bearer)
else:
assert False
def cat(mm_api: mattermost.MMApi, parsed):
# channels = [
# resolve_team_channel(mm_api, query)
# for query in parsed.channels
# ]
team, channel = resolve_team_channel(mm_api, parsed.channel)
users = list(mm_api.get_users())
if not parsed.ids:
def attribute(key_value):
key, value = key_value
if key == "channel_id":
assert value == channel["id"]
return "channel", channel["name"]
if key == "user_id":
return "username", first(u["username"] for u in users if u["id"] == value)
return key_value
else:
def attribute(key_value):
return key_value
# backlog = []
# backlog_lock = Lock()
if parsed.follow:
raise NotImplementedError("--follow is not yet supported")
# def webs_handler(mmws, event_data):
# if event_data["event"] == "posted":
# with backlog_lock:
# if backlog is not None:
# backlog.append(event_data["data"])
# return
# print(post_str(attribute, event_data["data"], parsed))
# ws_url = http_to_ws(mm_api._url) + "/v4/websocket"
# MMws(webs_handler, mm_api, ws_url)
# return
posts = get_posts_for_channel(mm_api, channel["id"], after=parsed.after)
for post in posts:
print(post_str(attribute, post, parsed))
# with backlog_lock:
# for post in backlog:
# print(post_str(attribute, post, parsed))
# backlog = None
def send(mm_api: mattermost.MMApi, parsed):
read_stdin = parsed.message is None or parsed.channel is None
team, channel = resolve_team_channel(mm_api, parsed.channel) if parsed.channel is not None else (None, None)
if read_stdin:
if sys.stdin.isatty():
print("Reading from tty. (You can type the message objects below. Or maybe you meant to redirect something to stdin.)", file=sys.stderr)
for line in sys.stdin:
msg = json.loads(line)
if "channel_id" in msg:
channel_id = msg["channel_id"]
elif "channel" in msg:
_, local_channel = resolve_team_channel(mm_api, msg["channel"])
channel_id = local_channel["id"]
elif channel is not None:
channel_id = channel["id"]
else:
print(f"Illegal message, missing channel: {line.strip()}", file=sys.stderr)
raise ValueError("Illegal message, missing channel")
sent = mm_api.create_post(channel_id, msg["message"], props={"from_mmcli": "true"}, filepaths=msg.get("attachments", msg.get("attachments")))
print(sent)
else:
sent = mm_api.create_post(channel["id"], parsed.message, props={"from_mmcli": "true"}, filepaths=parsed.attach)
print(sent)
def post_str(attribute, post, parsed):
obj = {
k: v
for k, v in map(attribute, post.items())
if (v or k == "message") and (k != "update_at" or post["update_at"] != post["create_at"])
}
if parsed.format == "json":
return json.dumps(obj)
if parsed.format == "tsv":
msg = obj.get("message", "").replace("\\", "\\\\").replace("\t", r"\t").replace("\n", r"\n")
return f"{obj['id']}\t{obj['create_at']}\t{obj.get('username') or obj['user_id']}\t{msg}"
assert False
ACTIONS = {
"login": {"function": login, "accesstoken_required": False},
"cat": {"function": cat},
"send": {"function": send},
}
FORMATTERS = { "json", "tsv" }
ENVVAR_SERVER = "MM_SERVER"
ENVVAR_USERNAME = "MM_USERNAME"
ENVVAR_PASSWORD = "MM_PASSWORD"
ENVVAR_TOTP = "MM_TOTP"
ENVVAR_ACCESSTOKEN = "MM_ACCESSTOKEN"
def main():
prog_name = os.path.basename(sys.argv[0])
description = "Interact with Mattermost on the CLI"
epilog = f"""
For further help, use `{prog_name} <action> -h`.
Where a "URL name" is required, "id:" plus an ID can also be used instead. So these could both be valid:
town-square
id:123abc456def789ghi012jkl34
Hint: JSON output can be filtered on the command line with jq(1).
""".strip()
argparser = argparse.ArgumentParser(
prog_name, description=description, epilog=epilog,
formatter_class=argparse.RawTextHelpFormatter
)
argparser.add_argument("-i", "--ids", help="use IDs instead of names", action="store_true")
argparser.add_argument(
"--format", help="output format; only json has all fields; default: json", choices=FORMATTERS, default="json")
argparser.add_argument(
"--server",
help="e.g.: mattermost.example.org; example.org/mattermost; envvar: {ENVVAR_SERVER}",
default=os.getenv(ENVVAR_SERVER))
subparsers = argparser.add_subparsers(title="actions", dest="action", required=True)
parser_login = subparsers.add_parser("login", help="retrieve an access token")
parser_login.add_argument("login_id", help="username or email", default=os.getenv(ENVVAR_USERNAME))
parser_login.add_argument("--password", default=os.getenv(ENVVAR_PASSWORD))
parser_login.add_argument("--totp", default=os.getenv(ENVVAR_TOTP))
# TODO support multiple channels
# parser_cat = subparsers.add_parser("cat", help="list messages in channel(s)")
# parser_cat.add_argument(
# "channels", nargs="+", help="URL names of team and channel: '<team>/<channel>'")
parser_cat = subparsers.add_parser("cat", help="list messages in channel")
parser_cat.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")
# ---
parser_cat.add_argument("--after", help="all after post with ID")
parser_cat.add_argument("--since", help="all after timestamp")
parser_cat.add_argument("-f", "--follow", action="store_true", help="keep running, printing new posts as they come in")
parser_send = subparsers.add_parser("send", help="send message(s)")
parser_send.add_argument(
"--channel", help="URL names of team and channel: '<team>/<channel>'; if not provided, "
"messages must be provided on stdin and each must specify channel")
parser_send.add_argument(
"--message", help="message; if not provided, messages will be expected on stdin")
parser_send.add_argument(
"--attach", nargs="+", help="filename of file to attach")
parsed = argparser.parse_args()
if not parsed.server:
argparser.error(
f"server is required; use argument --server or environment variable {ENVVAR_SERVER}")
access_token = os.getenv(ENVVAR_ACCESSTOKEN)
if ACTIONS[parsed.action].get("accesstoken_required", True) and not access_token:
argparser.error(
f"`{prog_name} {parsed.action}` requires access token; get one with `{prog_name} login` "
f"and set environment variable {ENVVAR_ACCESSTOKEN}")
server = parsed.server if re.match(r"^[a-z]+://", parsed.server) else f"https://{parsed.server}"
mm_api = mattermost.MMApi(f"{server}/api")
if access_token:
mm_api._headers.update({"Authorization": f"Bearer {access_token}"})
ACTIONS[parsed.action]["function"](mm_api, parsed)
if __name__ == "__main__":
main()