#!/usr/bin/env python3
"""Command-line client for Mattermost: log in, list, read, follow, send, edit and delete posts."""

import sys
import argparse
import os
import json
from typing import Dict, Optional, List, Iterable, Tuple
import re
from time import sleep
import threading

import mattermost

from .parsedt import parse_datetime_to_utc
from .mmws import MMws


class NotFound(Exception):
    """Raised when a named entity (e.g. a team or channel) cannot be resolved."""

    def __init__(self, type_: str, name: str):
        super().__init__(f"{type_} {name} not found")
        self.type = type_
        self.name = name


def first(iterable, default=None):
    """Return the first item of `iterable`, or `default` if it is empty."""
    for x in iterable:
        return x
    return default


def yes_no(x):
    """Return "yes" if `x` is truthy, otherwise "no"."""
    return "yes" if x else "no"


def http_to_ws(url):
    """
    Transform url from http to ws and https to wss

    @raises ValueError: If `url` starts with neither "http://" nor "https://".
    """
    if not url.startswith(("http://", "https://")):
        raise ValueError(f"Cannot derive websocket URL from {url!r}: expected http:// or https://")
    # "http" -> "ws" and "https" -> "wss": only the first four characters are replaced
    return "ws" + url[4:]


def warn_if_tty(you_can_type="the message objects", write_message_to=sys.stderr):
    """Print a hint to `write_message_to` if stdin is an interactive terminal."""
    if sys.stdin.isatty():
        print(f"Reading from tty. (You can type {you_can_type} below. Or maybe you meant to redirect something to stdin.)", file=write_message_to)


def get_posts_for_channel(self, channel_id: str, progress=lambda x: None, after=None, since=None,
                          **kwargs) -> Iterable[Dict]:
    """
    Yield the posts of a channel, oldest first.

    `self` is the API object (this is a free function called with the API object as first
    argument). `progress` is called with the running number of posts fetched. If `after` is
    given, only posts after the post with that ID are fetched. Extra keyword arguments are
    passed on as request parameters.

    @raises ApiException: Passed on from lower layers.
    @raises Exception: If `since` is given (raised when iteration starts).
    """
    per_page = 200
    page = 0
    total = 0

    if since:
        # An earlier attempt paginated by passing the newest "create_at" of each page as the
        # next "since" value, but pagination with 'since' turned out to be broken in the API
        # (pages went up to 1000 posts regardless of per_page), so it is rejected outright.
        raise Exception("'since' functionality is broken in the API and behaves non-deterministically. "
                        "It cannot be meaningfully used.")

    elif after:
        # Posts in channel after a given ID: API gives pages with OLDEST messages first, so we can
        # yield each page when it is fetched
        while True:
            data_page = self._get(f"/v4/channels/{channel_id}/posts",
                                  params={"page": page, "per_page": per_page, "after": after, **kwargs})
            order = data_page["order"]
            yield from (
                data_page["posts"][post_id]
                for post_id in reversed(order)
            )
            total += len(order)
            progress(total)
            if len(order) < per_page:
                break
            page += 1
            sleep(0.1)

    else:
        # All posts in channel: API gives pages with NEWEST messages first, so reverse the order in
        # the end (and don't reverse the order of each page separately)
        posts = []
        while True:
            data_page = self._get(f"/v4/channels/{channel_id}/posts",
                                  params={"page": page, "per_page": per_page, **kwargs})
            order = data_page["order"]
            posts.extend(
                data_page["posts"][post_id]
                for post_id in order
            )
            progress(len(posts))
            if len(order) < per_page:
                break
            page += 1
            sleep(0.1)
        yield from reversed(posts)


ID_PREFIX = "id:"


def predicate_for_query(query: str):
    """
    @return: a function that returns whether `query` matches its argument
    """
    if query.startswith(ID_PREFIX):
        id_ = query[len(ID_PREFIX):]
        return lambda x: x["id"] == id_
    else:
        return lambda x: x["name"] == query


