Skip to content

Commit 421be4d

Browse files
committed
Fix recording shutdown for node docker
Signed-off-by: Viet Nguyen Duc <nguyenducviet4496@gmail.com>
1 parent 48d5121 commit 421be4d

File tree

1 file changed

+58
-38
lines changed

1 file changed

+58
-38
lines changed

Video/video_service.py

Lines changed: 58 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -355,42 +355,51 @@ async def wait_for_node_ready(self) -> None:
355355
f"Waiting for Node /status endpoint: {node_status_url} " f"(verify_ssl={self.node_status_verify_ssl})"
356356
)
357357

358-
while not self.shutdown_event.is_set():
358+
def _fetch_status() -> Optional[dict]:
359+
"""Blocking HTTP fetch run in a thread to avoid blocking the event loop."""
360+
req = Request(node_status_url, headers=headers)
359361
try:
360-
req = Request(node_status_url, headers=headers)
361362
if ssl_context is not None:
362363
resp_ctx = urlopen(req, timeout=5, context=ssl_context)
363364
else:
364365
resp_ctx = urlopen(req, timeout=5)
365-
366366
with resp_ctx as resp:
367367
if resp.status == 200:
368-
body = json.loads(resp.read().decode("utf-8"))
369-
370-
if self.record_standalone:
371-
nodes = body.get("value", {}).get("nodes", [])
372-
if nodes:
373-
node_info = nodes[0]
374-
self.node_id = node_info.get("id")
375-
self.node_external_uri = node_info.get("externalUri")
376-
else:
377-
# Fallback: sidecar connected directly to a node
378-
# (e.g. dynamic grid where /status returns singular "node")
379-
node_info = body.get("value", {}).get("node", {})
380-
self.node_id = node_info.get("nodeId") or node_info.get("id")
381-
self.node_external_uri = node_info.get("externalUri")
368+
return json.loads(resp.read().decode("utf-8"))
369+
except (URLError, OSError, json.JSONDecodeError, ValueError):
370+
pass
371+
return None
372+
373+
while not self.shutdown_event.is_set():
374+
try:
375+
# Run blocking urlopen in a thread so SIGTERM can be processed
376+
# immediately by the event loop without waiting up to 5s.
377+
body = await asyncio.to_thread(_fetch_status)
378+
if body is not None:
379+
if self.record_standalone:
380+
nodes = body.get("value", {}).get("nodes", [])
381+
if nodes:
382+
node_info = nodes[0]
383+
self.node_id = node_info.get("id")
384+
self.node_external_uri = node_info.get("externalUri")
382385
else:
386+
# Fallback: sidecar connected directly to a node
387+
# (e.g. dynamic grid where /status returns singular "node")
383388
node_info = body.get("value", {}).get("node", {})
384-
self.node_id = node_info.get("nodeId")
389+
self.node_id = node_info.get("nodeId") or node_info.get("id")
385390
self.node_external_uri = node_info.get("externalUri")
386-
387-
if self.node_id:
388-
logger.info(f"Node is ready. ID: {self.node_id}, URI: {self.node_external_uri}")
389-
return
390-
else:
391-
logger.warning("Node /status responded but nodeId is missing, retrying...")
392-
except (URLError, OSError, json.JSONDecodeError, ValueError) as e:
393-
logger.debug(f"Node not ready yet: {e}")
391+
else:
392+
node_info = body.get("value", {}).get("node", {})
393+
self.node_id = node_info.get("nodeId")
394+
self.node_external_uri = node_info.get("externalUri")
395+
396+
if self.node_id:
397+
logger.info(f"Node is ready. ID: {self.node_id}, URI: {self.node_external_uri}")
398+
return
399+
else:
400+
logger.warning("Node /status responded but nodeId is missing, retrying...")
401+
else:
402+
logger.debug(f"Node not ready yet: {node_status_url}")
394403
except Exception as e:
395404
logger.warning(f"Unexpected error polling Node /status: {e}")
396405

@@ -488,28 +497,33 @@ async def start_recording(self, session: SessionState) -> bool:
488497

489498
async def stop_recording(self, session: SessionState) -> bool:
490499
"""Stop ffmpeg recording for a session."""
491-
if session.ffmpeg_process is None:
492-
logger.warning(f"No recording in progress for session {session.session_id}")
500+
# Claim the process atomically before the first await. Asyncio is
501+
# cooperative: no other coroutine can run between the check and the
502+
# assignment, so a concurrent caller (e.g. cleanup() racing with
503+
# handle_session_closed()) will see None here and return early,
504+
# preventing double-terminate and double-upload.
505+
proc = session.ffmpeg_process
506+
if proc is None:
493507
return False
508+
session.ffmpeg_process = None
494509

495510
session.status = SessionStatus.STOPPING
496511
session.end_time = datetime.now()
497512

498513
try:
499-
session.ffmpeg_process.terminate()
514+
proc.terminate()
500515
try:
501-
_, stderr_bytes = await asyncio.wait_for(session.ffmpeg_process.communicate(), timeout=10.0)
516+
_, stderr_bytes = await asyncio.wait_for(proc.communicate(), timeout=10.0)
502517
except asyncio.TimeoutError:
503518
logger.warning(f"ffmpeg did not stop gracefully for {session.session_id}, killing")
504-
session.ffmpeg_process.kill()
505-
_, stderr_bytes = await session.ffmpeg_process.communicate()
519+
proc.kill()
520+
_, stderr_bytes = await proc.communicate()
506521

507-
rc = session.ffmpeg_process.returncode
522+
rc = proc.returncode
508523
if stderr_bytes:
509524
stderr_text = stderr_bytes.decode(errors="replace").strip()
510525
if stderr_text:
511526
logger.warning(f"ffmpeg stderr for {session.session_id}: {stderr_text}")
512-
session.ffmpeg_process = None
513527

514528
# 255 is ffmpeg's own graceful-stop exit code (exit_program(255) in its SIGTERM handler).
515529
if rc not in (0, 255, -signal.SIGTERM, -signal.SIGKILL):
@@ -720,8 +734,11 @@ async def handle_session_closed(self, data: dict) -> None:
720734

721735
# Stop recording if in progress
722736
if session.ffmpeg_process is not None:
723-
await self.stop_recording(session)
724-
await self.queue_upload(session)
737+
stopped = await self.stop_recording(session)
738+
if stopped:
739+
await self.queue_upload(session)
740+
else:
741+
logger.warning(f"Recording stop failed for {session_id}, skipping upload")
725742

726743
# Clean up session after a delay (keep for potential late events).
727744
# Tracked so cleanup() can cancel these on shutdown instead of waiting 60s.
@@ -924,8 +941,11 @@ async def cleanup(self) -> None:
924941

925942
for session in active_sessions:
926943
logger.info(f"Stopping recording: {session.session_id}")
927-
await self.stop_recording(session)
928-
await self.queue_upload(session)
944+
stopped = await self.stop_recording(session)
945+
if stopped:
946+
await self.queue_upload(session)
947+
else:
948+
logger.warning(f"Recording stop failed for {session.session_id}, skipping upload")
929949

930950
# Push sentinel so the upload worker exits after draining the queue.
931951
# run() is responsible for awaiting the upload task with a timeout.

0 commit comments

Comments (0)