Skip to content

Commit 369908e

Browse files
committed
Add lifecycle management & robust writer shutdown
Introduce lifecycle locking and an "abandoned" state to VideoRecorder to prevent unsafe restarts and improve shutdown handling. start() now acquires a lifecycle lock, refuses to restart when a leftover thread is marked abandoned, and initializes/reset writer state inside the lock. stop() now marks the recorder as abandoned and records an encode error if the writer thread fails to terminate within the timeout, and ensures timestamps are saved. The writer loop was simplified to break directly on sentinel or encode errors and now closes WriteGear during finalization with exception handling. Miscellaneous reordering and small bookkeeping fixes were made to improve safety and logging during shutdown.
1 parent acd15da commit 369908e

File tree

1 file changed

+79
-53
lines changed

1 file changed

+79
-53
lines changed

dlclivegui/services/video_recorder.py

Lines changed: 79 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,22 @@ def __init__(
5353
crf: int = 23,
5454
buffer_size: int = 240,
5555
):
56+
# Config
5657
self._output = Path(output)
5758
self._writer: Any | None = None
5859
self._frame_size = frame_size
5960
self._frame_rate = frame_rate
6061
self._codec = codec
6162
self._crf = int(crf)
6263
self._buffer_size = max(1, int(buffer_size))
64+
# Worker state
6365
self._queue: queue.Queue[Any] | None = None
6466
self._writer_thread: threading.Thread | None = None
6567
self._stop_event = threading.Event()
6668
self._stats_lock = threading.Lock()
69+
self._lifecycle_lock = threading.Lock()
70+
self._abandoned = False
71+
# Stats
6772
self._frames_enqueued = 0
6873
self._frames_written = 0
6974
self._dropped_frames = 0
@@ -81,37 +86,46 @@ def is_running(self) -> bool:
8186
def start(self) -> None:
8287
if WriteGear is None:
8388
raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.")
84-
if self._writer is not None:
85-
return
86-
fps_value = float(self._frame_rate) if self._frame_rate else 30.0
87-
88-
writer_kwargs: dict[str, Any] = {
89-
"compression_mode": True,
90-
"logging": False,
91-
"-input_framerate": fps_value,
92-
"-vcodec": (self._codec or "libx264").strip() or "libx264",
93-
"-crf": int(self._crf),
94-
}
95-
# TODO deal with pixel format
96-
97-
self._output.parent.mkdir(parents=True, exist_ok=True)
98-
self._writer = WriteGear(output=str(self._output), **writer_kwargs)
99-
self._queue = queue.Queue(maxsize=self._buffer_size)
100-
self._frames_enqueued = 0
101-
self._frames_written = 0
102-
self._dropped_frames = 0
103-
self._total_latency = 0.0
104-
self._last_latency = 0.0
105-
self._written_times.clear()
106-
self._frame_timestamps.clear()
107-
self._encode_error = None
108-
self._stop_event.clear()
109-
self._writer_thread = threading.Thread(
110-
target=self._writer_loop,
111-
name="VideoRecorderWriter",
112-
daemon=True,
113-
)
114-
self._writer_thread.start()
89+
90+
with self._lifecycle_lock:
91+
if self.is_running:
92+
return
93+
if self._abandoned:
94+
raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.")
95+
if self._writer is not None:
96+
self._writer = None
97+
self._queue = None
98+
self._writer_thread = None
99+
100+
fps_value = float(self._frame_rate) if self._frame_rate else 30.0
101+
102+
writer_kwargs: dict[str, Any] = {
103+
"compression_mode": True,
104+
"logging": False,
105+
"-input_framerate": fps_value,
106+
"-vcodec": (self._codec or "libx264").strip() or "libx264",
107+
"-crf": int(self._crf),
108+
}
109+
# TODO deal with pixel format
110+
111+
self._output.parent.mkdir(parents=True, exist_ok=True)
112+
self._writer = WriteGear(output=str(self._output), **writer_kwargs)
113+
self._queue = queue.Queue(maxsize=self._buffer_size)
114+
self._frames_enqueued = 0
115+
self._frames_written = 0
116+
self._dropped_frames = 0
117+
self._total_latency = 0.0
118+
self._last_latency = 0.0
119+
self._written_times.clear()
120+
self._frame_timestamps.clear()
121+
self._encode_error = None
122+
self._stop_event.clear()
123+
self._writer_thread = threading.Thread(
124+
target=self._writer_loop,
125+
name="VideoRecorderWriter",
126+
daemon=True,
127+
)
128+
self._writer_thread.start()
115129

