Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 114 additions & 19 deletions publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -7662,7 +7662,47 @@ def show_recording_status():
traceback.print_exc()




def _remove_pipeline_element_by_name(self, name, upstream=None, downstream=None):
"""Best-effort removal for stale per-client elements left in the pipeline."""
if not self.pipe:
return False

try:
element = self.pipe.get_by_name(name)
except Exception as exc:
printwarn(f"Failed to look up pipeline element {name}: {exc}")
return False

if element is None:
return False

if upstream is not None:
try:
upstream.unlink(element)
except Exception:
pass

if downstream is not None:
try:
element.unlink(downstream)
except Exception:
pass

try:
element.set_state(Gst.State.NULL)
except Exception as exc:
printwarn(f"Failed to set stale element {name} to NULL: {exc}")

try:
removed = self.pipe.remove(element)
if not removed:
printwarn(f"Failed to remove stale pipeline element {name}")
return bool(removed)
except Exception as exc:
printwarn(f"Failed to remove stale pipeline element {name}: {exc}")
return False

async def createPeer(self, UUID):

if UUID in self.clients:
Expand Down Expand Up @@ -9138,44 +9178,89 @@ def _classify_loss(loss_percent: Optional[float]) -> Tuple[str, str]:
self.setup_ice_servers(client['webrtc'])
pass
else:
client['webrtc'] = Gst.ElementFactory.make("webrtcbin", client['UUID'])
client['webrtc'].set_property('bundle-policy', 'max-bundle')
self.setup_ice_servers(client['webrtc'])
uuid = client['UUID']
webrtc_name = uuid
qv_name = f"qv-{uuid}"
qa_name = f"qa-{uuid}"
atee = self.pipe.get_by_name('audiotee')
vtee = self.pipe.get_by_name('videotee')

stale_webrtc = self.pipe.get_by_name(webrtc_name)
self._remove_pipeline_element_by_name(qv_name, upstream=vtee, downstream=stale_webrtc)
self._remove_pipeline_element_by_name(qa_name, upstream=atee, downstream=stale_webrtc)
self._remove_pipeline_element_by_name(webrtc_name)

client['webrtc'] = None
client['qv'] = None
client['qa'] = None

webrtc = Gst.ElementFactory.make("webrtcbin", webrtc_name)
if webrtc is None:
printwarn(f"Failed to create webrtcbin for {uuid}")
return
webrtc.set_property('bundle-policy', 'max-bundle')
self.setup_ice_servers(webrtc)

try:
# Ensure minimum latency to prevent crashes
buffer_ms = max(self.buffer, 10)
client['webrtc'].set_property('latency', buffer_ms)
client['webrtc'].set_property('async-handling', True)
webrtc.set_property('latency', buffer_ms)
webrtc.set_property('async-handling', True)
client["_latency_applied"] = buffer_ms
except Exception as E:
pass
self.pipe.add(client['webrtc'])
if not self.pipe.add(webrtc):
printwarn(f"Failed to add webrtcbin {webrtc_name} to pipeline")
try:
webrtc.set_state(Gst.State.NULL)
except Exception:
pass
return
client['webrtc'] = webrtc
if self.view:
self._install_viewer_rtpbin_overrides(client['webrtc'])

atee = self.pipe.get_by_name('audiotee')
vtee = self.pipe.get_by_name('videotee')
self._install_viewer_rtpbin_overrides(webrtc)

if vtee is not None:
qv = Gst.ElementFactory.make('queue', f"qv-{client['UUID']}")
self.pipe.add(qv)
qv = Gst.ElementFactory.make('queue', qv_name)
if qv is None:
printwarn(f"Failed to create video queue {qv_name}")
return
if not self.pipe.add(qv):
printwarn(f"Failed to add video queue {qv_name} to pipeline")
try:
qv.set_state(Gst.State.NULL)
except Exception:
pass
return
client['qv'] = qv
if not Gst.Element.link(vtee, qv):
printwarn(f"Failed to link videotee to {qv_name}")
return
if not Gst.Element.link(qv, client['webrtc']):
if not Gst.Element.link(qv, webrtc):
printwarn(f"Failed to link {qv_name} to webrtcbin {webrtc_name}")
return
if qv is not None: qv.sync_state_with_parent()
client['qv'] = qv

if atee is not None:
qa = Gst.ElementFactory.make('queue', f"qa-{client['UUID']}")
self.pipe.add(qa)
qa = Gst.ElementFactory.make('queue', qa_name)
if qa is None:
printwarn(f"Failed to create audio queue {qa_name}")
return
if not self.pipe.add(qa):
printwarn(f"Failed to add audio queue {qa_name} to pipeline")
try:
qa.set_state(Gst.State.NULL)
except Exception:
pass
return
client['qa'] = qa
if not Gst.Element.link(atee, qa):
printwarn(f"Failed to link audiotee to {qa_name}")
return
if not Gst.Element.link(qa, client['webrtc']):
if not Gst.Element.link(qa, webrtc):
printwarn(f"Failed to link {qa_name} to webrtcbin {webrtc_name}")
return
if qa is not None: qa.sync_state_with_parent()
client['qa'] = qa

if self.midi and (self.midi_thread == None):
self.midi_thread = threading.Thread(target=midi2vdo, args=(self.midi,), daemon=True)
Expand Down Expand Up @@ -9659,6 +9744,16 @@ def _stop_pipeline_internal(self, UUID):
self.pipe.remove(qv)
except Exception as e:
printwarn(f"Failed to cleanup video queue: {e}")

if self.multiviewer and self.pipe:
uuid = client.get('UUID', UUID)
try:
stale_webrtc = self.pipe.get_by_name(uuid)
except Exception:
stale_webrtc = None
self._remove_pipeline_element_by_name(f"qa-{uuid}", upstream=atee, downstream=stale_webrtc)
self._remove_pipeline_element_by_name(f"qv-{uuid}", upstream=vtee, downstream=stale_webrtc)
self._remove_pipeline_element_by_name(uuid)

# Always remove from clients dict, even if cleanup failed
self.clients.pop(UUID, None)
Expand Down
Loading