#!/usr/bin/env python3 import os import sys import re import subprocess import datetime import time import logging import mpd from ipo import ipo, opi, p # pylint: disable=no-member NAME = "Mattermost MPD now playing status" VERSION = "1.0.1" ### Config ######## EMOJI_PAUSED = "pause_button" EMOJI_PLAYING = "musical_note" ################### MPD_HOST = os.getenv("MPD_HOST", "localhost") # Host (for TCP) or socket path MPD_PORT = os.getenv("MPD_PORT", "6600") # Ignored when not using TCP LOGGER = logging.getLogger("mmmpd") MPD_STATE_PLAY = "play" MPD_STATE_PAUSE = "pause" # Mattermost stuff {{{1 # ---------------- def set_status(emoji, text, expires_datetime): LOGGER.info("Custom status expiring %s: :%s: %s", expires_datetime or "never", emoji, text) subprocess.run([ "mmcli", "customstatus", "--until", expires_datetime.isoformat() if expires_datetime else "", "--emoji", emoji, "--", text ], check=True) def clear_status(): LOGGER.info("Clearing status") subprocess.run(["mmcli", "customstatus"], check=True) # MPD stuff {{{1 # --------- def song_string(song_info): if song_info.get("title") is None: artist = song_info.get("artist") or "" title = re.sub( r"\.[a-z0-9]+$", "", os.path.basename(song_info.get("file")) ) or "Unknown song" else: artist = song_info.get("artist") or "Unknown artist" title = song_info.get("title") or "Unknown song" string = f"{artist}{' – ' if artist else ''}{title}" if len(string) > 102: string = (ipo(string) | p(re.sub)(r"\bOrchestr[ea]\b", r"Orch.") | p(re.sub)(r"\bFestival\b", r"Fest.") | p(re.sub)(r"\bSymphon(y|ic)\b", r"Symph.") | p(re.sub)(r"\bHarmon(y|ic)\b", r"Harm.") | p(re.sub)(r"\bPhilharmon(y|ic)\b", r"Phil.") | p(re.sub)(r"\ballegro\b", r"all.", flags=re.IGNORECASE) | p(re.sub)(r"\bandante\b", r"and.", flags=re.IGNORECASE) | p(re.sub)(r"\badagio\b", r"adg.", flags=re.IGNORECASE) | p(re.sub)(r"\bma non troppo\b", r"m.n.t.", flags=re.IGNORECASE) | p(re.sub)(r"\bviolin\b", r"vln.", flags=re.IGNORECASE) | p(re.sub)(r"\bpiano\b", r"pno.", flags=re.IGNORECASE) | p(re.sub)(r"\bmolto\b", r"mlt.", flags=re.IGNORECASE) | p(re.sub)(r"\bespressivo\b", r"essprs.", flags=re.IGNORECASE) | p(re.sub)(r"\bsostenuto\b", r"sost.", flags=re.IGNORECASE) | p(re.sub)(r"\b[Nn]o(?:\. ?| )([0-9])", r"№\1") | p(re.sub)(r"in ([A-Za-z]) ?sharp", r"in \1♯") | p(re.sub)(r"in ([A-Za-z]) ?flat", r"in \1♭") | p(re.sub)(r"in ([A-Z]([#b♭♯])?) ?major", lambda x: f"in {x[1].upper()}", flags=re.IGNORECASE) | p(re.sub)(r"in ([A-Z]([#b♭♯])?) ?minor", lambda x: f"in {x[1].lower()}", flags=re.IGNORECASE) | p(re.sub)(r"\band\b", r"&") | opi) return string def formatted_status(mpd_client): status = mpd_client.status() state = status.get("state") LOGGER.debug("Player state: %r", state) if state not in (MPD_STATE_PLAY, MPD_STATE_PAUSE): return None song = mpd_client.currentsong() song_str = song_string(song) emoji = EMOJI_PAUSED if state == MPD_STATE_PAUSE else EMOJI_PLAYING expire = None elapsed = status.get("elapsed") duration = status.get("duration") if state == MPD_STATE_PLAY and elapsed is not None and duration: now = datetime.datetime.now().astimezone() try: # 1 second extra to allow some time to set the new song without flickering status expire = now + datetime.timedelta(seconds=1 + float(duration) - float(elapsed)) except ValueError as e: LOGGER.error("Could not calculate expiry time", exc_info=e) return (emoji, song_str, expire) def host_and_pass_from_MPD_HOST(mpd_host_string): """ mpc accepts passwords by setting MPD_HOST to pass@host. For compatibility, we do the same. """ m = re.match(r""" (?: # Optional password, followed by @, but cannot (?P [^@]+?) @ # start with @ (that's for abstract sockets on Linux). )? # If present, after the password there'll be an @. (?P .*) """, mpd_host_string, re.VERBOSE) if not m: return None return (m.group("host"), m.group("pass") or None) def create_mpd_client(mpd_host, mpd_port, mpd_pass): mpd_client = mpd.MPDClient() mpd_client.connect(mpd_host, port=mpd_port) if mpd_pass: mpd_client.password(mpd_pass) LOGGER.info("Connected") return mpd_client # Driving stuff {{{1 # ------------- def set_status_from_mpd(mpd_client): status = formatted_status(mpd_client) if status is None: clear_status() else: emoji, song_str, expire = status set_status(emoji, song_str, expire) def loop(mpd_client, on_status_change): while True: mpd_client.idle("player") LOGGER.debug("Got event from MPD") on_status_change(mpd_client) time.sleep(1) def main(mpd_host_string, mpd_port): if "-v" in sys.argv[1:]: logging.basicConfig(level=logging.INFO) LOGGER.info("Log level INFO") elif "-vv" in sys.argv[1:]: logging.basicConfig(level=logging.DEBUG) LOGGER.info("Log level DEBUG") mpd_host, mpd_pass = host_and_pass_from_MPD_HOST(mpd_host_string) mpd_client = create_mpd_client(mpd_host, mpd_port, mpd_pass) set_status_from_mpd(mpd_client) try: loop(mpd_client, set_status_from_mpd) except KeyboardInterrupt: pass finally: clear_status() if __name__ == "__main__": main(MPD_HOST, MPD_PORT)