182 lines
5.4 KiB
Python
Executable file
182 lines
5.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
||
|
||
import os
|
||
import sys
|
||
import re
|
||
import subprocess
|
||
import datetime
|
||
import time
|
||
import logging
|
||
import mpd
|
||
from ipo import ipo, opi, p
|
||
|
||
# pylint: disable=no-member
|
||
|
||
# Human-readable program name and version.
NAME = "Mattermost MPD now playing status"
VERSION = "1.0.1"

### Config ########
# Emoji names (without the surrounding colons) shown next to the song text:
# one for the paused state, one for the playing state.
EMOJI_PAUSED = "pause_button"
EMOJI_PLAYING = "musical_note"
###################

# Connection settings, taken from the environment with local defaults.
# MPD_HOST may also carry a password in the form "pass@host".
MPD_HOST = os.getenv("MPD_HOST", "localhost") # Host (for TCP) or socket path
MPD_PORT = os.getenv("MPD_PORT", "6600") # Ignored when not using TCP

# Logger shared by every function in this script.
LOGGER = logging.getLogger("mmmpd")

# Values of the "state" field in the MPD status that lead to a status being set.
MPD_STATE_PLAY = "play"
MPD_STATE_PAUSE = "pause"
|
||
|
||
|
||
# Mattermost stuff {{{1
|
||
# ----------------
|
||
|
||
def set_status(emoji, text, expires_datetime):
    """Set the Mattermost custom status by invoking ``mmcli customstatus``.

    Args:
        emoji: Emoji name passed to ``--emoji``.
        text: Status text, passed after ``--`` so it is never parsed as an option.
        expires_datetime: Datetime at which the status expires, or a falsy
            value for no expiry (an empty string is then passed to ``--until``).

    Raises:
        subprocess.CalledProcessError: If ``mmcli`` exits with a non-zero code.
    """
    LOGGER.info("Custom status expiring %s: :%s: %s", expires_datetime or "never", emoji, text)

    if expires_datetime:
        until = expires_datetime.isoformat()
    else:
        until = ""

    command = ["mmcli", "customstatus", "--until", until, "--emoji", emoji, "--", text]
    subprocess.run(command, check=True)
|
||
|
||
|
||
def clear_status():
    """Clear the custom status by running ``mmcli customstatus`` without arguments.

    Raises:
        subprocess.CalledProcessError: If ``mmcli`` exits with a non-zero code.
    """
    LOGGER.info("Clearing status")
    command = ["mmcli", "customstatus"]
    subprocess.run(command, check=True)
|
||
|
||
|
||
# MPD stuff {{{1
|
||
# ---------
|
||
|
||
# Abbreviation rules applied, in order, to overlong status strings.  Each entry
# is (compiled pattern, replacement).  Order matters: later rules operate on
# the output of earlier ones (e.g. "flat" becomes "♭" before the major/minor
# rules look for an accidental).
_SONG_ABBREVIATIONS = tuple(
    (re.compile(pattern, flags), replacement)
    for pattern, replacement, flags in (
        # Must run before the "andante" rule: that rule emits "and.", which
        # this rule would otherwise rewrite to "&.".
        (r"\band\b", r"&", 0),
        (r"\bOrchestr[ea]\b", r"Orch.", 0),
        (r"\bFestival\b", r"Fest.", 0),
        (r"\bSymphon(y|ic)\b", r"Symph.", 0),
        (r"\bHarmon(y|ic)\b", r"Harm.", 0),
        (r"\bPhilharmon(y|ic)\b", r"Phil.", 0),
        (r"\ballegro\b", r"all.", re.IGNORECASE),
        (r"\bandante\b", r"and.", re.IGNORECASE),
        (r"\badagio\b", r"adg.", re.IGNORECASE),
        (r"\bma non troppo\b", r"m.n.t.", re.IGNORECASE),
        (r"\bviolin\b", r"vln.", re.IGNORECASE),
        (r"\bpiano\b", r"pno.", re.IGNORECASE),
        (r"\bmolto\b", r"mlt.", re.IGNORECASE),
        (r"\bespressivo\b", r"essprs.", re.IGNORECASE),
        (r"\bsostenuto\b", r"sost.", re.IGNORECASE),
        (r"\b[Nn]o(?:\. ?| )([0-9])", r"№\1", 0),
        (r"in ([A-Za-z]) ?sharp", r"in \1♯", 0),
        (r"in ([A-Za-z]) ?flat", r"in \1♭", 0),
        # Major keys: upper-case note letter.  Only the letter is upper-cased
        # so that an ASCII flat sign stays "b" ("Bb", not "BB").
        (
            r"in ([A-Z]([#b♭♯])?) ?major",
            lambda m: f"in {m[1][0].upper()}{m[1][1:].lower()}",
            re.IGNORECASE,
        ),
        # Minor keys: lower-case note letter.
        (
            r"in ([A-Z]([#b♭♯])?) ?minor",
            lambda m: f"in {m[1].lower()}",
            re.IGNORECASE,
        ),
    )
)


