mmcli/mmcli.py

307 lines
9 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import argparse
import os
import json
from typing import Dict, Optional, List
import re
# from threading import Lock
import mattermost
# from mmws import MMws
class NotFound(Exception):
    """
    Signals that a named entity could not be resolved.

    The exception message reads "<type> <name> not found"; both parts are
    also kept as attributes:

    type -- kind of entity that was looked up (e.g. "team", "channel")
    name -- the query that produced no match
    """

    def __init__(self, type_: str, name: str):
        message = f"{type_} {name} not found"
        super().__init__(message)
        self.name = name
        self.type = type_
def first(iterable, default=None):
    """
    @return: the first item produced by `iterable`, or `default` if it
             produces nothing. Only one item is ever consumed.
    """
    return next(iter(iterable), default)
def yes_no(x):
    """
    @return: "yes" when `x` is truthy, "no" otherwise
    """
    if x:
        return "yes"
    return "no"
def http_to_ws(url):
    """
    Map an http(s) URL onto the matching websocket scheme:
    "http://..." becomes "ws://..." and "https://..." becomes "wss://...".
    """
    assert url.startswith(("http://", "https://"))
    # Dropping the leading "http" keeps an optional "s", so https -> wss.
    remainder = url[len("http"):]
    return "ws" + remainder
def get_posts_for_channel(self, channel_id: str, progress=lambda x: None, **kwargs) -> List[Dict]:
    """
    Fetch all posts of a channel, page by page, oldest first.

    This is a module-level function; `self` is the API object whose `_get`
    method performs the request (callers in this file pass the MMApi
    instance explicitly).

    @param channel_id: ID of the channel to read.
    @param progress: called after every fetched page with the number of
                     posts collected so far.
    @param kwargs: extra query parameters, forwarded verbatim to the posts
                   endpoint (e.g. `after`).
    @return: list of post dicts, oldest first.
    @raises ApiException: Passed on from lower layers.
    """
    # Imported here because the module's import block does not provide
    # `sleep`; without this the pause below raised NameError.
    from time import sleep

    page = 0
    posts = []
    while True:
        data_page = self._get(
            f"/v4/channels/{channel_id}/posts",
            params={"page": str(page), "per_page": 200, **kwargs},
        )

        # An empty "order" list marks the end of the pagination.
        if data_page["order"] == []:
            break
        page += 1

        # "order" holds the post IDs in display order; "posts" maps ID -> post.
        posts.extend(data_page["posts"][order] for order in data_page["order"])
        progress(len(posts))
        # Short pause between page requests.
        sleep(0.1)

    # Mattermost gives newest first, so reverse order
    posts.reverse()

    return posts
# Queries starting with this prefix select by ID instead of by name.
ID_PREFIX = "id:"


def predicate_for_query(query: str):
    """
    Build a matcher for `query`.

    A query of the form "id:<ID>" is compared against an item's "id" entry;
    any other query is compared against the item's "name" entry.

    @return: a function that returns whether `query` matches its argument
    """
    if not query.startswith(ID_PREFIX):
        def matches_name(item):
            return item["name"] == query
        return matches_name

    wanted_id = query[len(ID_PREFIX):]

    def matches_id(item):
        return item["id"] == wanted_id
    return matches_id
def resolve_team(mm_api: mattermost.MMApi, query: str) -> Optional[Dict]:
    """
    Look up a team by name, or by ID when `query` is "id:<ID>".

    @return: the first team from `mm_api.get_teams()` matching `query`,
             or None if there is no match
    """
    matches = predicate_for_query(query)
    return first(team for team in mm_api.get_teams() if matches(team))
def resolve_channel(mm_api: mattermost.MMApi, team_id: str, query: str) -> Optional[Dict]:
    """
    Look up a channel of team `team_id` by name, or by ID when `query` is
    "id:<ID>".

    @return: the first channel from `mm_api.get_team_channels(team_id)`
             matching `query`, or None if there is no match
    """
    matches = predicate_for_query(query)
    return first(
        channel
        for channel in mm_api.get_team_channels(team_id)
        if matches(channel)
    )
