From ff054442746d5a1167b9eda1c39c74a9286a458f Mon Sep 17 00:00:00 2001 From: steveseguin Date: Tue, 19 May 2026 10:25:16 -0400 Subject: [PATCH] Fix multiviewer stale elements on reconnect --- publish.py | 133 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 19 deletions(-) diff --git a/publish.py b/publish.py index 3a82e15..7236ce7 100644 --- a/publish.py +++ b/publish.py @@ -7662,7 +7662,47 @@ def show_recording_status(): traceback.print_exc() - + + def _remove_pipeline_element_by_name(self, name, upstream=None, downstream=None): + """Best-effort removal for stale per-client elements left in the pipeline.""" + if not self.pipe: + return False + + try: + element = self.pipe.get_by_name(name) + except Exception as exc: + printwarn(f"Failed to look up pipeline element {name}: {exc}") + return False + + if element is None: + return False + + if upstream is not None: + try: + upstream.unlink(element) + except Exception: + pass + + if downstream is not None: + try: + element.unlink(downstream) + except Exception: + pass + + try: + element.set_state(Gst.State.NULL) + except Exception as exc: + printwarn(f"Failed to set stale element {name} to NULL: {exc}") + + try: + removed = self.pipe.remove(element) + if not removed: + printwarn(f"Failed to remove stale pipeline element {name}") + return bool(removed) + except Exception as exc: + printwarn(f"Failed to remove stale pipeline element {name}: {exc}") + return False + async def createPeer(self, UUID): if UUID in self.clients: @@ -9138,44 +9178,89 @@ def _classify_loss(loss_percent: Optional[float]) -> Tuple[str, str]: self.setup_ice_servers(client['webrtc']) pass else: - client['webrtc'] = Gst.ElementFactory.make("webrtcbin", client['UUID']) - client['webrtc'].set_property('bundle-policy', 'max-bundle') - self.setup_ice_servers(client['webrtc']) + uuid = client['UUID'] + webrtc_name = uuid + qv_name = f"qv-{uuid}" + qa_name = f"qa-{uuid}" + atee = self.pipe.get_by_name('audiotee') + vtee = self.pipe.get_by_name('videotee') + + stale_webrtc = self.pipe.get_by_name(webrtc_name) + self._remove_pipeline_element_by_name(qv_name, upstream=vtee, downstream=stale_webrtc) + self._remove_pipeline_element_by_name(qa_name, upstream=atee, downstream=stale_webrtc) + self._remove_pipeline_element_by_name(webrtc_name) + + client['webrtc'] = None + client['qv'] = None + client['qa'] = None + + webrtc = Gst.ElementFactory.make("webrtcbin", webrtc_name) + if webrtc is None: + printwarn(f"Failed to create webrtcbin for {uuid}") + return + webrtc.set_property('bundle-policy', 'max-bundle') + self.setup_ice_servers(webrtc) try: # Ensure minimum latency to prevent crashes buffer_ms = max(self.buffer, 10) - client['webrtc'].set_property('latency', buffer_ms) - client['webrtc'].set_property('async-handling', True) + webrtc.set_property('latency', buffer_ms) + webrtc.set_property('async-handling', True) client["_latency_applied"] = buffer_ms except Exception as E: pass - self.pipe.add(client['webrtc']) + if not self.pipe.add(webrtc): + printwarn(f"Failed to add webrtcbin {webrtc_name} to pipeline") + try: + webrtc.set_state(Gst.State.NULL) + except Exception: + pass + return + client['webrtc'] = webrtc if self.view: - self._install_viewer_rtpbin_overrides(client['webrtc']) - - atee = self.pipe.get_by_name('audiotee') - vtee = self.pipe.get_by_name('videotee') + self._install_viewer_rtpbin_overrides(webrtc) if vtee is not None: - qv = Gst.ElementFactory.make('queue', f"qv-{client['UUID']}") - self.pipe.add(qv) + qv = Gst.ElementFactory.make('queue', qv_name) + if qv is None: + printwarn(f"Failed to create video queue {qv_name}") + return + if not self.pipe.add(qv): + printwarn(f"Failed to add video queue {qv_name} to pipeline") + try: + qv.set_state(Gst.State.NULL) + except Exception: + pass + return + client['qv'] = qv if not Gst.Element.link(vtee, qv): + printwarn(f"Failed to link videotee to {qv_name}") return - if not Gst.Element.link(qv, client['webrtc']): + if not Gst.Element.link(qv, webrtc): + printwarn(f"Failed to link {qv_name} to webrtcbin {webrtc_name}") return if qv is not None: qv.sync_state_with_parent() - client['qv'] = qv if atee is not None: - qa = Gst.ElementFactory.make('queue', f"qa-{client['UUID']}") - self.pipe.add(qa) + qa = Gst.ElementFactory.make('queue', qa_name) + if qa is None: + printwarn(f"Failed to create audio queue {qa_name}") + return + if not self.pipe.add(qa): + printwarn(f"Failed to add audio queue {qa_name} to pipeline") + try: + qa.set_state(Gst.State.NULL) + except Exception: + pass + return + client['qa'] = qa if not Gst.Element.link(atee, qa): + printwarn(f"Failed to link audiotee to {qa_name}") return - if not Gst.Element.link(qa, client['webrtc']): + if not Gst.Element.link(qa, webrtc): + printwarn(f"Failed to link {qa_name} to webrtcbin {webrtc_name}") return if qa is not None: qa.sync_state_with_parent() - client['qa'] = qa if self.midi and (self.midi_thread == None): self.midi_thread = threading.Thread(target=midi2vdo, args=(self.midi,), daemon=True) @@ -9659,6 +9744,16 @@ def _stop_pipeline_internal(self, UUID): self.pipe.remove(qv) except Exception as e: printwarn(f"Failed to cleanup video queue: {e}") + + if self.multiviewer and self.pipe: + uuid = client.get('UUID', UUID) + try: + stale_webrtc = self.pipe.get_by_name(uuid) + except Exception: + stale_webrtc = None + self._remove_pipeline_element_by_name(f"qa-{uuid}", upstream=atee, downstream=stale_webrtc) + self._remove_pipeline_element_by_name(f"qv-{uuid}", upstream=vtee, downstream=stale_webrtc) + self._remove_pipeline_element_by_name(uuid) # Always remove from clients dict, even if cleanup failed self.clients.pop(UUID, None)