def resolve_team(mm_api: mattermost.MMApi, query: str) -> Optional[Dict]:
    """Return the first team matching `query` (name or "id:"-prefixed ID), or None."""
    return first(filter(
        predicate_for_query(query),
        mm_api.get_teams()
    ))


def resolve_channel(mm_api: mattermost.MMApi, team_id: str, query: str) -> Optional[Dict]:
    """Return the first channel of team `team_id` matching `query`, or None."""
    return first(filter(
        predicate_for_query(query),
        mm_api.get_team_channels(team_id)
    ))


def resolve_team_channel(mm_api: mattermost.MMApi, query: str) -> Tuple[Dict, Dict]:
    """
    Resolve a '<team>/<channel>' query to a (team, channel) pair.

    @raises ValueError: If `query` does not consist of exactly two '/'-separated parts.
    @raises NotFound: If the team or the channel cannot be resolved.
    """
    query_parts = query.split("/")
    if len(query_parts) != 2:
        raise ValueError("Team/channel ID should be '<team>/<channel>'")
    team_query, channel_query = query_parts

    team = resolve_team(mm_api, team_query)
    if not team:
        raise NotFound("team", team_query)

    if channel_query.startswith(ID_PREFIX):
        # A channel ID can be fetched directly, without listing the team's channels
        channel = mm_api.get_channel(channel_query[len(ID_PREFIX):])
    else:
        channel = resolve_channel(mm_api, team["id"], channel_query)
    if not channel:
        raise NotFound("channel", channel_query)

    return team, channel


def login(mm_api, cmdline_args):
    """Log in with the given credentials and print the resulting token to stdout."""
    print(
        f"Logging in as {cmdline_args.login_id!r}; password provided: {yes_no(cmdline_args.password)}; "
        f"TOTP token provided: {yes_no(cmdline_args.totp)}",
        file=sys.stderr)
    mm_api.login(cmdline_args.login_id, cmdline_args.password, cmdline_args.totp)

    if cmdline_args.format == "json":
        print(json.dumps({"token": mm_api._bearer}))
    elif cmdline_args.format == "tsv":
        print(mm_api._bearer)
    else:
        assert False


def _post_attribute_mapper(mm_api, channel, cmdline_args):
    """
    Return a function mapping a (key, value) pair of a post to the pair that should be output.

    With --ids, pairs are passed through unchanged. Otherwise "channel_id" becomes the channel
    name and "user_id" becomes the username.
    """
    if cmdline_args.ids:
        return lambda key_value: key_value

    # Build the lookup once instead of scanning the user list for every post
    usernames = {}
    for user in mm_api.get_users():
        usernames.setdefault(user["id"], user["username"])

    def attribute(key_value):
        key, value = key_value
        if key == "channel_id":
            assert value == channel["id"]
            return "channel", channel["name"]
        if key == "user_id":
            username = usernames.get(value)
            if username:
                return "username", username
            # Unknown user: keep the ID so that the output still identifies the author
        return key_value

    return attribute


def _posted_in_channel(channel_id):
    """Return a function extracting the post from a "posted" event in the given channel, else None."""
    def extract(event_data):
        if event_data.get("event") != "posted":
            return None
        post = json.loads(event_data["data"]["post"])
        if post["channel_id"] != channel_id:
            return None
        return post

    return extract


