Skip to content

Commit 83925c6

Browse files
Update airplay sync to work again post-rebase
1 parent 87e83ac commit 83925c6

3 files changed

Lines changed: 87 additions & 45 deletions

File tree

amplipi/streams/airplay.py

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import os
99
import io
1010
import sys
11+
import threading
12+
import json
1113

1214

1315
def write_sp_config_file(filename, config):
@@ -43,7 +45,29 @@ def __init__(self, name: str, ap2: bool, disabled: bool = False, mock: bool = Fa
4345
self._connect_time = 0.0
4446
self._coverart_dir = ''
4547
self._log_file: Optional[io.TextIOBase] = None
46-
self.volume_process: Optional[subprocess.Popen] = None
48+
self.src_config_folder: Optional[str] = None
49+
self.volume_watcher_process: Optional[threading.Thread] = None # Populates the fifo that the vol sync script depends on
50+
self.volume_sync_process: Optional[subprocess.Popen] = None
51+
self._volume_fifo: Optional[str] = None
52+
53+
def watch_vol(self):
54+
"""Creates and supplies a FIFO with volume data for volume sync"""
55+
while True:
56+
try:
57+
if self.src is not None:
58+
if self._volume_fifo is None and self.src_config_folder is not None:
59+
fifo_path = f"{self.src_config_folder}/vol"
60+
if not os.path.isfile(fifo_path):
61+
os.mkfifo(fifo_path)
62+
self._volume_fifo = os.open(fifo_path, os.O_WRONLY, os.O_NONBLOCK)
63+
data = json.dumps({
64+
'zones': self.connected_zones,
65+
'volume': self.volume,
66+
})
67+
os.write(self._volume_fifo, bytearray(f"{data}\r\n", encoding="utf8"))
68+
except Exception as e:
69+
logger.error(f"{self.name} volume thread ran into exception: {e}")
70+
time.sleep(0.1)
4771

4872
def reconfig(self, **kwargs):
4973
self.validate_stream(**kwargs)
@@ -73,9 +97,9 @@ def _activate(self, vsrc: int):
7397
logger.info(f'Another Airplay 2 stream is already in use, unable to start {self.name}, mocking connection')
7498
return
7599

76-
src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
100+
self.src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
77101
try:
78-
os.remove(f'{src_config_folder}/currentSong')
102+
os.remove(f'{self.src_config_folder}/currentSong')
79103
except FileNotFoundError:
80104
pass
81105
self._connect_time = time.time()
@@ -101,7 +125,8 @@ def _activate(self, vsrc: int):
101125
'alsa': {
102126
'output_device': utils.virtual_output_device(vsrc), # alsa output device
103127
# If set too small, buffer underflow occurs on low-powered machines. Too long and the response times with software mixer become annoying.
104-
'audio_backend_buffer_desired_length': 11025
128+
'audio_backend_buffer_desired_length': 11025,
129+
'software_mixer': 'yes'
105130
},
106131
}
107132

@@ -111,10 +136,10 @@ def _activate(self, vsrc: int):
111136
except FileNotFoundError:
112137
pass
113138
os.makedirs(self._coverart_dir, exist_ok=True)
114-
os.makedirs(src_config_folder, exist_ok=True)
115-
config_file = f'{src_config_folder}/shairport.conf'
139+
os.makedirs(self.src_config_folder, exist_ok=True)
140+
config_file = f'{self.src_config_folder}/shairport.conf'
116141
write_sp_config_file(config_file, config)
117-
self._log_file = open(f'{src_config_folder}/log', mode='w')
142+
self._log_file = open(f'{self.src_config_folder}/log', mode='w')
118143
shairport_args = f"{utils.get_folder('streams')}/shairport-sync{'-ap2' if self.ap2 else ''} -c {config_file}".split(' ')
119144
logger.info(f'shairport_args: {shairport_args}')
120145

@@ -127,12 +152,15 @@ def _activate(self, vsrc: int):
127152
# shairport sync only adds the pid to the mpris name if it cannot use the default name
128153
if len(os.popen("pgrep shairport-sync").read().strip().splitlines()) > 1:
129154
mpris_name += f".i{self.proc.pid}"
130-
self.mpris = MPRIS(mpris_name, f'{src_config_folder}/metadata.txt')
155+
self.mpris = MPRIS(mpris_name, f'{self.src_config_folder}/metadata.txt')
131156