def resolve_team_channel(mm_api: mattermost.MMApi, query: str) -> tuple:
    """
    Resolve a "<team>/<channel>" query to the team and channel it names.

    Each half may be a name or an "id:<ID>" reference.

    @param mm_api: API object used for the lookups.
    @param query: string of the form "<team>/<channel>".
    @return: a `(team, channel)` pair of dicts.
    @raises ValueError: `query` does not consist of exactly two
                        "/"-separated parts.
    @raises NotFound: the team or the channel does not exist.
    """
    query_parts = query.split("/")
    if len(query_parts) != 2:
        raise ValueError("Team/channel ID should be '<team>/<channel>'")
    team_query, channel_query = query_parts

    team = resolve_team(mm_api, team_query)
    if not team:
        raise NotFound("team", team_query)

    channel = resolve_channel(mm_api, team["id"], channel_query)
    if not channel:
        # Must be raised, not returned: callers unpack the result as a
        # (team, channel) pair and would otherwise fail with an unrelated
        # unpacking error.
        raise NotFound("channel", channel_query)

    return team, channel
def login(mm_api, parsed):
    """
    Log in with the credentials in `parsed` and print the access token.

    A summary of the provided credentials (never the secrets themselves)
    goes to stderr; the token goes to stdout, either as a JSON object
    {"token": ...} or, for the "tsv" format, as the bare token.

    @param mm_api: API object; its `login` method is called and its
                   `_bearer` attribute is read afterwards.
    @param parsed: parsed command line; reads `login_id` (falling back to
                   `user`), `password`, `totp` and `format`.
    @raises AssertionError: `parsed.format` is neither "json" nor "tsv".
    """
    # The command line parser stores the positional argument as `login_id`;
    # `user` is still accepted for namespaces that provide that name instead.
    login_id = getattr(parsed, "login_id", None)
    if login_id is None:
        login_id = parsed.user

    print(
        f"Logging in as {login_id}; password provided: {yes_no(parsed.password)}; "
        f"TOTP token provided: {yes_no(parsed.totp)}",
        file=sys.stderr)
    mm_api.login(login_id, parsed.password, parsed.totp)

    if parsed.format == "json":
        print(json.dumps({"token": mm_api._bearer}))
    elif parsed.format == "tsv":
        print(mm_api._bearer)
    else:
        # Raised explicitly so the check also holds when Python runs with -O.
        raise AssertionError(f"unsupported format: {parsed.format!r}")
def cat(mm_api: mattermost.MMApi, parsed):
    """
    Print the posts of one channel to stdout, oldest first, one per line.

    Unless `parsed.ids` is set, the "channel_id" field of each post is
    replaced by a "channel" field holding the channel name, and "user_id"
    by a "username" field. A user ID that cannot be mapped to a user is
    kept as "user_id", so the author information is never lost.

    @param mm_api: API object used for all lookups.
    @param parsed: parsed command line; reads `channel`, `ids`, `follow`,
                   `after` and (through post_str) `format`.
    @raises NotImplementedError: `parsed.follow` is set.
    """
    # TODO support multiple channels (resolve every query in parsed.channels)
    _team, channel = resolve_team_channel(mm_api, parsed.channel)

    if parsed.ids:
        def attribute(key_value):
            return key_value
    else:
        # Build the ID -> username table once instead of scanning the user
        # list for every post. setdefault keeps the first entry per ID.
        usernames = {}
        for user in mm_api.get_users():
            usernames.setdefault(user["id"], user["username"])

        def attribute(key_value):
            key, value = key_value
            if key == "channel_id":
                assert value == channel["id"]
                return "channel", channel["name"]
            if key == "user_id":
                username = usernames.get(value)
                if username:
                    return "username", username
                # Unknown user: keep the raw ID. An empty username would be
                # dropped by post_str, which then has no author for tsv.
                return key_value
            return key_value

    if parsed.follow:
        # TODO implement via the websocket API: collect "posted" events in a
        # lock-protected backlog while the history is printed, then flush
        # the backlog and print subsequent events directly.
        raise NotImplementedError("--follow is not yet supported")

    # Only forward `after` when it was actually given on the command line.
    # NOTE: the parser also accepts --since, which is not applied here yet.
    query_params = {} if parsed.after is None else {"after": parsed.after}
    for post in get_posts_for_channel(mm_api, channel["id"], **query_params):
        print(post_str(attribute, post, parsed))
