dotfiles/yambar/sway.py

122 lines
2.8 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import sys
import socket
import struct
import json
import traceback
import time
from typing import List
MSGTYPE_GET_WORKSPACES = 1
MSGTYPE_SUBSCRIBE = 2
MSGTYPE_GET_BINDING_STATE = 12
MSGTYPE_EVENT_WORKSPACE = 0x80000000
MSGTYPE_EVENT_MODE = 0x80000002
magic = b"i3-ipc"
msg_header = struct.Struct("II")
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
SWAYSOCK = os.getenvb(b"SWAYSOCK")
if not SWAYSOCK:
while True:
# print("\n".join(result), flush=True)
print("sway_running|bool|false\n", flush=True)
time.sleep(60 * 60 * 60)
s.connect(SWAYSOCK)
def recv():
reply_magic = s.recv(6)
assert reply_magic == magic
reply_header = s.recv(msg_header.size)
reply_length, reply_type = msg_header.unpack(reply_header)
reply_payload = s.recv(reply_length)
return reply_type, reply_payload
def msg(msg_type: int, payload: bytes):
global s
message = (
magic +
msg_header.pack(len(payload), msg_type) +
payload
)
n_sent_bytes = s.send(message)
assert n_sent_bytes == len(message)
def subscribe(types: List[str]):
msg(MSGTYPE_SUBSCRIBE, json.dumps(types).encode("utf-8"))
reply = recv()
success = json.loads(reply[1])["success"]
if not success:
raise Exception("Could not subscribe")
def format_workspaces(workspaces):
result = []
last_output = None
i = 0
for i, ws in enumerate(sorted(
workspaces,
key=lambda x: ("" if x["output"] == "eDP-1" else x["output"], x["num"], x["name"])
)):
name = ws["name"]
new_monitor = last_output is not None and last_output != ws["output"]
last_output = ws["output"]
state = (
"focused" if ws["focused"] else
"visible" if ws["visible"] else
""
)
result.append(f"ws{i}|string|{ws['name']}")
result.append(f"ws{i}_state|string|{state}")
result.append(f"ws{i}_new_monitor|bool|{'true' if new_monitor else 'false'}")
while i <= 11:
result.append(f"ws{i}|string|")
result.append(f"ws{i}_state|string|unused")
result.append(f"ws{i}_new_monitor|bool|false")
i += 1
return "\n".join(result)
msg(MSGTYPE_GET_BINDING_STATE, b'')
state_mode = json.loads(recv()[1])["name"]
msg(MSGTYPE_GET_WORKSPACES, b'')
state_workspaces = json.loads(recv()[1])
def print_state():
global state_workspaces
global state_mode
print(f"""{format_workspaces(state_workspaces)}
mode|string|{state_mode}
sway_running|bool|true
""", flush=True)
print_state()
subscribe(["workspace", "mode"])
while True:
try:
item_type, item = recv()
if item_type == MSGTYPE_GET_WORKSPACES:
state_workspaces = json.loads(item)
print_state()
elif item_type == MSGTYPE_EVENT_WORKSPACE:
msg(MSGTYPE_GET_WORKSPACES, b'')
elif item_type == MSGTYPE_EVENT_MODE:
state_mode = json.loads(item)["change"]
print_state()
else:
print(f"Unhandled message of type 0x{item_type:x}", file=sys.stderr, flush=True)
except:
traceback.print_exc()