132157
vol_sync = f"{utils.get_folder('streams')}/shairport_volume_handler.py"
133-
vol_args = [sys.executable, vol_sync, mpris_name, str(self.id)]
158+
vol_args = [sys.executable, vol_sync, mpris_name, f"{utils.get_folder('config')}/srcs/v{self.vsrc}"]
159+
134160
logger.info(f'{self.name}: starting vol synchronizer: {vol_args}')
135-
self.volume_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file)
161+
self.volume_watcher_process = threading.Thread(target=self.watch_vol, daemon=True)
162+
self.volume_watcher_process.start()
163+
self.volume_sync_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file)
136164
except Exception as exc:
137165
logger.exception(f'Error starting airplay MPRIS reader: {exc}')
138166

@@ -142,18 +170,22 @@ def _deactivate(self):
142170
self.mpris = None
143171
if self._is_running():
144172
self.proc.stdin.close()
173+
145174
logger.info('stopping shairport-sync')
146175
self.proc.terminate()
176+
if self.volume_sync_process is not None:
177+
self.volume_sync_process.terminate()
178+
147179
if self.proc.wait(1) != 0:
148180
logger.info('killing shairport-sync')
149181
self.proc.kill()
150182
self.proc.communicate()
151183

152-
if self.volume_process is not None:
153-
self.volume_process.terminate()
154-
if self.volume_process.wait(1) != 0:
155-
logger.info('killing shairport vol sync')
156-
self.volume_process.kill()
184+
if self.volume_sync_process is not None:
185+
if self.volume_sync_process.wait(1) != 0:
186+
logger.info('killing shairport vol sync')
187+
self.volume_sync_process.kill()
188+
157189
if '_log_file' in self.__dir__() and self._log_file:
158190
self._log_file.close()
159191
if self.src:
@@ -162,8 +194,11 @@ def _deactivate(self):
162194
except Exception as e:
163195
logger.exception(f'Error removing airplay config files: {e}')
164196
self._disconnect()
197+
165198
self.proc = None
166-
self.volume_process = None
199+
self.volume_sync_process = None
200+
self.volume_watcher_process = None
201+
self._volume_fifo = None
167202