116130
def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None) -> None:
117131
self._frame_size = frame_size
@@ -197,18 +211,30 @@ def stop(self) -> None:
197211
if t is not None:
198212
t.join(timeout=5.0)
199213
if t.is_alive():
200-
logger.warning("Video recorder thread did not terminate cleanly")
201-
return
214+
with self._stats_lock:
215+
self._encode_error = RuntimeError(
216+
"Failed to stop VideoRecorder within timeout; thread is still alive."
217+
)
218+
self._abandoned = True
219+
logger.critical(
220+
"Failed to stop VideoRecorder within timeout; thread is still alive. "
221+
"Marking recorder as abandoned to prevent restart."
222+
)
223+
return
202224

203225
if self._writer is not None:
204226
try:
205227
self._writer.close()
206228
except Exception:
207229
logger.exception("Failed to close WriteGear cleanly")
208230

231+
self._save_timestamps()
232+
209233
self._writer = None
210234
self._writer_thread = None
211235
self._queue = None
236+
self._stop_event.clear()
237+
self._abandoned = False
212238

213239
def get_stats(self) -> RecorderStats | None:
214240
if (
@@ -263,10 +289,9 @@ def _writer_loop(self) -> None:
263289
self._stop_event.set()
264290
break
265291

266-
stop_now = False
267292
try:
268293
if item is _SENTINEL:
269-
stop_now = True
294+
break
270295
else:
271296
frame, timestamp = item
272297
start = time.perf_counter()
@@ -281,19 +306,19 @@ def _writer_loop(self) -> None:
281306
self._encode_error = exc
282307
logger.exception("Video encoding failed while writing frame", exc_info=exc)
283308
self._stop_event.set()
284-
stop_now = True
285-
286-
elapsed = time.perf_counter() - start
287-
now = time.perf_counter()
288-
with self._stats_lock:
289-
self._frames_written += 1
290-
self._total_latency += elapsed
291-
self._last_latency = elapsed
292-
self._written_times.append(now)
293-
self._frame_timestamps.append(timestamp)
294-
if now - self._last_log_time >= 1.0:
295-
self._compute_write_fps_locked()
296-
self._last_log_time = now
309+
break
310+
else:
311+
elapsed = time.perf_counter() - start
312+
now = time.perf_counter()
313+
with self._stats_lock:
314+
self._frames_written += 1
315+
self._total_latency += elapsed
316+
self._last_latency = elapsed
317+
self._written_times.append(now)
318+
self._frame_timestamps.append(timestamp)
319+
if now - self._last_log_time >= 1.0:
320+
self._compute_write_fps_locked()
321+
self._last_log_time = now
297322

298323
finally:
299324
# Ensure queue accounting is correct for every item pulled from q
@@ -302,12 +327,13 @@ def _writer_loop(self) -> None:
302327
except ValueError:
303328
pass
304329

305-
if stop_now:
306-
break
307-
308330
finally:
309-
self._finalize_writer()
310-
self._save_timestamps()
331+
writer = self._writer
332+
if writer is not None:
333+
try:
334+
writer.close()
335+
except Exception:
336+
logger.exception("Failed to close WriteGear during writer loop finalization")
311337

312338
def _finalize_writer(self) -> None:
313339
writer = self._writer

0 commit comments

Comments
 (0)