-
Notifications
You must be signed in to change notification settings - Fork 0
/
librespot_mpris_proxy.py
executable file
·149 lines (111 loc) · 4.27 KB
/
librespot_mpris_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
import asyncio
import logging
import os
import re
import stat
from dbus_next import BusType, PropertyAccess
from dbus_next.aio import MessageBus
from dbus_next.service import ServiceInterface, dbus_property
# Logger shared by the whole module, registered under a fixed name.
_LOGGER = logging.getLogger("librespot-mpris-proxy")
class MediaPlayer2Interface(ServiceInterface):
    """D-Bus service interface registered as ``org.mpris.MediaPlayer2``.

    All properties are read-only. The four capability flags always
    return ``False`` and ``Identity`` is a fixed string. The string
    return annotations are D-Bus type signatures consumed by
    ``dbus_property`` (``"b"`` boolean, ``"s"`` string).
    """

    def __init__(self):
        super().__init__("org.mpris.MediaPlayer2")

    # Capability flags: none of the optional operations is offered.

    @dbus_property(access=PropertyAccess.READ, name="CanQuit")
    def can_quit(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanSetFullscreen")
    def can_set_fullscreen(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanRaise")
    def can_raise(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="HasTrackList")
    def has_track_list(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="Identity")
    def identity(self) -> "s":
        """Return the fixed display name of this proxy."""
        return "MPRIS-Proxy"
class MediaPlayer2PlayerInterface(ServiceInterface):
    """D-Bus service interface registered as ``org.mpris.MediaPlayer2.Player``.

    Publishes a read-only ``PlaybackStatus`` property whose value starts
    as ``"Stopped"`` and is changed through ``set_playback_status``.
    Every ``Can*`` property returns ``False``.
    """

    def __init__(self):
        super().__init__("org.mpris.MediaPlayer2.Player")
        self._playback_status = "Stopped"

    # --- Supported properties -------------------------------------------

    @dbus_property(access=PropertyAccess.READ, name="PlaybackStatus")
    def playback_status(self) -> "s":
        """Return the currently stored playback status."""
        return self._playback_status

    def set_playback_status(self, val) -> None:
        """Store ``val`` as the playback status and announce the change.

        Nothing is stored or emitted when ``val`` equals the current
        value, so repeated identical updates produce no signal.
        """
        if self._playback_status == val:
            return
        self._playback_status = val
        self.emit_properties_changed({"PlaybackStatus": val})

    # --- Control capabilities: all reported as unavailable ----------------

    @dbus_property(access=PropertyAccess.READ, name="CanControl")
    def can_control(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanGoNext")
    def can_go_next(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanGoPrevious")
    def can_go_previous(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanPlay")
    def can_play(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanPause")
    def can_pause(self) -> "b":
        """Always ``False``."""
        return False

    @dbus_property(access=PropertyAccess.READ, name="CanSeek")
    def can_seek(self) -> "b":
        """Always ``False``."""
        return False
async def _run():
    """Publish the proxy on the system bus and relay FIFO updates forever.

    Steps:

    1. Configure logging and connect to the system D-Bus.
    2. Export ``MediaPlayer2Interface`` and ``MediaPlayer2PlayerInterface``
       at ``/org/mpris/MediaPlayer2`` and request a bus name that embeds
       this process's PID.
    3. (Re)create the FIFO ``/var/run/librespot-mpris-proxy`` with
       read/write permission for user, group and others.
    4. Loop: read the FIFO, and when the content starts with
       ``Playback Status: <word>`` pass ``<word>`` to
       ``player.set_playback_status``.

    The loop has no exit condition; the coroutine only ends when it is
    cancelled or an exception propagates.
    """
    logging.basicConfig(
        format='%(levelname)s: %(message)s', level=logging.INFO)

    # Connect to the system bus
    _LOGGER.info("Connecting to system bus.")
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()

    # Construct and export our interfaces (both live on the same path)
    _LOGGER.info("Exporting MediaPlayer2 interface.")
    mediaplayer2 = MediaPlayer2Interface()
    bus.export("/org/mpris/MediaPlayer2", mediaplayer2)

    _LOGGER.info("Exporting MediaPlayer2.Player interface.")
    player = MediaPlayer2PlayerInterface()
    bus.export("/org/mpris/MediaPlayer2", player)

    # Acquire our friendly name
    # TODO generate this on demand?
    name = f"org.mpris.MediaPlayer2.librespot_proxy.pid{os.getpid()}"
    _LOGGER.info("Requesting friendly name '%s' on bus.", name)
    await bus.request_name(name)

    # Create the named pipe, discarding any stale one from a previous run
    fifo = "/var/run/librespot-mpris-proxy"
    try:
        os.remove(fifo)
    except FileNotFoundError:
        pass
    os.mkfifo(fifo)
    # Explicit chmod so the resulting mode is rw for user, group and
    # others regardless of the process umask.
    os.chmod(fifo, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP |
             stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)

    # Raw string: "\w" inside a plain literal is an invalid escape
    # sequence. Compiled once because the pattern never changes.
    status_pattern = re.compile(r"Playback Status: (\w+)")

    while True:
        # Blocking IO should be ok since we don't need to interact
        # with the DBus until we get an update
        with open(fifo, mode="r") as f:
            contents = f.read()

        matches = status_pattern.match(contents)
        if matches is None:
            _LOGGER.debug("Ignoring unrecognised FIFO message: %r", contents)
        else:
            status = matches.group(1)
            _LOGGER.info("Set Playback Status: %s", status)
            player.set_playback_status(status)

        # Sleep to allow async tasks to run before blocking again. This
        # runs after every read, including unrecognised ones, so the
        # event loop always gets a turn between blocking reads.
        # TODO might want to re-eval aiofiles
        await asyncio.sleep(1)
def main():
    """Run the proxy coroutine until it is interrupted from the keyboard."""
    try:
        asyncio.run(_run())
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal shutdown: return without a traceback.
        return
# Start the proxy only when executed as a script, not when imported.
if __name__ == "__main__":
    main()