168203
def info(self) -> models.SourceInfo:
169204
source = models.SourceInfo(
Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,37 @@
11
"""Script for synchronizing AmpliPi and Spotify volumes"""
22
import argparse
3-
import logging
4-
import sys
53
from time import sleep
6-
7-
from volume_synchronizer import VolumeSynchronizer, StreamData
8-
94
from dasbus.connection import SessionMessageBus
5+
from dasbus.typing import Variant
6+
from volume_synchronizer import VolSyncDispatcher, StreamWatcher, VolEvents
107

118

12-
logger = logging.getLogger(__name__)
13-
logger.setLevel(logging.DEBUG)
14-
sh = logging.StreamHandler(sys.stdout)
15-
logger.addHandler(sh)
16-
17-
18-
class ShairportData(StreamData):
9+
class ShairportWatcher(StreamWatcher):
1910
"""A class that watches and tracks changes to airplay-side volume"""
2011

2112
def __init__(self, service_suffix: str):
13+
super().__init__()
2214
self.mpris = SessionMessageBus().get_proxy(
2315
service_name=f"org.mpris.MediaPlayer2.{service_suffix}",
2416
object_path="/org/mpris/MediaPlayer2",
2517
interface_name="org.mpris.MediaPlayer2.Player"
2618
)
27-
super().__init__()
19+
20+
self.dbus = SessionMessageBus().get_proxy(
21+
service_name=f"org.mpris.MediaPlayer2.{service_suffix}",
22+
object_path="/org/mpris/MediaPlayer2",
23+
interface_name="org.freedesktop.DBus.Properties"
24+
)
2825

2926
async def watch_vol(self):
3027
"""Watch the shairport mpris stream for volume changes and update amplipi volume info accordingly"""
3128
while True:
3229
try:
33-
if self.volume is not None and self.volume != self.mpris.Volume:
30+
if self.volume != self.mpris.Volume:
3431
self.logger.debug(f"Airplay volume changed from {self.volume} to {self.mpris.Volume}")
35-
self.callback("stream_volume_changed")
36-
self.volume = float(self.mpris.Volume)
32+
self.volume = float(self.mpris.Volume)
33+
self.schedule_event(VolEvents.CHANGE_AMPLIPI)
34+
# self.delta = self.mpris.Volume - self.volume
3735

3836
except Exception as e:
3937
self.logger.exception(f"Error: {e}")
@@ -54,8 +52,15 @@ def set_vol(self, amplipi_volume: float, vol_set_point: float) -> float: # This
5452
# vol_set_point: if vol_set_point is retained as the set point, any changes to amplipi will reflect for 1-2 seconds at most and then
5553
# bounce back to where it had been, resulting in a glitchy front end interface
5654

57-
# In any future MPRIS based volume synchronisers, you can check if self.mpris.CanControl is true and then potentially directly set self.mpris.Volume
58-
# Apple just disallows externally sourced volume changes due to their walled garden
55+
# In any future MPRIS based volume synchronizers, you can check if self.mpris.CanControl is true and then potentially directly set self.mpris.Volume
56+
# Note that we cannot do this due to this line: <property name='Volume' type='d' access='read'/>
57+
# That exists in the MPRIS config xml at https://github.com/mikebrady/shairport-sync/blob/master/org.mpris.MediaPlayer2.xml
58+
59+
# self.dbus.Set(
60+
# 'org.mpris.MediaPlayer2',
61+
# 'Volume',
62+
# Variant("d", amplipi_volume)
63+
# )
5964

6065
return amplipi_volume
6166
except Exception as e:
@@ -67,10 +72,9 @@ def set_vol(self, amplipi_volume: float, vol_set_point: float) -> float: # This
6772
parser = argparse.ArgumentParser(description="Read metadata from a given URL and write it to a file.")
6873

6974
parser.add_argument("service_suffix", help="Name of mpris instance", type=str)
70-
parser.add_argument("stream_id", help="The stream's amplipi side stream_id", type=int)
75+
parser.add_argument("config_dir", help="The directory of the vsrc config", type=str)
7176
parser.add_argument("--debug", action="store_true", help="Change log level from WARNING to DEBUG")
7277

7378
args = parser.parse_args()
7479

75-
handler = VolumeSynchronizer(ShairportData(service_suffix=args.service_suffix), args.stream_id, args.debug)
76-
handler.watcher_loop()
80+
handler = VolSyncDispatcher(ShairportWatcher(service_suffix=args.service_suffix), args.config_dir, args.debug)

streams/volume_synchronizer.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def __init__(self, config_dir: str, schedule_event: Callable, logger: logging.Lo
7474
"""Event scheduler function provided by VolSyncDispatcher, has limited valid inputs that can be seen in the VolEvents enum"""
7575

7676
self.logger: logging.Logger = logger
77-
self.volume: float = None
77+
self.volume: Optional[float] = None
7878
self.config_dir: str = config_dir
7979

8080
self.connected_zones: List[int] = []
@@ -88,13 +88,16 @@ def get_vol(self):
8888
Read the volume FIFO from .config/amplipi/srcs/v{vsrc}/vol to load the currently connected zones and the averaged volume of them
8989
If the read volume is different than the previous volume, send a volume change event to the stream
9090
"""
91-
with open(f'{self.config_dir}/vol', 'r') as fifo:
92-
while True:
93-
data = json.loads(fifo.readline().strip())
94-
if self.volume != data["volume"]:
95-
self.volume = data["volume"]
96-
self.schedule_event(VolEvents.CHANGE_STREAM)
97-
self.connected_zones = data["zones"]
91+
try:
92+
with open(f'{self.config_dir}/vol', 'r') as fifo:
93+
while True:
94+
data = json.loads(fifo.readline().strip())
95+
if self.volume != data["volume"]:
96+
self.volume = data["volume"]
97+
self.schedule_event(VolEvents.CHANGE_STREAM)
98+
self.connected_zones = data["zones"]
99+
except Exception as e:
100+
self.logger.exception(f"Error while getting writing to {self.config_dir}/vol fifo: {e}")
98101

99102
def set_vol(self, stream_volume: float, vol_set_point: float):
100103
"""Update AmpliPi's volume to match the stream volume"""

0 commit comments

Comments
 (0)