def song_string(song_info):
    """Build the status text for a song.

    Args:
        song_info: Mapping of song tags; the keys read are ``"title"``,
            ``"artist"`` and ``"file"``.

    Returns:
        ``"<artist> – <title>"``.  When there is no title tag, the title is
        the file's base name with a trailing lowercase extension removed and
        the artist part is omitted if unknown; otherwise missing values become
        ``"Unknown artist"`` / ``"Unknown song"``.  Strings longer than 102
        characters additionally have a fixed set of (mostly classical-music)
        terms abbreviated; the result is not truncated.
    """
    if song_info.get("title") is None:
        artist = song_info.get("artist") or ""
        # "file" may be absent as well; fall back to "" instead of passing
        # None to os.path.basename(), which would raise TypeError.
        file_name = os.path.basename(song_info.get("file") or "")
        title = re.sub(r"\.[a-z0-9]+$", "", file_name) or "Unknown song"
    else:
        artist = song_info.get("artist") or "Unknown artist"
        title = song_info.get("title") or "Unknown song"

    string = f"{artist}{' – ' if artist else ''}{title}"
    if len(string) > 102:
        for pattern, replacement in _SONG_ABBREVIATIONS:
            string = pattern.sub(replacement, string)
    return string
|
||
|
||
|
||
def formatted_status(mpd_client):
    """Turn the current MPD player state into custom-status components.

    Args:
        mpd_client: Client object providing ``status()`` and ``currentsong()``.

    Returns:
        ``None`` when the player is neither playing nor paused; otherwise a
        tuple ``(emoji, song_str, expire)``.  ``expire`` is a timezone-aware
        datetime for the expected end of the song while playing (when both
        elapsed time and duration are reported), else ``None``.
    """
    player = mpd_client.status()
    state = player.get("state")
    LOGGER.debug("Player state: %r", state)
    if state != MPD_STATE_PLAY and state != MPD_STATE_PAUSE:
        return None

    song_str = song_string(mpd_client.currentsong())
    if state == MPD_STATE_PAUSE:
        emoji = EMOJI_PAUSED
    else:
        emoji = EMOJI_PLAYING

    elapsed = player.get("elapsed")
    duration = player.get("duration")
    expire = None
    # A paused song has no predictable end, so only a playing one expires.
    if state == MPD_STATE_PLAY and elapsed is not None and duration:
        now = datetime.datetime.now().astimezone()
        try:
            # 1 second extra to allow some time to set the new song without flickering status
            remaining = datetime.timedelta(seconds=1 + float(duration) - float(elapsed))
            expire = now + remaining
        except ValueError as e:
            LOGGER.error("Could not calculate expiry time", exc_info=e)

    return (emoji, song_str, expire)
