112 lines
2.6 KiB
Python
112 lines
2.6 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import socket
|
||
|
import struct
|
||
|
import json
|
||
|
import traceback
|
||
|
from time import sleep
|
||
|
from typing import List
|
||
|
|
||
|
|
||
|
# Message type codes of the i3/sway IPC protocol that this script uses.
MSGTYPE_GET_WORKSPACES = 1
MSGTYPE_SUBSCRIBE = 2
MSGTYPE_GET_BINDING_STATE = 12
# Event codes have the highest bit of the 32-bit type field set.
MSGTYPE_EVENT_WORKSPACE = 0x80000000
MSGTYPE_EVENT_MODE = 0x80000002

# Every message starts with this magic string, followed by the header below.
magic = b"i3-ipc"
# Header: two native-byte-order unsigned 32-bit ints, packed as
# (payload length, message type).
msg_header = struct.Struct("II")

# os.getenvb returns None when the variable is unset; fail with a readable
# message instead of letting socket.connect() choke on a missing path.
_socket_path = os.getenvb(b"SWAYSOCK")
if not _socket_path:
    sys.exit("SWAYSOCK is not set; cannot locate the sway IPC socket")

# Module-wide connection to the IPC socket, shared by recv() and msg().
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(_socket_path)
|
||
|
|
||
|
def _recv_exact(n_bytes: int) -> bytes:
    """Read exactly ``n_bytes`` bytes from the IPC socket ``s``.

    A single ``socket.recv`` call may return fewer bytes than requested, so
    keep reading until the full amount has arrived.

    Raises:
        ConnectionError: the socket was closed before ``n_bytes`` arrived.
    """
    chunks = []
    remaining = n_bytes
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            # recv() returning b"" means the peer closed the connection.
            raise ConnectionError("IPC socket closed while reading a message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv():
    """Receive one complete IPC message from the socket ``s``.

    Returns:
        A ``(reply_type, reply_payload)`` tuple: the integer message type
        from the header and the raw payload bytes.

    Raises:
        ValueError: the message does not start with the expected magic string.
        ConnectionError: the socket was closed in the middle of a message.
    """
    reply_magic = _recv_exact(len(magic))
    # Explicit check rather than ``assert`` so it still runs under ``python -O``.
    if reply_magic != magic:
        raise ValueError(f"Unexpected IPC magic {reply_magic!r}")
    reply_header = _recv_exact(msg_header.size)
    reply_length, reply_type = msg_header.unpack(reply_header)
    reply_payload = _recv_exact(reply_length)
    return reply_type, reply_payload
|
||
|
|
||
|
def msg(msg_type: int, payload: bytes):
    """Send one IPC message over the socket ``s``.

    The message is the magic string, then the packed header
    (payload length, message type), then the payload itself.

    Args:
        msg_type: integer message type code to put in the header.
        payload: raw payload bytes; may be empty.

    Raises:
        OSError: the socket reported an error while sending.
    """
    message = magic + msg_header.pack(len(payload), msg_type) + payload
    # sendall() keeps writing until every byte is sent; a plain send() may
    # write only part of the message.
    s.sendall(message)
|
||
|
|
||
|
def subscribe(types: List[str]):
    """Request delivery of the given event types over the IPC socket.

    Sends ``types`` as a JSON array in a subscribe message and checks the
    ``"success"`` field of the JSON reply.

    Raises:
        Exception: the reply's ``"success"`` field is falsy.
    """
    request_body = json.dumps(types).encode("utf-8")
    msg(MSGTYPE_SUBSCRIBE, request_body)
    _reply_type, reply_body = recv()
    if json.loads(reply_body)["success"]:
        return
    raise Exception("Could not subscribe")
|
||
|
|
||
|
|
||
|
def format_workspaces(workspaces, slots: int = 12):
    """Render workspaces as newline-separated ``key|type|value`` lines.

    Workspaces are ordered by (output, num, name) and numbered from 0. Each
    one produces three lines: ``ws{i}`` (its name), ``ws{i}_state``
    ("focused", "visible" or empty) and ``ws{i}_new_monitor`` ("true" when
    its output differs from the previous workspace's output). Slots after
    the last workspace, up to ``slots`` in total, are filled with empty
    "unused" entries so the set of keys stays constant.

    Args:
        workspaces: iterable of dicts with the keys "output", "num", "name",
            "focused" and "visible".
        slots: minimum number of workspace slots to emit; more are emitted
            if there are more workspaces than this.

    Returns:
        The lines joined with newlines, without a trailing newline.
    """
    ordered = sorted(
        workspaces,
        key=lambda ws: (ws["output"], ws["num"], ws["name"]),
    )

    result = []
    last_output = None
    for i, ws in enumerate(ordered):
        # The first workspace never counts as starting a new monitor.
        new_monitor = last_output is not None and last_output != ws["output"]
        last_output = ws["output"]

        if ws["focused"]:
            state = "focused"
        elif ws["visible"]:
            state = "visible"
        else:
            state = ""

        result.append(f"ws{i}|string|{ws['name']}")
        result.append(f"ws{i}_state|string|{state}")
        result.append(f"ws{i}_new_monitor|bool|{'true' if new_monitor else 'false'}")

    # Pad starting at the first free slot, so the last real workspace is not
    # emitted a second time as "unused"; this also covers an empty input.
    for i in range(len(ordered), slots):
        result.append(f"ws{i}|string|")
        result.append(f"ws{i}_state|string|unused")
        result.append(f"ws{i}_new_monitor|bool|false")

    return "\n".join(result)
|
||
|
|
||
|
|
||
|
def _query(msg_type: int):
    """Send an empty-payload request and return the reply decoded from JSON."""
    msg(msg_type, b'')
    return json.loads(recv()[1])


# Seed the cached state with one synchronous query each, before any event
# subscription is active.
state_mode = _query(MSGTYPE_GET_BINDING_STATE)["name"]
state_workspaces = _query(MSGTYPE_GET_WORKSPACES)
|
||
|
|
||
|
|
||
|
def print_state():
    """Write the cached workspace and mode state to stdout as one record.

    The record is the formatted workspace lines, a ``mode|string|...`` line
    and a terminating blank line. Stdout is flushed, then the function
    sleeps for 10 ms.
    """
    record = "\n".join((
        format_workspaces(state_workspaces),
        f"mode|string|{state_mode}",
        "",  # trailing newline; print() then adds the blank line
    ))
    print(record, flush=True)
    sleep(0.01)
|
||
|
|
||
|
# Emit the initial snapshot before any events arrive.
print_state()

subscribe(["workspace", "mode"])

# Event loop: keep the cached state in sync and re-print it on every change.
while True:
    try:
        item_type, item = recv()

        if item_type == MSGTYPE_GET_WORKSPACES:
            # Reply to the refresh request sent from the workspace-event branch.
            state_workspaces = json.loads(item)
            print_state()
        elif item_type == MSGTYPE_EVENT_WORKSPACE:
            # Re-query the full workspace list; the reply arrives as a
            # later message and is handled by the branch above.
            msg(MSGTYPE_GET_WORKSPACES, b'')
        elif item_type == MSGTYPE_EVENT_MODE:
            state_mode = json.loads(item)["change"]
            print_state()
        else:
            print(f"Unhandled message of type 0x{item_type:x}", file=sys.stderr, flush=True)
    except OSError:
        # A failed socket or closed stdout cannot be recovered from here
        # (there is no reconnect logic); exit instead of spinning forever.
        traceback.print_exc()
        sys.exit(1)
    except Exception:
        # Best effort: report the problem and keep processing events.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit stop the script.
        traceback.print_exc()
|