def _print_and_follow(mm_api, cmdline_args, fetch_initial, extract, render):
    """
    Print the items from `fetch_initial()`; with --follow, keep printing items from the websocket.

    `extract(event_data)` returns the item carried by a websocket event, or None if the event is
    not relevant. `render(item)` returns the line to print. In follow mode the websocket is
    connected before the initial items are fetched (in a background thread); events arriving in
    the meantime are buffered and printed after the initial items.
    """
    backlog = []  # Set to None once the initial items and the buffered ones have been printed
    backlog_lock = threading.Lock()

    def print_initial():
        nonlocal backlog
        for item in fetch_initial():
            print(render(item))
        with backlog_lock:
            for item in backlog:
                print(render(item))
            backlog = None
        sys.stdout.flush()

    if not cmdline_args.follow:
        print_initial()
        return

    def simple_websocket_callback(_mmws, event_data):
        item = extract(event_data)
        if item is not None:
            print(render(item), flush=True)

    def initial_websocket_callback(mmws: MMws, event_data):
        item = extract(event_data)
        if item is None:
            return
        with backlog_lock:
            if backlog is not None:
                backlog.append(item)
                return
            # Backlog has been drained: from now on events can be printed without locking
            mmws.ws_handler = simple_websocket_callback
            print(render(item), flush=True)

    ws_url = http_to_ws(mm_api._url) + "/v4/websocket"
    mmws = MMws(initial_websocket_callback, mm_api.access_token, ws_url)
    thread = threading.Thread(target=print_initial, daemon=True)
    thread.start()
    mmws.run_websocket()


def cat(mm_api: mattermost.MMApi, cmdline_args):
    """Print all posts of a channel (optionally only those after a post); optionally follow."""
    # TODO support multiple channels
    team, channel = resolve_team_channel(mm_api, cmdline_args.channel)
    attribute = _post_attribute_mapper(mm_api, channel, cmdline_args)

    def fetch_initial():
        return get_posts_for_channel(
            mm_api, channel["id"], after=cmdline_args.after, since=cmdline_args.since)

    _print_and_follow(
        mm_api, cmdline_args, fetch_initial,
        extract=_posted_in_channel(channel["id"]),
        render=lambda post: str_for_post(attribute, post, cmdline_args))


def tail(mm_api: mattermost.MMApi, cmdline_args):
    """Print the posts of a single (default-sized) page of a channel; optionally follow."""
    team, channel = resolve_team_channel(mm_api, cmdline_args.channel)
    attribute = _post_attribute_mapper(mm_api, channel, cmdline_args)

    def fetch_initial():
        data_page = mm_api._get(f"/v4/channels/{channel['id']}/posts")
        # "order" is reversed before printing, as in get_posts_for_channel
        return [
            data_page["posts"][post_id]
            for post_id in reversed(data_page["order"])
        ]

    _print_and_follow(
        mm_api, cmdline_args, fetch_initial,
        extract=_posted_in_channel(channel["id"]),
        render=lambda post: str_for_post(attribute, post, cmdline_args))


def ls(mm_api: mattermost.MMApi, cmdline_args):
    """
    Print the channels of a team; optionally follow changes to channels.

    @raises NotFound: If the team cannot be resolved.
    """
    # TODO --follow doesn't work for channel creation and deletion yet
    team = resolve_team(mm_api, cmdline_args.team)
    if not team:
        raise NotFound("team", cmdline_args.team)

    events = {"channel_converted", "channel_created", "channel_deleted", "channel_updated"}

    def extract(event_data):
        if event_data.get("event") not in events:
            return None
        channel = json.loads(event_data["data"]["channel"])
        if channel["team_id"] != team["id"]:
            return None
        return channel

    _print_and_follow(
        mm_api, cmdline_args,
        fetch_initial=lambda: mm_api.get_team_channels(team["id"]),
        extract=extract,
        render=lambda channel: str_for_chan(lambda x: x, channel, cmdline_args))


def send(mm_api: mattermost.MMApi, cmdline_args):
    """
    Send one message given on the command line, or one message per JSON line on stdin.

    Stdin is read unless both --message and --channel are given. Each stdin message may select
    its channel with "channel_id" or "channel"; otherwise --channel is used.

    @raises ValueError: If a message from stdin has no channel and --channel was not given.
    """
    read_stdin = cmdline_args.message is None or cmdline_args.channel is None

    if cmdline_args.channel is not None:
        team, channel = resolve_team_channel(mm_api, cmdline_args.channel)
    else:
        team, channel = None, None

    if read_stdin:
        warn_if_tty()
        for line in sys.stdin:
            msg = json.loads(line)

            if "channel_id" in msg:
                channel_id = msg["channel_id"]
            elif "channel" in msg:
                _, local_channel = resolve_team_channel(mm_api, msg["channel"])
                channel_id = local_channel["id"]
            elif channel is not None:
                channel_id = channel["id"]
            else:
                print(f"Illegal message, missing channel: {line.strip()}", file=sys.stderr)
                raise ValueError("Illegal message, missing channel")

            sent = mm_api.create_post(
                channel_id, msg["message"], props={"from_mmcli": "true"},
                filepaths=msg.get("attachments"))
            print(sent)
    else:
        sent = mm_api.create_post(
            channel["id"], cmdline_args.message, props={"from_mmcli": "true"},
            filepaths=cmdline_args.attach)
        print(sent)