|
||
|
||
|
||
def host_and_pass_from_MPD_HOST(mpd_host_string):
    """Split an ``MPD_HOST`` value into host and optional password.

    mpc accepts passwords by setting MPD_HOST to pass@host. For compatibility,
    we do the same.  A value starting with ``@`` is treated as a host only.

    Args:
        mpd_host_string: Raw ``MPD_HOST`` value.

    Returns:
        Tuple ``(host, password)`` where ``password`` is ``None`` when absent,
        or ``None`` if the value does not match at all.
    """
    pattern = r"""
(?: # Optional password, followed by @, but cannot
(?P<pass> [^@]+?) @ # start with @ (that's for abstract sockets on Linux).
)? # If present, after the password there'll be an @.
(?P<host> .*)
"""
    parsed = re.compile(pattern, re.VERBOSE).match(mpd_host_string)
    if parsed is None:
        return None
    host = parsed.group("host")
    password = parsed.group("pass") or None
    return (host, password)
|
||
|
||
|
||
def create_mpd_client(mpd_host, mpd_port, mpd_pass):
    """Create an ``mpd.MPDClient``, connect it and authenticate if needed.

    Args:
        mpd_host: Host name or socket path to connect to.
        mpd_port: Port passed to ``connect``.
        mpd_pass: Password to send after connecting; skipped when falsy.

    Returns:
        The connected client.
    """
    client = mpd.MPDClient()
    client.connect(mpd_host, port=mpd_port)
    if mpd_pass:
        client.password(mpd_pass)
    LOGGER.info("Connected")
    return client
|
||
|
||
|
||
# Driving stuff {{{1
|
||
# -------------
|
||
|
||
def set_status_from_mpd(mpd_client):
    """Synchronise the Mattermost custom status with the MPD player.

    Clears the status when ``formatted_status`` reports nothing to show,
    otherwise sets it to the reported emoji, text and expiry.

    Args:
        mpd_client: Connected MPD client to query.
    """
    current = formatted_status(mpd_client)
    if current is None:
        clear_status()
        return

    emoji, song_str, expire = current
    set_status(emoji, song_str, expire)
|
||
|
||
|
||
def loop(mpd_client, on_status_change):
    """Run forever, calling ``on_status_change`` after each MPD player event.

    Args:
        mpd_client: Client whose ``idle("player")`` call is used to wait for
            the next event.
        on_status_change: Callable invoked with ``mpd_client`` after each event.

    This function never returns normally; it ends only when an exception
    (e.g. ``KeyboardInterrupt``) propagates out of it.
    """
    while True:
        # Wait for the next event from the "player" subsystem.
        mpd_client.idle("player")
        LOGGER.debug("Got event from MPD")
        on_status_change(mpd_client)
        # Pause for a second before waiting for the next event.
        time.sleep(1)
|
||
|
||
|
||
def main(mpd_host_string, mpd_port):
    """Connect to MPD and mirror its player state into the custom status.

    Logging is enabled by ``-v`` (INFO) or ``-vv`` (DEBUG) on the command
    line; ``-v`` takes precedence when both are given.  The status is set
    once immediately, then updated on every player event until interrupted,
    and finally cleared.

    Args:
        mpd_host_string: ``MPD_HOST``-style value, optionally ``pass@host``.
        mpd_port: Port to connect to.
    """
    cli_args = sys.argv[1:]
    # (flag, log level, announcement); the first flag found wins.
    verbosity_options = (
        ("-v", logging.INFO, "Log level INFO"),
        ("-vv", logging.DEBUG, "Log level DEBUG"),
    )
    for flag, level, announcement in verbosity_options:
        if flag in cli_args:
            logging.basicConfig(level=level)
            LOGGER.info(announcement)
            break

    mpd_host, mpd_pass = host_and_pass_from_MPD_HOST(mpd_host_string)
    mpd_client = create_mpd_client(mpd_host, mpd_port, mpd_pass)

    set_status_from_mpd(mpd_client)
    try:
        loop(mpd_client, set_status_from_mpd)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; fall through to the cleanup.
        pass
    finally:
        clear_status()
|
||
|
||
|
||
# Script entry point: run with the connection settings read from the environment.
if __name__ == "__main__":
    main(MPD_HOST, MPD_PORT)
|