mmmpd/mmmpd

182 lines
5.4 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import sys
import re
import subprocess
import datetime
import time
import logging
import mpd
from ipo import ipo, opi, p
# pylint: disable=no-member
NAME = "Mattermost MPD now playing status"
VERSION = "1.0.1"
### Config ########
EMOJI_PAUSED = "pause_button"
EMOJI_PLAYING = "musical_note"
###################
MPD_HOST = os.getenv("MPD_HOST", "localhost") # Host (for TCP) or socket path
MPD_PORT = os.getenv("MPD_PORT", "6600") # Ignored when not using TCP
LOGGER = logging.getLogger("mmmpd")
MPD_STATE_PLAY = "play"
MPD_STATE_PAUSE = "pause"
# Mattermost stuff {{{1
# ----------------
def set_status(emoji, text, expires_datetime):
LOGGER.info("Custom status expiring %s: :%s: %s", expires_datetime or "never", emoji, text)
subprocess.run([
"mmcli", "customstatus",
"--until", expires_datetime.isoformat() if expires_datetime else "",
"--emoji", emoji,
"--", text
], check=True)
def clear_status():
LOGGER.info("Clearing status")
subprocess.run(["mmcli", "customstatus"], check=True)
# MPD stuff {{{1
# ---------
def song_string(song_info):
if song_info.get("title") is None:
artist = song_info.get("artist") or ""
title = re.sub(
r"\.[a-z0-9]+$",
"",
os.path.basename(song_info.get("file"))
) or "Unknown song"
else:
artist = song_info.get("artist") or "Unknown artist"
title = song_info.get("title") or "Unknown song"
string = f"{artist}{' ' if artist else ''}{title}"
if len(string) > 102:
string = (ipo(string) |
p(re.sub)(r"\bOrchestr[ea]\b", r"Orch.") |
p(re.sub)(r"\bFestival\b", r"Fest.") |
p(re.sub)(r"\bSymphon(y|ic)\b", r"Symph.") |
p(re.sub)(r"\bHarmon(y|ic)\b", r"Harm.") |
p(re.sub)(r"\bPhilharmon(y|ic)\b", r"Phil.") |
p(re.sub)(r"\ballegro\b", r"all.", flags=re.IGNORECASE) |
p(re.sub)(r"\bandante\b", r"and.", flags=re.IGNORECASE) |
p(re.sub)(r"\badagio\b", r"adg.", flags=re.IGNORECASE) |
p(re.sub)(r"\bma non troppo\b", r"m.n.t.", flags=re.IGNORECASE) |
p(re.sub)(r"\bviolin\b", r"vln.", flags=re.IGNORECASE) |
p(re.sub)(r"\bpiano\b", r"pno.", flags=re.IGNORECASE) |
p(re.sub)(r"\bmolto\b", r"mlt.", flags=re.IGNORECASE) |
p(re.sub)(r"\bespressivo\b", r"essprs.", flags=re.IGNORECASE) |
p(re.sub)(r"\bsostenuto\b", r"sost.", flags=re.IGNORECASE) |
p(re.sub)(r"\b[Nn]o(?:\. ?| )([0-9])", r"\1") |
p(re.sub)(r"in ([A-Za-z]) ?sharp", r"in \1♯") |
p(re.sub)(r"in ([A-Za-z]) ?flat", r"in \1♭") |
p(re.sub)(r"in ([A-Z]([#b♭♯])?) ?major", lambda x: f"in {x[1].upper()}", flags=re.IGNORECASE) |
p(re.sub)(r"in ([A-Z]([#b♭♯])?) ?minor", lambda x: f"in {x[1].lower()}", flags=re.IGNORECASE) |
p(re.sub)(r"\band\b", r"&") |
opi)
return string
def formatted_status(mpd_client):
status = mpd_client.status()
state = status.get("state")
LOGGER.debug("Player state: %r", state)
if state not in (MPD_STATE_PLAY, MPD_STATE_PAUSE):
return None
song = mpd_client.currentsong()
song_str = song_string(song)
emoji = EMOJI_PAUSED if state == MPD_STATE_PAUSE else EMOJI_PLAYING
expire = None
elapsed = status.get("elapsed")
duration = status.get("duration")
if state == MPD_STATE_PLAY and elapsed is not None and duration:
now = datetime.datetime.now().astimezone()
try:
# 1 second extra to allow some time to set the new song without flickering status
expire = now + datetime.timedelta(seconds=1 + float(duration) - float(elapsed))
except ValueError as e:
LOGGER.error("Could not calculate expiry time", exc_info=e)
return (emoji, song_str, expire)
def host_and_pass_from_MPD_HOST(mpd_host_string):
"""
mpc accepts passwords by setting MPD_HOST to pass@host. For compatibility, we do the same.
"""
m = re.match(r"""
(?: # Optional password, followed by @, but cannot
(?P<pass> [^@]+?) @ # start with @ (that's for abstract sockets on Linux).
)? # If present, after the password there'll be an @.
(?P<host> .*)
""", mpd_host_string, re.VERBOSE)
if not m:
return None
return (m.group("host"), m.group("pass") or None)
def create_mpd_client(mpd_host, mpd_port, mpd_pass):
mpd_client = mpd.MPDClient()
mpd_client.connect(mpd_host, port=mpd_port)
if mpd_pass:
mpd_client.password(mpd_pass)
LOGGER.info("Connected")
return mpd_client
# Driving stuff {{{1
# -------------
def set_status_from_mpd(mpd_client):
status = formatted_status(mpd_client)
if status is None:
clear_status()
else:
emoji, song_str, expire = status
set_status(emoji, song_str, expire)
def loop(mpd_client, on_status_change):
while True:
mpd_client.idle("player")
LOGGER.debug("Got event from MPD")
on_status_change(mpd_client)
time.sleep(1)
def main(mpd_host_string, mpd_port):
if "-v" in sys.argv[1:]:
logging.basicConfig(level=logging.INFO)
LOGGER.info("Log level INFO")
elif "-vv" in sys.argv[1:]:
logging.basicConfig(level=logging.DEBUG)
LOGGER.info("Log level DEBUG")
mpd_host, mpd_pass = host_and_pass_from_MPD_HOST(mpd_host_string)
mpd_client = create_mpd_client(mpd_host, mpd_port, mpd_pass)
set_status_from_mpd(mpd_client)
try:
loop(mpd_client, set_status_from_mpd)
except KeyboardInterrupt:
pass
finally:
clear_status()
if __name__ == "__main__":
main(MPD_HOST, MPD_PORT)