def rm(mm_api: mattermost.MMApi, cmdline_args):
    """Delete the post with the given ID."""
    mm_api.delete_post(cmdline_args.msgid)


def edit(mm_api: mattermost.MMApi, cmdline_args):
    """Replace the text of a post with --message or, if that is not given, with all of stdin."""
    if cmdline_args.message is None:
        warn_if_tty(you_can_type="the new message text")
        new_text = sys.stdin.read()
    else:
        new_text = cmdline_args.message

    mm_api.patch_post(cmdline_args.msgid, message=new_text)


def status(mm_api: mattermost.MMApi, cmdline_args):
    """
    Set the status of the current user.

    @raises ValueError: If no status option was given.
    """
    if not cmdline_args.status:
        raise ValueError("No status selected")

    # This API endpoint requires the user ID to be passed explicitly in the request body,
    # duplicating the info in the URL. But "me" does not suffice here.
    my_user_id = mm_api.get_user()["id"]
    mm_api._put("/v4/users/me/status", data={
        "user_id": my_user_id,
        "status": cmdline_args.status
    })


def customstatus(mm_api: mattermost.MMApi, cmdline_args):
    """Set the custom status (text and/or emoji) of the current user, or clear it if neither is given."""
    until = parse_datetime_to_utc(cmdline_args.until) if cmdline_args.until else None

    if cmdline_args.text or cmdline_args.emoji:
        mm_api._put("/v4/users/me/status/custom", data={
            "emoji": cmdline_args.emoji,
            "text": cmdline_args.text,
            "expires_at": until.isoformat() if until else None
        })
    else:
        mm_api._delete("/v4/users/me/status/custom")


def lastread(mm_api: mattermost.MMApi, cmdline_args):
    """Print the ID returned by the "unread" endpoint for the channel, or null if there is none."""
    team, channel = resolve_team_channel(mm_api, cmdline_args.channel)

    response = mm_api._get(f"/v4/users/me/channels/{channel['id']}/posts/unread?limit_after=1&limit_before=0")
    if response["order"]:
        assert len(response["order"]) == 1
        last_read_id = response["order"][0]
    else:
        last_read_id = None

    if cmdline_args.format == "json":
        print(json.dumps(last_read_id))
    if cmdline_args.format == "tsv":
        print(last_read_id or "null")


def tsv_escape(text):
    """Escape backslashes, tabs and newlines so that `text` fits into one TSV field."""
    return text.replace("\\", "\\\\").replace("\t", r"\t").replace("\n", r"\n")


def str_for_post(attribute, post, cmdline_args):
    """
    Format a post as one output line in the selected format.

    Fields with falsy values are dropped (except "message"), and "update_at" is dropped if the
    post was never updated. `attribute` maps each (key, value) pair before filtering.
    """
    obj = {
        k: v
        for k, v in map(attribute, post.items())
        if (v or k == "message") and (k != "update_at" or post["update_at"] != post["create_at"])
    }

    if cmdline_args.format == "json":
        return json.dumps(obj)
    if cmdline_args.format == "tsv":
        msg = tsv_escape(obj.get("message", ""))
        return f"{obj['id']}\t{obj['create_at']}\t{obj.get('username') or obj['user_id']}\t{msg}"
    assert False