def send(mm_api: mattermost.MMApi, parsed):
    """
    Send one message from the command line, or a stream of messages from
    stdin, and print what `create_post` returns for each.

    With both `parsed.message` and `parsed.channel` given, a single post is
    created, with `parsed.attach` as attachments. Otherwise every non-blank
    line of stdin must be a JSON object with a "message" entry, an optional
    "attachments" entry, and a target channel given as "channel_id" (an ID)
    or "channel" ("<team>/<channel>"); `parsed.channel` is the fallback
    when a line names no channel.

    @param mm_api: API object used for lookups and posting.
    @param parsed: parsed command line; reads `message`, `channel`, `attach`.
    @raises ValueError: a stdin message names no channel and no --channel
                        was given.
    """
    read_stdin = parsed.message is None or parsed.channel is None

    if parsed.channel is not None:
        _team, channel = resolve_team_channel(mm_api, parsed.channel)
    else:
        channel = None

    if not read_stdin:
        sent = mm_api.create_post(
            channel["id"], parsed.message,
            props={"from_mmcli": "true"}, filepaths=parsed.attach)
        print(sent)
        return

    if parsed.message is not None:
        # Without --channel the messages come from stdin; say so instead of
        # silently dropping the argument.
        print("Ignoring --message because --channel was not given; reading messages from stdin.", file=sys.stderr)

    if sys.stdin.isatty():
        print("Reading from tty. (You can type the message objects below. Or maybe you meant to redirect something to stdin.)", file=sys.stderr)

    for line in sys.stdin:
        # A blank line is not a message; json.loads would fail on it.
        if not line.strip():
            continue
        msg = json.loads(line)

        # Per-message target first, command-line channel as the fallback.
        if "channel_id" in msg:
            channel_id = msg["channel_id"]
        elif "channel" in msg:
            _, local_channel = resolve_team_channel(mm_api, msg["channel"])
            channel_id = local_channel["id"]
        elif channel is not None:
            channel_id = channel["id"]
        else:
            print(f"Illegal message, missing channel: {line.strip()}", file=sys.stderr)
            raise ValueError("Illegal message, missing channel")

        sent = mm_api.create_post(
            channel_id, msg["message"],
            props={"from_mmcli": "true"}, filepaths=msg.get("attachments"))
        print(sent)
def post_str(attribute, post, parsed):
    """
    Render one post as a single output line.

    Every (key, value) item of `post` is first passed through `attribute`.
    Items with a falsy value are left out (except "message", which is always
    kept), and so is "update_at" when the post was never edited, i.e. when
    it equals "create_at".

    @param attribute: function mapping a (key, value) pair to a (key, value) pair
    @param post: the post dict
    @param parsed: parsed command line; `parsed.format` selects "json" or "tsv"
    @return: a JSON object, or for "tsv" the tab-separated columns
             id, create_at, username (or user_id), escaped message
    """
    fields = {}
    for key, value in map(attribute, post.items()):
        if not value and key != "message":
            continue
        if key == "update_at" and post["update_at"] == post["create_at"]:
            continue
        fields[key] = value

    output_format = parsed.format
    if output_format == "json":
        return json.dumps(fields)

    if output_format == "tsv":
        # Escape backslashes first so the escapes added for tab and newline
        # stay unambiguous.
        text = fields.get("message", "")
        for raw, escaped in (("\\", "\\\\"), ("\t", r"\t"), ("\n", r"\n")):
            text = text.replace(raw, escaped)
        author = fields.get("username") or fields["user_id"]
        return f"{fields['id']}\t{fields['create_at']}\t{author}\t{text}"

    assert False