def str_for_chan(attribute, channel, cmdline_args):
    """Format a channel as one output line in the selected format."""
    obj = dict(map(attribute, channel.items()))

    if cmdline_args.format == "json":
        return json.dumps(obj)
    if cmdline_args.format == "tsv":
        # TODO
        # "or" also covers fields that are present but None, which tsv_escape cannot handle
        header = tsv_escape(obj.get("header") or "")
        purpose = tsv_escape(obj.get("purpose") or "")
        return f"{obj['id']}\t{obj['name']}\t{obj.get('display_name')}\t{obj.get('create_at')}\t{obj.get('delete_at')}\t{purpose}\t{header}"
    assert False


# Maps each action name to its implementation; "accesstoken_required" defaults to True.
ACTIONS = {
    "login": {"function": login, "accesstoken_required": False},
    "cat": {"function": cat},
    "tail": {"function": tail},
    "ls": {"function": ls},
    "send": {"function": send},
    "rm": {"function": rm},
    "edit": {"function": edit},
    "status": {"function": status},
    "customstatus": {"function": customstatus},
    "lastread": {"function": lastread},
}

FORMATTERS = {"json", "tsv"}

ENVVAR_SERVER = "MM_SERVER"
ENVVAR_USERNAME = "MM_USERNAME"
ENVVAR_PASSWORD = "MM_PASSWORD"
ENVVAR_TOTP = "MM_TOTP"
ENVVAR_ACCESSTOKEN = "MM_ACCESSTOKEN"