# Dispatch table of the command line actions. Each entry names the handler,
# called as function(mm_api, parsed). "accesstoken_required" may be set to
# False for actions that work without an access token; main() treats a
# missing entry as True.
ACTIONS = {
"login": {"function": login, "accesstoken_required": False},
"cat": {"function": cat},
"send": {"function": send},
}
# Output formats accepted by --format.
FORMATTERS = { "json", "tsv" }
# Names of the environment variables read by main(): the first four supply
# defaults for command line arguments, the last one supplies the access token.
ENVVAR_SERVER = "MM_SERVER"
ENVVAR_USERNAME = "MM_USERNAME"
ENVVAR_PASSWORD = "MM_PASSWORD"
ENVVAR_TOTP = "MM_TOTP"
ENVVAR_ACCESSTOKEN = "MM_ACCESSTOKEN"
def main():
    """
    Command line entry point: parse the arguments, set up the API object
    and run the selected action from ACTIONS.

    The server comes from --server or the MM_SERVER environment variable;
    "https://" is assumed when no scheme is given. Actions that need an
    access token read it from the MM_ACCESSTOKEN environment variable.
    Usage errors are reported through argparse, which exits the process.
    """
    prog_name = os.path.basename(sys.argv[0])
    description = "Interact with Mattermost on the CLI"
    epilog = f"""
For further help, use `{prog_name} <action> -h`.
Where a "URL name" is required, "id:" plus an ID can also be used instead. So these could both be valid:
town-square
id:123abc456def789ghi012jkl34
Hint: JSON output can be filtered on the command line with jq(1).
""".strip()

    argparser = argparse.ArgumentParser(
        prog_name, description=description, epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter
    )
    argparser.add_argument("-i", "--ids", help="use IDs instead of names", action="store_true")
    # sorted(): a set has no stable order, which would make the choices
    # listed in help and error messages vary between runs.
    argparser.add_argument(
        "--format", help="output format; only json has all fields; default: json",
        choices=sorted(FORMATTERS), default="json")
    argparser.add_argument(
        "--server",
        help=f"e.g.: mattermost.example.org; example.org/mattermost; envvar: {ENVVAR_SERVER}",
        default=os.getenv(ENVVAR_SERVER))

    subparsers = argparser.add_subparsers(title="actions", dest="action", required=True)

    parser_login = subparsers.add_parser("login", help="retrieve an access token")
    # nargs="?" is what makes argparse honour `default` for a positional;
    # without it the argument is always required and the environment
    # variable is never consulted.
    parser_login.add_argument(
        "login_id", nargs="?", help="username or email", default=os.getenv(ENVVAR_USERNAME))
    parser_login.add_argument("--password", default=os.getenv(ENVVAR_PASSWORD))
    parser_login.add_argument("--totp", default=os.getenv(ENVVAR_TOTP))

    # TODO support multiple channels
    # parser_cat = subparsers.add_parser("cat", help="list messages in channel(s)")
    # parser_cat.add_argument(
    # "channels", nargs="+", help="URL names of team and channel: '<team>/<channel>'")
    parser_cat = subparsers.add_parser("cat", help="list messages in channel")
    parser_cat.add_argument("channel", help="URL names of team and channel: '<team>/<channel>'")
    # ---
    parser_cat.add_argument("--after", help="all after post with ID")
    parser_cat.add_argument("--since", help="all after timestamp")
    parser_cat.add_argument("-f", "--follow", action="store_true", help="keep running, printing new posts as they come in")

    parser_send = subparsers.add_parser("send", help="send message(s)")
    parser_send.add_argument(
        "--channel", help="URL names of team and channel: '<team>/<channel>'; if not provided, "
        "messages must be provided on stdin and each must specify channel")
    parser_send.add_argument(
        "--message", help="message; if not provided, messages will be expected on stdin")
    parser_send.add_argument(
        "--attach", nargs="+", help="filename of file to attach")

    parsed = argparser.parse_args()

    if not parsed.server:
        argparser.error(
            f"server is required; use argument --server or environment variable {ENVVAR_SERVER}")

    if parsed.action == "login":
        if not parsed.login_id:
            parser_login.error(
                f"login_id is required; pass it as argument or set environment variable {ENVVAR_USERNAME}")
        # Also expose the login ID as `user`, the attribute name the login
        # action has historically read.
        parsed.user = parsed.login_id

    access_token = os.getenv(ENVVAR_ACCESSTOKEN)
    if ACTIONS[parsed.action].get("accesstoken_required", True) and not access_token:
        argparser.error(
            f"`{prog_name} {parsed.action}` requires access token; get one with `{prog_name} login` "
            f"and set environment variable {ENVVAR_ACCESSTOKEN}")

    # Assume https when no scheme is given; strip trailing slashes so the
    # API URL does not end up with "//api".
    server = parsed.server if re.match(r"^[a-z]+://", parsed.server) else f"https://{parsed.server}"
    server = server.rstrip("/")

    mm_api = mattermost.MMApi(f"{server}/api")
    if access_token:
        mm_api._headers.update({"Authorization": f"Bearer {access_token}"})

    ACTIONS[parsed.action]["function"](mm_api, parsed)
if __name__ == "__main__":
main()