def main():
    """Parse the command line and environment, set up the API object and run the chosen action."""
    prog_name = os.path.basename(sys.argv[0])
    description = "Interact with Mattermost on the CLI and in scripts"
    epilog = f"""
For further help, use `{prog_name} <action> -h`.

Where a "URL name" is required, "id:" plus an ID can also be used instead. So these could both be valid:
        town-square
        id:123abc456def789ghi012jkl34

Hint: JSON output can be filtered with jq(1).
    """.strip()

    argparser = argparse.ArgumentParser(
        prog_name, description=description, epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )
    argparser.add_argument("-i", "--ids", help="use IDs instead of names", action="store_true")
    # sorted() gives the choices a stable order in help and error messages
    argparser.add_argument(
        "--format", help="output format; only json has all fields; default: %(default)s",
        choices=sorted(FORMATTERS), default="json")
    argparser.add_argument(
        "--server",
        help=f"e.g.: mattermost.example.org; example.org/mattermost; envvar: {ENVVAR_SERVER}",
        default=os.getenv(ENVVAR_SERVER))

    subparsers = argparser.add_subparsers(title="actions", dest="action", required=True)

    password_argument_warning = f"""
Security note: Other programs and users can typically read which arguments you give to any program. Therefore it is strongly advised to use the environment variable (envvar) method when passing the credentials to the program. In many shells you can do so like this:
        {ENVVAR_USERNAME}='aiden' {ENVVAR_PASSWORD}='2FifeVg2UGbCETYdaWscf7hmDvUHbp' {prog_name} login
    """.strip()
    parser_login = subparsers.add_parser(
        "login", help="retrieve an access token", epilog=password_argument_warning,
        formatter_class=argparse.RawTextHelpFormatter)
    # nargs="?" is needed for the envvar default to take effect: argparse ignores the default
    # of a required positional argument
    parser_login.add_argument(
        "login_id", nargs="?", help=f"username or email; envvar: {ENVVAR_USERNAME}",
        default=os.getenv(ENVVAR_USERNAME))
    parser_login.add_argument(
        "--password", help=f"see security note below; envvar: {ENVVAR_PASSWORD}",
        default=os.getenv(ENVVAR_PASSWORD))
    parser_login.add_argument(
        "--totp", help=f"see security note below; envvar: {ENVVAR_TOTP}",
        default=os.getenv(ENVVAR_TOTP))

    # TODO support multiple channels
    parser_cat = subparsers.add_parser("cat", help="list messages in channel")
    parser_cat.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")
    parser_cat.add_argument("--after", help="all after post with ID")
    parser_cat.add_argument("--since", help="all after timestamp")
    parser_cat.add_argument(
        "-f", "--follow", action="store_true",
        help="keep running, printing new posts as they come in")

    parser_tail = subparsers.add_parser("tail", help="list newest messages in channel")
    parser_tail.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")
    parser_tail.add_argument(
        "-f", "--follow", action="store_true",
        help="keep running, printing new posts as they come in")

    parser_ls = subparsers.add_parser("ls", help="list channels")
    parser_ls.add_argument("team", help="URL name of team")
    parser_ls.add_argument(
        "-f", "--follow", action="store_true",
        help="keep running, printing changes to channels as they come in")

    send_json_format = """
The input format accepted on stdin is one JSON object per line. The possible fields are 'message', 'channel' (URL names of team and channel: '<team>/<channel>'), 'channel_id', 'attachments'
    """.strip()
    parser_send = subparsers.add_parser(
        "send", help="send message(s)", epilog=send_json_format)
    parser_send.add_argument(
        "--channel",
        help="URL names of team and channel: '<team>/<channel>'; if not provided, "
             "messages must be provided on stdin and each must specify channel")
    parser_send.add_argument(
        "--message", help="message; if not provided, messages will be expected on stdin")
    parser_send.add_argument(
        "--attach", nargs="+", help="filename of file to attach")

    parser_rm = subparsers.add_parser("rm", help="delete message(s)")
    parser_rm.add_argument("msgid", help="ID of message to delete")

    parser_edit = subparsers.add_parser(
        "edit", help="edit message(s)",
        epilog="The input accepted on stdin will be used as-is as the new text.")
    parser_edit.add_argument("msgid", help="ID of message to edit")
    parser_edit.add_argument(
        "--message", help="message; if not provided, message will be expected on stdin")

    parser_status = subparsers.add_parser("status", help="update user status")
    parser_status.add_argument(
        "--online", dest="status", action="store_const", const="online",
        help="Set status to online")
    parser_status.add_argument(
        "--away", dest="status", action="store_const", const="away",
        help="Set status to away")
    parser_status.add_argument(
        "--dnd", dest="status", action="store_const", const="dnd",
        help="Set status to 'do not disturb'")
    parser_status.add_argument(
        "--offline", dest="status", action="store_const", const="offline",
        help="Set status to offline")

    parser_customstatus = subparsers.add_parser(
        "customstatus", help="update custom user status (emoji and message)")
    parser_customstatus.add_argument("--until", help="Datetime of when to clear the custom status")
    parser_customstatus.add_argument("--emoji", help="Name of emoji (without colons), e.g. coffee")
    parser_customstatus.add_argument("text", help="Text for the status", nargs="?")

    parser_lastread = subparsers.add_parser(
        "lastread", help="last read message in channel; will be null if all messages are read")
    parser_lastread.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")

    parsed = argparser.parse_args()

    if not parsed.server:
        argparser.error(
            f"server is required; use argument --server or environment variable {ENVVAR_SERVER}")

    if parsed.action == "login" and not parsed.login_id:
        argparser.error(
            f"login_id is required; pass it as an argument or use environment variable {ENVVAR_USERNAME}")

    access_token = os.getenv(ENVVAR_ACCESSTOKEN)
    if ACTIONS[parsed.action].get("accesstoken_required", True) and not access_token:
        argparser.error(
            f"`{prog_name} {parsed.action}` requires access token; get one with `{prog_name} login` "
            f"and set environment variable {ENVVAR_ACCESSTOKEN}")

    # Default to https unless the server was given with an explicit scheme
    server = parsed.server if re.match(r"^[a-z]+://", parsed.server) else f"https://{parsed.server}"

    mm_api = mattermost.MMApi(f"{server}/api")
    mm_api.access_token = access_token
    if access_token:
        mm_api._headers.update({"Authorization": f"Bearer {access_token}"})

    ACTIONS[parsed.action]["function"](mm_api, parsed)


if __name__ == "__main__":
    main()