From 13c1c118e1e0e25743601babad9819a69a58f3da Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 14:16:17 -0700 Subject: [PATCH 1/7] Format moq-source.cpp with clang-format Bring the file into compliance with the repo's clang-format@19 config so the format check passes on subsequent changes. Pure formatting, no behavior change. Co-Authored-By: Claude Opus 4.8 --- src/moq-source.cpp | 119 ++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 65 deletions(-) diff --git a/src/moq-source.cpp b/src/moq-source.cpp index 824b189..061d35c 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -28,28 +28,23 @@ static AVCodecID codec_string_to_id(const char *codec, size_t len) } // H.264/AVC - if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) || - (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) { + if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) || (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) { return AV_CODEC_ID_H264; } // HEVC/H.265 - if ((len >= 4 && strncasecmp(codec, "hevc", 4) == 0) || - (len >= 4 && strncasecmp(codec, "h265", 4) == 0) || - (len >= 4 && strncasecmp(codec, "hev1", 4) == 0) || - (len >= 4 && strncasecmp(codec, "hvc1", 4) == 0)) { + if ((len >= 4 && strncasecmp(codec, "hevc", 4) == 0) || (len >= 4 && strncasecmp(codec, "h265", 4) == 0) || + (len >= 4 && strncasecmp(codec, "hev1", 4) == 0) || (len >= 4 && strncasecmp(codec, "hvc1", 4) == 0)) { return AV_CODEC_ID_HEVC; } // VP9 - if ((len >= 3 && strncasecmp(codec, "vp9", 3) == 0) || - (len >= 4 && strncasecmp(codec, "vp09", 4) == 0)) { + if ((len >= 3 && strncasecmp(codec, "vp9", 3) == 0) || (len >= 4 && strncasecmp(codec, "vp09", 4) == 0)) { return AV_CODEC_ID_VP9; } // AV1 - if ((len >= 3 && strncasecmp(codec, "av1", 3) == 0) || - (len >= 4 && strncasecmp(codec, "av01", 4) == 0)) { + if ((len >= 3 && strncasecmp(codec, "av1", 3) == 0) || (len >= 4 && strncasecmp(codec, "av01", 4) == 0)) { return AV_CODEC_ID_AV1; } @@ -72,8 +67,8 @@ struct moq_source { std::atomic shutting_down; // Session handles (all negative = invalid) - std::atomic generation; // Increments on reconnect - bool reconnect_in_progress; // True while reconnect is happening + std::atomic generation; // Increments on reconnect + bool reconnect_in_progress; // True while reconnect is happening int32_t origin; int32_t session; int32_t consume; @@ -82,12 +77,12 @@ struct moq_source { // Decoder state AVCodecContext *codec_ctx; - AVCodecID current_codec_id; // Currently configured codec - enum AVPixelFormat current_pix_fmt; // Current pixel format for sws_ctx + AVCodecID current_codec_id; // Currently configured codec + enum AVPixelFormat current_pix_fmt; // Current pixel format for sws_ctx struct SwsContext *sws_ctx; bool got_keyframe; - uint32_t frames_waiting_for_keyframe; // Count of skipped frames while waiting - uint32_t consecutive_decode_errors; // Count of consecutive decode failures + uint32_t frames_waiting_for_keyframe; // Count of skipped frames while waiting + uint32_t consecutive_decode_errors; // Count of consecutive decode failures // Output frame buffer struct obs_source_frame frame; @@ -204,12 +199,11 @@ static void moq_source_update(void *data, obs_data_t *settings) pthread_mutex_lock(&ctx->mutex); // Check if settings actually changed - bool url_changed = (!ctx->url && url && strlen(url) > 0) || - (ctx->url && !url) || - (ctx->url && url && strcmp(ctx->url, url) != 0); + bool url_changed = (!ctx->url && url && strlen(url) > 0) || (ctx->url && !url) || + (ctx->url && url && strcmp(ctx->url, url) != 0); bool broadcast_changed = (!ctx->broadcast && broadcast && strlen(broadcast) > 0) || - (ctx->broadcast && !broadcast) || - (ctx->broadcast && broadcast && strcmp(ctx->broadcast, broadcast) != 0); + (ctx->broadcast && !broadcast) || + (ctx->broadcast && broadcast && strcmp(ctx->broadcast, broadcast) != 0); bool settings_changed = url_changed || broadcast_changed; // Store the new settings @@ -219,15 +213,14 @@ static void moq_source_update(void *data, obs_data_t *settings) ctx->broadcast = bstrdup(broadcast); // Check if new settings are valid for connection - bool valid = ctx->url && ctx->broadcast && - strlen(ctx->url) > 0 && strlen(ctx->broadcast) > 0; + bool valid = ctx->url && ctx->broadcast && strlen(ctx->url) > 0 && strlen(ctx->broadcast) > 0; pthread_mutex_unlock(&ctx->mutex); // If settings changed and are valid, reconnect if (settings_changed && valid) { - LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", - url ? url : "(null)", broadcast ? broadcast : "(null)"); + LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", url ? url : "(null)", + broadcast ? broadcast : "(null)"); moq_source_reconnect(ctx); } else if (settings_changed && !valid) { LOG_INFO("Settings changed but invalid - disconnecting"); @@ -468,12 +461,10 @@ static void moq_source_reconnect(struct moq_source *ctx) } // Connect to MoQ server (consume will happen in on_session_status callback) - int32_t new_session = moq_session_connect( - url_copy, strlen(url_copy), - 0, // origin_publish - new_origin, // origin_consume - on_session_status, ctx - ); + int32_t new_session = moq_session_connect(url_copy, strlen(url_copy), + 0, // origin_publish + new_origin, // origin_consume + on_session_status, ctx); bfree(url_copy); if (new_session < 0) { @@ -669,7 +660,8 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid // Use codec description as extradata (contains SPS/PPS for H.264, VPS/SPS/PPS for HEVC, etc.) if (config->description && config->description_len > 0) { - new_codec_ctx->extradata = (uint8_t *)av_mallocz(config->description_len + AV_INPUT_BUFFER_PADDING_SIZE); + new_codec_ctx->extradata = + (uint8_t *)av_mallocz(config->description_len + AV_INPUT_BUFFER_PADDING_SIZE); if (new_codec_ctx->extradata) { memcpy(new_codec_ctx->extradata, config->description, config->description_len); new_codec_ctx->extradata_size = static_cast(config->description_len); @@ -711,9 +703,9 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid // dynamically on first decoded frame when we know the actual pixel format ctx->codec_ctx = new_codec_ctx; ctx->current_codec_id = codec_id; - ctx->current_pix_fmt = AV_PIX_FMT_NONE; // Will be set on first frame - ctx->sws_ctx = NULL; // Will be created on first frame with actual pixel format - ctx->frame_buffer = NULL; // Will be allocated on first frame with actual dimensions + ctx->current_pix_fmt = AV_PIX_FMT_NONE; // Will be set on first frame + ctx->sws_ctx = NULL; // Will be created on first frame with actual pixel format + ctx->frame_buffer = NULL; // Will be allocated on first frame with actual dimensions ctx->frame.width = width; ctx->frame.height = height; ctx->frame.linesize[0] = width * 4; @@ -732,8 +724,8 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid if (config->codec && copy_len > 0) { memcpy(codec_str, config->codec, copy_len); } - LOG_INFO("Decoder initialized: codec=%s, dimensions=%ux%u (may be refined on first frame)", - codec_str, width, height); + LOG_INFO("Decoder initialized: codec=%s, dimensions=%ux%u (may be refined on first frame)", codec_str, width, + height); return true; } @@ -798,10 +790,9 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // Skip non-keyframes until we get the first one if (!ctx->got_keyframe && !frame_data.keyframe) { ctx->frames_waiting_for_keyframe++; - if (ctx->frames_waiting_for_keyframe == 1 || - (ctx->frames_waiting_for_keyframe % 30) == 0) { + if (ctx->frames_waiting_for_keyframe == 1 || (ctx->frames_waiting_for_keyframe % 30) == 0) { LOG_INFO("Waiting for keyframe... (skipped %u frames so far)", - ctx->frames_waiting_for_keyframe); + ctx->frames_waiting_for_keyframe); } pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); @@ -812,7 +803,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) if (frame_data.keyframe) { if (!ctx->got_keyframe) { LOG_INFO("Got keyframe after waiting for %u frames, payload_size=%zu", - ctx->frames_waiting_for_keyframe, frame_data.payload_size); + ctx->frames_waiting_for_keyframe, frame_data.payload_size); // Flush decoder to ensure clean state when starting from keyframe avcodec_flush_buffers(ctx->codec_ctx); } @@ -847,7 +838,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // If too many consecutive errors, flush decoder and wait for next keyframe if (ctx->consecutive_decode_errors >= 5) { LOG_WARNING("Too many send errors (%u), flushing decoder and waiting for keyframe", - ctx->consecutive_decode_errors); + ctx->consecutive_decode_errors); avcodec_flush_buffers(ctx->codec_ctx); ctx->got_keyframe = false; ctx->consecutive_decode_errors = 0; @@ -878,7 +869,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // If too many consecutive errors, flush decoder and wait for next keyframe if (ctx->consecutive_decode_errors >= 5) { LOG_WARNING("Too many decode errors (%u), flushing decoder and waiting for keyframe", - ctx->consecutive_decode_errors); + ctx->consecutive_decode_errors); avcodec_flush_buffers(ctx->codec_ctx); ctx->got_keyframe = false; ctx->consecutive_decode_errors = 0; @@ -904,18 +895,18 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) if (need_reinit) { if (dimensions_changed) { - LOG_INFO("Decoded frame dimensions changed: %ux%u -> %dx%d", - ctx->frame.width, ctx->frame.height, frame->width, frame->height); + LOG_INFO("Decoded frame dimensions changed: %ux%u -> %dx%d", ctx->frame.width, + ctx->frame.height, frame->width, frame->height); } if (pix_fmt_changed) { - LOG_INFO("Decoded frame pixel format changed: %d -> %d (%s)", - ctx->current_pix_fmt, decoded_pix_fmt, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_INFO("Decoded frame pixel format changed: %d -> %d (%s)", ctx->current_pix_fmt, + decoded_pix_fmt, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) + : "unknown"); } // Validate that dimensions are positive and reasonable - if (frame->width <= 0 || frame->height <= 0 || - frame->width > 16384 || frame->height > 16384) { + if (frame->width <= 0 || frame->height <= 0 || frame->width > 16384 || frame->height > 16384) { LOG_ERROR("Invalid decoded frame dimensions: %dx%d", frame->width, frame->height); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); @@ -939,15 +930,14 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) } // Create new scaling context with the actual pixel format from the decoded frame - struct SwsContext *new_sws_ctx = sws_getContext( - frame->width, frame->height, decoded_pix_fmt, - frame->width, frame->height, AV_PIX_FMT_RGBA, - SWS_BILINEAR, NULL, NULL, NULL - ); + struct SwsContext *new_sws_ctx = sws_getContext(frame->width, frame->height, decoded_pix_fmt, + frame->width, frame->height, AV_PIX_FMT_RGBA, + SWS_BILINEAR, NULL, NULL, NULL); if (!new_sws_ctx) { - LOG_ERROR("Failed to create scaling context for %dx%d pix_fmt=%d (%s)", - frame->width, frame->height, decoded_pix_fmt, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_ERROR("Failed to create scaling context for %dx%d pix_fmt=%d (%s)", frame->width, + frame->height, decoded_pix_fmt, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) + : "unknown"); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); @@ -958,8 +948,8 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) size_t new_buffer_size = (size_t)frame->width * (size_t)frame->height * 4; uint8_t *new_frame_buffer = (uint8_t *)bmalloc(new_buffer_size); if (!new_frame_buffer) { - LOG_ERROR("Failed to allocate frame buffer for %dx%d (%zu bytes)", - frame->width, frame->height, new_buffer_size); + LOG_ERROR("Failed to allocate frame buffer for %dx%d (%zu bytes)", frame->width, frame->height, + new_buffer_size); sws_freeContext(new_sws_ctx); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); @@ -981,17 +971,16 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) ctx->frame.linesize[0] = frame->width * 4; ctx->frame.data[0] = new_frame_buffer; - LOG_INFO("Scaler initialized for %dx%d pix_fmt=%s", - frame->width, frame->height, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_INFO("Scaler initialized for %dx%d pix_fmt=%s", frame->width, frame->height, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); } // Convert YUV420P to RGBA uint8_t *dst_data[4] = {ctx->frame_buffer, NULL, NULL, NULL}; int dst_linesize[4] = {static_cast(ctx->frame.width * 4), 0, 0, 0}; - sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize, - 0, ctx->frame.height, dst_data, dst_linesize); + sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize, 0, ctx->frame.height, dst_data, + dst_linesize); // Update OBS frame timestamp and output ctx->frame.timestamp = frame_data.timestamp_us; From ffcd5c4094cfc5069d32bd352d4b76ee941a28a3 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 14:18:00 -0700 Subject: [PATCH 2/7] Fix MoQ source teardown UAF; drop unnecessary reconnect sleep moq_source_destroy freed ctx after a fixed os_sleep_ms(100) that was meant to let in-flight async MoQ callbacks drain (they check shutting_down and exit early). That is a timing guess, not synchronization: the decode callback runs FFmpeg avcodec_send_packet/receive_frame, which can exceed 100ms on a large keyframe or slow/contended hardware, so a callback could touch ctx after it was freed. The code documented this limitation itself. Replace the sleep with real quiescence: - Add an in-flight callback counter (active_callbacks) and a condition variable (callbacks_done) to moq_source. - A callback_guard RAII type increments the counter on callback entry (unless shutdown has begun) and broadcasts when it returns to zero. Each top-level callback (on_session_status, on_catalog, on_video_frame) holds one for its whole stack frame, so ctx cannot be freed while a callback that may touch it is still running. - destroy sets shutting_down, disconnects, then waits on callbacks_done until the count is zero - all under the mutex so the handoff is atomic. Also drop the os_sleep_ms(50) in the reconnect path. libmoq origins and sessions are fully independent (each origin is a distinct random instance, each session its own task): moq_origin_close removes the origin synchronously and moq_session_close only signals the old session's task to wind down on the libmoq runtime thread. The new origin/session share no client-side state with the closed one, so the delay guarded nothing - it was pure latency. Co-Authored-By: Claude Opus 4.8 --- src/moq-source.cpp | 110 +++++++++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 28 deletions(-) diff --git a/src/moq-source.cpp b/src/moq-source.cpp index 061d35c..fc4feff 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -66,6 +66,13 @@ struct moq_source { // Shutdown flag - set when destroy begins, callbacks should exit early std::atomic shutting_down; + // In-flight async callback tracking. Each top-level MoQ callback holds a + // callback_guard for its lifetime, incrementing active_callbacks on entry + // and signaling callbacks_done when the count returns to zero. destroy waits + // on this for real quiescence before freeing ctx (no timing guesswork). + int active_callbacks; // guarded by mutex + pthread_cond_t callbacks_done; // signaled when active_callbacks hits 0 + // Session handles (all negative = invalid) std::atomic generation; // Increments on reconnect bool reconnect_in_progress; // True while reconnect is happening @@ -92,6 +99,43 @@ struct moq_source { pthread_mutex_t mutex; }; +// RAII guard that tracks an in-flight async MoQ callback. Constructing it +// registers the callback (unless we are already shutting down); destruction +// unregisters it and wakes moq_source_destroy if it was the last one. This +// gives destroy a real synchronization point instead of a fixed sleep: ctx is +// never freed while a callback frame that may touch it is still on the stack. +namespace { +struct callback_guard { + struct moq_source *ctx; + bool active; + + explicit callback_guard(struct moq_source *c) : ctx(c) + { + pthread_mutex_lock(&ctx->mutex); + active = !ctx->shutting_down.load(); + if (active) + ctx->active_callbacks++; + pthread_mutex_unlock(&ctx->mutex); + } + + ~callback_guard() + { + if (!active) + return; + pthread_mutex_lock(&ctx->mutex); + if (--ctx->active_callbacks == 0) + pthread_cond_broadcast(&ctx->callbacks_done); + pthread_mutex_unlock(&ctx->mutex); + } + + callback_guard(const callback_guard &) = delete; + callback_guard &operator=(const callback_guard &) = delete; + + // True if the callback may proceed (i.e. shutdown had not begun on entry). + bool entered() const { return active; } +}; +} // namespace + // Forward declarations static void moq_source_update(void *data, obs_data_t *settings); static void moq_source_destroy(void *data); @@ -119,6 +163,10 @@ static void *moq_source_create(obs_data_t *settings, obs_source_t *source) // Initialize shutdown flag ctx->shutting_down = false; + // Initialize in-flight callback tracking + ctx->active_callbacks = 0; + pthread_cond_init(&ctx->callbacks_done, NULL); + // Initialize handles to invalid values ctx->generation = 0; ctx->reconnect_in_progress = false; @@ -158,32 +206,28 @@ static void moq_source_destroy(void *data) { struct moq_source *ctx = (struct moq_source *)data; - // Set shutdown flag first - callbacks will check this and exit early + // Set shutdown flag first, then disconnect, then wait for any in-flight + // async callbacks to finish - all under the mutex so the handoff is atomic. pthread_mutex_lock(&ctx->mutex); ctx->shutting_down = true; moq_source_disconnect_locked(ctx); - pthread_mutex_unlock(&ctx->mutex); - // Give MoQ callbacks time to drain - they check shutting_down and exit early. - // This prevents use-after-free when async callbacks fire after ctx is freed. - // - // LIMITATION: This 100ms sleep is a timing-based workaround, not a synchronization - // guarantee. If a callback is mid-execution when shutting_down is set AND takes - // longer than 100ms to complete (after the mutex unlock), there is still a - // potential race condition. In practice, our callbacks are fast (< 1ms typically) - // and this delay provides sufficient margin. However, a more robust solution - // would use reference counting: - // - Increment refcount when entering a callback - // - Decrement when exiting - // - Wait for refcount to reach zero before freeing ctx - // This could be implemented using std::shared_ptr or a manual atomic refcount - // with a condition variable for waiting. - os_sleep_ms(100); + // Wait for true quiescence rather than sleeping a fixed interval. Any + // callback that entered before shutting_down was set has incremented + // active_callbacks (via callback_guard) and will broadcast callbacks_done + // when it unwinds. Callbacks arriving after this point observe shutting_down + // under the mutex and never increment. This eliminates the use-after-free + // window that a timed sleep could not close (e.g. a decode callback running + // FFmpeg longer than the old 100ms budget). + while (ctx->active_callbacks > 0) + pthread_cond_wait(&ctx->callbacks_done, &ctx->mutex); + pthread_mutex_unlock(&ctx->mutex); bfree(ctx->url); bfree(ctx->broadcast); // Note: frame_buffer is already freed by moq_source_disconnect_locked + pthread_cond_destroy(&ctx->callbacks_done); pthread_mutex_destroy(&ctx->mutex); bfree(ctx); @@ -257,14 +301,16 @@ static void on_session_status(void *user_data, int32_t code) { struct moq_source *ctx = (struct moq_source *)user_data; - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { + // Register this callback for the duration of its execution. If shutdown has + // begun, entered() is false and we must not touch ctx. + callback_guard guard(ctx); + if (!guard.entered()) { LOG_DEBUG("Ignoring session status callback - shutting down"); return; } pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (may have changed) + // Double-check after acquiring lock (shutdown may have begun mid-callback) if (ctx->shutting_down.load()) { pthread_mutex_unlock(&ctx->mutex); return; @@ -307,8 +353,9 @@ static void on_catalog(void *user_data, int32_t catalog) LOG_INFO("Catalog callback received: %d", catalog); - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { + // Register this callback for the duration of its execution. + callback_guard guard(ctx); + if (!guard.entered()) { LOG_DEBUG("Ignoring catalog callback - shutting down"); if (catalog >= 0) moq_consume_catalog_close(catalog); @@ -317,7 +364,7 @@ static void on_catalog(void *user_data, int32_t catalog) pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (may have changed) + // Double-check after acquiring lock (shutdown may have begun mid-callback) if (ctx->shutting_down.load()) { pthread_mutex_unlock(&ctx->mutex); if (catalog >= 0) @@ -393,8 +440,10 @@ static void on_video_frame(void *user_data, int32_t frame_id) return; } - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { + // Register this callback for the duration of its execution (which includes + // the FFmpeg decode in moq_source_decode_frame). + callback_guard guard(ctx); + if (!guard.entered()) { moq_consume_frame_close(frame_id); return; } @@ -403,7 +452,7 @@ static void on_video_frame(void *user_data, int32_t frame_id) // Note: We can't check video_track here because frames may arrive before // the track handle is stored in on_catalog (race condition) pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (may have changed) + // Double-check after acquiring lock (shutdown may have begun mid-callback) if (ctx->shutting_down.load()) { pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); @@ -446,8 +495,13 @@ static void moq_source_reconnect(struct moq_source *ctx) // Blank video while reconnecting to avoid showing stale frames moq_source_blank_video(ctx); - // Small delay to allow MoQ library to fully clean up previous connection - os_sleep_ms(50); + // No delay needed before reconnecting: libmoq origins and sessions are fully + // independent (each origin is a distinct random instance, each session its own + // task), so the new connection shares no client-side state with the one we just + // closed. moq_origin_close removes the origin synchronously, and moq_session_close + // only signals the old session's task to wind down asynchronously on the libmoq + // runtime thread - nothing the new origin/session can collide with. (The previous + // os_sleep_ms(50) here was a timing band-aid that provided no real guarantee.) // Create origin for consuming (outside mutex since it may block) int32_t new_origin = moq_origin_create(); From 609cd9af26ecccf88ad6b2e29d7aade381310ab0 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 16:29:43 -0700 Subject: [PATCH 3/7] Update libmoq to 0.3.6 Bumps MOQ_VERSION 0.2.14 -> 0.3.6 (latest released; 0.3.7 is tagged but has no published release artifacts yet). The fetched API is signature-compatible for everything the plugin uses, and the moq_video_config / moq_frame structs are unchanged - the only breaking change is semantic: the session on_status codes were redefined in 0.3.0. Old (0.2.x): 0 = connected, non-zero = failure. New (0.3.x): > 0 = (re)connected carrying the connection epoch (1 = first connect, 2 = first reconnect, ...); 0 = closed cleanly (terminal); < 0 = fatal / reconnect permanently gave up (terminal). Update both session status callbacks accordingly: - moq-source.cpp on_session_status: treat > 0 as connected (was treating the new epoch-1 connect as a failure, which would have broken connecting entirely against 0.3.x). Start consuming only on the first connect; on later epochs libmoq re-subscribes our existing consumer automatically since the origin outlives the connection, so recreating it would leak handles. Handle 0 (clean close) and < 0 (error) as the terminal cases. - moq-output.cpp: same > 0 / <= 0 split for its informational connect log. A nice side effect: 0.3.x sessions auto-reconnect internally with backoff, so transient transport drops now recover without the plugin's manual reconnect (which remains for settings changes). Co-Authored-By: Claude Opus 4.8 --- CMakePresets.json | 2 +- src/moq-output.cpp | 15 +++++++++------ src/moq-source.cpp | 28 +++++++++++++++++++++------- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/CMakePresets.json b/CMakePresets.json index b47a12a..6ebb63d 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -14,7 +14,7 @@ "ENABLE_QT": true, "CMAKE_EXPORT_COMPILE_COMMANDS": true, "BUILD_PLUGIN": true, - "MOQ_VERSION": "0.2.14", + "MOQ_VERSION": "0.3.6", "MOQ_ARCHIVE": "tar.gz" } }, diff --git a/src/moq-output.cpp b/src/moq-output.cpp index e08af33..5c42a4f 100644 --- a/src/moq-output.cpp +++ b/src/moq-output.cpp @@ -75,17 +75,20 @@ bool MoQOutput::Start() connect_start = std::chrono::steady_clock::now(); - // Create a callback to log when the session is connected or closed - auto session_connect_callback = [](void *user_data, int error_code) { + // Create a callback to log when the session is connected or closed. + // libmoq status codes (>= 0.3.0): > 0 = (re)connected (epoch), 0 = closed + // cleanly (terminal), < 0 = fatal/reconnect-gave-up (terminal). + auto session_connect_callback = [](void *user_data, int code) { auto self = static_cast(user_data); - if (error_code == 0) { + if (code > 0) { auto elapsed = std::chrono::steady_clock::now() - self->connect_start; - self->connect_time_ms = static_cast(std::chrono::duration_cast(elapsed).count()); - LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms, + self->connect_time_ms = static_cast( + std::chrono::duration_cast(elapsed).count()); + LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code, self->server_url.c_str()); } else { - LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str()); + LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str()); } }; diff --git a/src/moq-source.cpp b/src/moq-source.cpp index fc4feff..2e487c0 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -322,16 +322,30 @@ static void on_session_status(void *user_data, int32_t code) } uint32_t current_gen = ctx->generation; - if (code == 0) { + // libmoq status codes (>= 0.3.0): + // > 0 : (re)connected, carrying the connection epoch (1 = first connect, + // 2 = first reconnect, ...). The session auto-reconnects internally. + // = 0 : closed cleanly via moq_session_close (terminal) - we initiated it. + // < 0 : reconnect permanently gave up or fatal error (terminal). + if (code > 0) { pthread_mutex_unlock(&ctx->mutex); - LOG_INFO("MoQ session connected successfully (generation %u)", current_gen); - // Now that we're connected, start consuming the broadcast - moq_source_start_consume(ctx, current_gen); + LOG_INFO("MoQ session connected (generation %u, epoch %d)", current_gen, code); + // Start consuming only on the first connect. On later epochs libmoq has + // re-subscribed our existing consumer automatically (the origin outlives + // the connection), so recreating it would leak handles. + if (code == 1) { + moq_source_start_consume(ctx, current_gen); + } + } else if (code == 0) { + // Clean close - we asked for this (disconnect/reconnect/destroy). The + // handle is already being torn down; nothing to do here. + pthread_mutex_unlock(&ctx->mutex); + LOG_DEBUG("MoQ session closed cleanly (generation %u)", current_gen); } else { - // Connection failed - clean up the session and origin immediately - LOG_ERROR("MoQ session failed with code: %d (generation %u)", code, current_gen); + // Terminal error (e.g. auth failure, or reconnect gave up). + LOG_ERROR("MoQ session error: %d (generation %u)", code, current_gen); - // Clean up failed session/origin to prevent further callbacks + // Clean up the failed session/origin to prevent further callbacks. if (ctx->session >= 0) { moq_session_close(ctx->session); ctx->session = -1; From ea3b5171f86a5e0928171b8f1b9304ee77f5a8a2 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 17:02:52 -0700 Subject: [PATCH 4/7] Rework source teardown around libmoq terminal callbacks The previous teardown counted in-flight callback *executions* and waited for that count to hit zero. That closes the window where a callback is mid-execution when destroy runs, but not the window where libmoq delivers a callback *after* the count reached zero: each subscription fires one terminal callback (status <= 0) asynchronously after it is closed, which could land after destroy had already freed ctx. libmoq >= 0.3.0 documents a terminal-callback lifetime contract: user_data must stay valid until each subscription's terminal callback. Rework teardown to honor it directly via subscription reference counting: - ctx holds one reference for the OBS-owned source plus one per outstanding subscription handed `ctx` as user_data (session, catalog, video track). The ref is pre-incremented before the libmoq registration call (undone if it fails) and released by that subscription's terminal callback via the subscription_ref RAII guard. Because libmoq runs all callbacks serially on one runtime thread, the ref is held continuously from registration through the terminal callback, so ctx is always valid inside any callback. - destroy sets shutting_down, closes every subscription (which makes each terminal fire promptly - libmoq's biased close path wins over pending updates), drops the owner ref, and waits for the count to reach zero. A generous bounded wait backstops a subscription that never terminates so a broken teardown degrades to a logged warning instead of hanging OBS. Along the way this fixes a latent handle-management bug: the catalog *snapshot* id delivered to on_catalog was being stored and closed with moq_consume_catalog_close (which expects a *subscription* id from a different slab - an id-collision hazard). Now the real subscription handle from moq_consume_catalog is stored and closed, snapshots are freed with moq_consume_catalog_free on every path, and a catalog update closes the previous video track instead of leaking it. Also: a fatal session error now tears down all subscriptions (not just session+origin) so their terminals fire promptly, and reconnect bails if shutting down. The refcount balance and deadlock-freedom were checked by two independent adversarial reviews. Still wants runtime validation: connect/disconnect, scene-switch teardown with a decode in flight, settings-change reconnect, and repeated catalog updates. Co-Authored-By: Claude Opus 4.8 --- src/moq-source.cpp | 319 +++++++++++++++++++++++++++------------------ 1 file changed, 189 insertions(+), 130 deletions(-) diff --git a/src/moq-source.cpp b/src/moq-source.cpp index 2e487c0..c0473db 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -5,6 +5,8 @@ #include #include +#include +#include #ifdef _WIN32 #define strncasecmp _strnicmp @@ -66,12 +68,16 @@ struct moq_source { // Shutdown flag - set when destroy begins, callbacks should exit early std::atomic shutting_down; - // In-flight async callback tracking. Each top-level MoQ callback holds a - // callback_guard for its lifetime, incrementing active_callbacks on entry - // and signaling callbacks_done when the count returns to zero. destroy waits - // on this for real quiescence before freeing ctx (no timing guesswork). - int active_callbacks; // guarded by mutex - pthread_cond_t callbacks_done; // signaled when active_callbacks hits 0 + // Lifetime reference count, guarded by mutex. ctx must outlive every libmoq + // subscription that was handed `ctx` as user_data: each delivers exactly one + // terminal callback (status <= 0), the documented last touch of user_data + // (libmoq >= 0.3.0). We hold one reference for the OBS-owned source plus one + // per outstanding subscription (session, catalog, video track); a + // subscription's reference is released by its terminal callback (see + // subscription_ref). destroy drops the owner reference and waits for the + // count to reach zero before freeing. + int refs; + pthread_cond_t refs_zero; // signaled when refs reaches 0 // Session handles (all negative = invalid) std::atomic generation; // Increments on reconnect @@ -99,40 +105,38 @@ struct moq_source { pthread_mutex_t mutex; }; -// RAII guard that tracks an in-flight async MoQ callback. Constructing it -// registers the callback (unless we are already shutting down); destruction -// unregisters it and wakes moq_source_destroy if it was the last one. This -// gives destroy a real synchronization point instead of a fixed sleep: ctx is -// never freed while a callback frame that may touch it is still on the stack. +// RAII helper that releases a subscription's lifetime reference when its async +// callback returns. libmoq (>= 0.3.0) hands `ctx` to each subscription as +// user_data and guarantees exactly one terminal callback (status code <= 0), +// after which user_data is never touched again. Each callback constructs one of +// these with `terminal` set on that terminal code; on scope exit it drops the +// matching reference and wakes moq_source_destroy when the last one is gone. +// Releasing on scope exit (not entry) keeps ctx valid for the whole callback +// body, including a terminal callback that still reads ctx before returning. +// +// Because libmoq runs all callbacks on a single runtime thread, a subscription's +// reference is held continuously from registration through its terminal +// callback, so ctx is always valid on entry to any of its callbacks - no +// shutting_down pre-check is needed for safety. namespace { -struct callback_guard { +struct subscription_ref { struct moq_source *ctx; - bool active; + bool terminal; - explicit callback_guard(struct moq_source *c) : ctx(c) - { - pthread_mutex_lock(&ctx->mutex); - active = !ctx->shutting_down.load(); - if (active) - ctx->active_callbacks++; - pthread_mutex_unlock(&ctx->mutex); - } + subscription_ref(struct moq_source *c, bool is_terminal) : ctx(c), terminal(is_terminal) {} - ~callback_guard() + ~subscription_ref() { - if (!active) + if (!terminal) return; pthread_mutex_lock(&ctx->mutex); - if (--ctx->active_callbacks == 0) - pthread_cond_broadcast(&ctx->callbacks_done); + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); pthread_mutex_unlock(&ctx->mutex); } - callback_guard(const callback_guard &) = delete; - callback_guard &operator=(const callback_guard &) = delete; - - // True if the callback may proceed (i.e. shutdown had not begun on entry). - bool entered() const { return active; } + subscription_ref(const subscription_ref &) = delete; + subscription_ref &operator=(const subscription_ref &) = delete; }; } // namespace @@ -163,9 +167,10 @@ static void *moq_source_create(obs_data_t *settings, obs_source_t *source) // Initialize shutdown flag ctx->shutting_down = false; - // Initialize in-flight callback tracking - ctx->active_callbacks = 0; - pthread_cond_init(&ctx->callbacks_done, NULL); + // One lifetime reference for the OBS-owned source itself; each subscription + // adds its own while outstanding. + ctx->refs = 1; + pthread_cond_init(&ctx->refs_zero, NULL); // Initialize handles to invalid values ctx->generation = 0; @@ -206,28 +211,40 @@ static void moq_source_destroy(void *data) { struct moq_source *ctx = (struct moq_source *)data; - // Set shutdown flag first, then disconnect, then wait for any in-flight - // async callbacks to finish - all under the mutex so the handoff is atomic. pthread_mutex_lock(&ctx->mutex); ctx->shutting_down = true; + + // Close every subscription. Each was handed `ctx` as user_data and now + // delivers its terminal callback, which releases the matching reference. + // Closing via the handle makes that terminal fire promptly (libmoq's close + // path wins over any pending update), so the wait below is real quiescence, + // not a timing guess. moq_source_disconnect_locked(ctx); - // Wait for true quiescence rather than sleeping a fixed interval. Any - // callback that entered before shutting_down was set has incremented - // active_callbacks (via callback_guard) and will broadcast callbacks_done - // when it unwinds. Callbacks arriving after this point observe shutting_down - // under the mutex and never increment. This eliminates the use-after-free - // window that a timed sleep could not close (e.g. a decode callback running - // FFmpeg longer than the old 100ms budget). - while (ctx->active_callbacks > 0) - pthread_cond_wait(&ctx->callbacks_done, &ctx->mutex); + // Drop the owner reference, then wait for the outstanding subscriptions to + // deliver their terminal callbacks before freeing ctx. The generous bounded + // wait is a backstop against a subscription that never terminates (a libmoq + // bug or an unaccounted handle): far better to log and proceed than to hang + // OBS on source deletion. In normal operation the terminals arrive within + // milliseconds and the timeout is never reached. + if (--ctx->refs > 0) { + struct timespec deadline; + timespec_get(&deadline, TIME_UTC); + deadline.tv_sec += 2; + while (ctx->refs > 0) { + if (pthread_cond_timedwait(&ctx->refs_zero, &ctx->mutex, &deadline) == ETIMEDOUT) { + LOG_WARNING("Teardown timed out with %d MoQ callback(s) still outstanding", ctx->refs); + break; + } + } + } pthread_mutex_unlock(&ctx->mutex); bfree(ctx->url); bfree(ctx->broadcast); // Note: frame_buffer is already freed by moq_source_disconnect_locked - pthread_cond_destroy(&ctx->callbacks_done); + pthread_cond_destroy(&ctx->refs_zero); pthread_mutex_destroy(&ctx->mutex); bfree(ctx); @@ -301,17 +318,15 @@ static void on_session_status(void *user_data, int32_t code) { struct moq_source *ctx = (struct moq_source *)user_data; - // Register this callback for the duration of its execution. If shutdown has - // begun, entered() is false and we must not touch ctx. - callback_guard guard(ctx); - if (!guard.entered()) { - LOG_DEBUG("Ignoring session status callback - shutting down"); - return; - } + // Hold this session subscription's reference for the callback's lifetime. A + // terminal status (<= 0) means the session task has ended and will not touch + // ctx again, so the reference is released when `ref` goes out of scope. + subscription_ref ref(ctx, code <= 0); pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (shutdown may have begun mid-callback) if (ctx->shutting_down.load()) { + // Teardown in progress; nothing to do (a terminal callback still + // releases its reference via `ref`). pthread_mutex_unlock(&ctx->mutex); return; } @@ -342,18 +357,12 @@ static void on_session_status(void *user_data, int32_t code) pthread_mutex_unlock(&ctx->mutex); LOG_DEBUG("MoQ session closed cleanly (generation %u)", current_gen); } else { - // Terminal error (e.g. auth failure, or reconnect gave up). + // Terminal error (e.g. auth failure, or reconnect gave up). Tear down + // every subscription, not just the session, so the catalog/video + // subscriptions also fire their terminal callbacks and release their + // references promptly instead of lingering until the source is destroyed. LOG_ERROR("MoQ session error: %d (generation %u)", code, current_gen); - - // Clean up the failed session/origin to prevent further callbacks. - if (ctx->session >= 0) { - moq_session_close(ctx->session); - ctx->session = -1; - } - if (ctx->origin >= 0) { - moq_origin_close(ctx->origin); - ctx->origin = -1; - } + moq_source_disconnect_locked(ctx); pthread_mutex_unlock(&ctx->mutex); // Blank the video to show error state @@ -365,115 +374,113 @@ static void on_catalog(void *user_data, int32_t catalog) { struct moq_source *ctx = (struct moq_source *)user_data; - LOG_INFO("Catalog callback received: %d", catalog); + // Hold the catalog subscription's reference for the callback's lifetime; + // release it when this is the terminal callback (catalog <= 0). + subscription_ref ref(ctx, catalog <= 0); - // Register this callback for the duration of its execution. - callback_guard guard(ctx); - if (!guard.entered()) { - LOG_DEBUG("Ignoring catalog callback - shutting down"); - if (catalog >= 0) - moq_consume_catalog_close(catalog); + if (catalog <= 0) { + if (catalog < 0) + LOG_ERROR("Catalog subscription error: %d", catalog); + else + LOG_DEBUG("Catalog subscription closed cleanly"); return; } - pthread_mutex_lock(&ctx->mutex); - - // Double-check after acquiring lock (shutdown may have begun mid-callback) - if (ctx->shutting_down.load()) { - pthread_mutex_unlock(&ctx->mutex); - if (catalog >= 0) - moq_consume_catalog_close(catalog); - return; - } + LOG_INFO("Catalog callback received: %d", catalog); - // Check if this callback is still valid (not from a stale connection) + // `catalog` is a catalog *snapshot* id (a different slab from the + // subscription handle stored in ctx->catalog_handle). It must be freed with + // moq_consume_catalog_free on every path below - never closed. + pthread_mutex_lock(&ctx->mutex); + bool stale = ctx->shutting_down.load() || ctx->consume < 0; uint32_t current_gen = ctx->generation; - if (ctx->consume < 0) { - // We've been disconnected, ignore this callback - pthread_mutex_unlock(&ctx->mutex); - if (catalog >= 0) - moq_consume_catalog_close(catalog); - return; - } - pthread_mutex_unlock(&ctx->mutex); - - if (catalog < 0) { - LOG_ERROR("Failed to get catalog: %d", catalog); - // Catalog failed (likely invalid broadcast) - blank video - moq_source_blank_video(ctx); + if (stale) { + // Disconnected or shutting down; ignore this snapshot. + moq_consume_catalog_free(catalog); return; } - // Get video configuration + // Get video configuration from the snapshot. struct moq_video_config video_config; if (moq_consume_video_config(catalog, 0, &video_config) < 0) { LOG_ERROR("Failed to get video config"); - moq_consume_catalog_close(catalog); + moq_consume_catalog_free(catalog); return; } - // Initialize decoder with the video config (takes mutex internally) + // Initialize decoder with the video config (takes mutex internally, and + // copies the codec/description out of the snapshot). if (!moq_source_init_decoder(ctx, &video_config)) { LOG_ERROR("Failed to initialize decoder"); - moq_consume_catalog_close(catalog); + moq_consume_catalog_free(catalog); return; } - // Subscribe to video track with minimal buffering - // Note: moq_consume_video_ordered takes the catalog handle, not the consume handle + // Pre-account for the video track subscription before handing ctx to libmoq, + // so its reference is in place the instant the subscription exists. Undone + // below only if creation fails. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + + // Subscribe to the video track (index 0). This takes the catalog snapshot, + // not the consume handle, and does not retain it - so free the snapshot + // immediately after. int32_t track = moq_consume_video_ordered(catalog, 0, 0, on_video_frame, ctx); + moq_consume_catalog_free(catalog); if (track < 0) { LOG_ERROR("Failed to subscribe to video track: %d", track); - moq_consume_catalog_close(catalog); + pthread_mutex_lock(&ctx->mutex); + // No subscription was created, so no terminal will fire; undo the ref. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); + pthread_mutex_unlock(&ctx->mutex); return; } + // The video track subscription now exists and will deliver a terminal + // on_video_frame (<= 0) that releases the reference added above - even on the + // cleanup path below. pthread_mutex_lock(&ctx->mutex); - if (ctx->generation == current_gen) { + if (ctx->generation == current_gen && !ctx->shutting_down.load()) { + // A catalog update can arrive while a track is already subscribed; close + // the previous one so its terminal callback releases its reference (else + // it would linger until teardown's bounded wait). + int32_t old_track = ctx->video_track; ctx->video_track = track; - ctx->catalog_handle = catalog; + pthread_mutex_unlock(&ctx->mutex); + if (old_track >= 0) + moq_consume_video_close(old_track); + LOG_INFO("Subscribed to video track successfully"); } else { - // Generation changed while we were setting up, clean up the track + // Stale or shutting down: close the track we just created; its terminal + // callback releases the reference added above. pthread_mutex_unlock(&ctx->mutex); moq_consume_video_close(track); - moq_consume_catalog_close(catalog); - return; } - pthread_mutex_unlock(&ctx->mutex); - - LOG_INFO("Subscribed to video track successfully"); } static void on_video_frame(void *user_data, int32_t frame_id) { struct moq_source *ctx = (struct moq_source *)user_data; - if (frame_id < 0) { - LOG_ERROR("Video frame callback with error: %d", frame_id); - return; - } + // Hold the video track subscription's reference for the callback's lifetime + // (which includes the FFmpeg decode in moq_source_decode_frame); release it + // on the terminal callback (frame_id <= 0). + subscription_ref ref(ctx, frame_id <= 0); - // Register this callback for the duration of its execution (which includes - // the FFmpeg decode in moq_source_decode_frame). - callback_guard guard(ctx); - if (!guard.entered()) { - moq_consume_frame_close(frame_id); + if (frame_id <= 0) { + if (frame_id < 0) + LOG_ERROR("Video track error: %d", frame_id); + else + LOG_DEBUG("Video track closed cleanly"); return; } - // Check if this callback is still valid using generation (not video_track) - // Note: We can't check video_track here because frames may arrive before - // the track handle is stored in on_catalog (race condition) pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (shutdown may have begun mid-callback) - if (ctx->shutting_down.load()) { - pthread_mutex_unlock(&ctx->mutex); - moq_consume_frame_close(frame_id); - return; - } - if (ctx->consume < 0) { - // We've been disconnected, ignore this callback + if (ctx->shutting_down.load() || ctx->consume < 0) { + // Shutting down or disconnected: drop the frame. pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); return; @@ -489,6 +496,15 @@ static void moq_source_reconnect(struct moq_source *ctx) // Increment generation to invalidate old callbacks pthread_mutex_lock(&ctx->mutex); + // Never start a new connection once teardown has begun: it would register a + // subscription after destroy already closed everything, leaking its + // reference until the bounded wait times out. (OBS serializes update/destroy + // so this is defense-in-depth.) + if (ctx->shutting_down.load()) { + pthread_mutex_unlock(&ctx->mutex); + return; + } + // Check if reconnect is already in progress if (ctx->reconnect_in_progress) { LOG_DEBUG("Reconnect already in progress, skipping"); @@ -528,6 +544,14 @@ static void moq_source_reconnect(struct moq_source *ctx) return; } + // Pre-account for the session subscription before handing ctx to libmoq: the + // connection can fail and fire its terminal on_session_status from the + // runtime thread immediately, and that must not decrement the reference + // before we have added it. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + // Connect to MoQ server (consume will happen in on_session_status callback) int32_t new_session = moq_session_connect(url_copy, strlen(url_copy), 0, // origin_publish @@ -539,6 +563,10 @@ static void moq_source_reconnect(struct moq_source *ctx) LOG_ERROR("Failed to connect to MoQ server: %d", new_session); moq_origin_close(new_origin); pthread_mutex_lock(&ctx->mutex); + // No session subscription was created, so no terminal will fire; undo + // the reference we pre-added above. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); ctx->reconnect_in_progress = false; pthread_mutex_unlock(&ctx->mutex); return; @@ -612,6 +640,13 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g ctx->consume = consume; pthread_mutex_unlock(&ctx->mutex); + // Pre-account for the catalog subscription before handing ctx to libmoq, so + // its reference is in place the instant the subscription exists (see the + // note on subscription_ref). Undone below only if creation fails. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + // Subscribe to catalog updates int32_t catalog_handle = moq_consume_catalog(consume, on_catalog, ctx); if (catalog_handle < 0) { @@ -619,6 +654,9 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g bfree(broadcast_copy); // Failed to get catalog - clean up pthread_mutex_lock(&ctx->mutex); + // No subscription was created, so no terminal will fire; undo the ref. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); if (ctx->generation == expected_gen) { if (ctx->consume >= 0) { moq_consume_close(ctx->consume); @@ -638,11 +676,32 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g return; } + // The catalog subscription now exists and will deliver a terminal on_catalog + // (<= 0) that releases the reference added above. Store the subscription + // handle so disconnect can close it, which makes that terminal fire promptly. + // (This is the real subscription handle, distinct from the catalog snapshot + // ids delivered to on_catalog.) + pthread_mutex_lock(&ctx->mutex); + if (ctx->generation == expected_gen && !ctx->shutting_down.load()) { + ctx->catalog_handle = catalog_handle; + pthread_mutex_unlock(&ctx->mutex); + } else { + // Stale or shutting down: close it; its terminal releases the reference. + pthread_mutex_unlock(&ctx->mutex); + moq_consume_catalog_close(catalog_handle); + } + LOG_INFO("Consuming broadcast: %s", broadcast_copy); bfree(broadcast_copy); } -// NOTE: Caller must hold ctx->mutex when calling this function +// NOTE: Caller must hold ctx->mutex when calling this function. +// +// The moq_*_close calls below only *signal* each subscription to wind down; +// libmoq delivers the terminal callback asynchronously on its runtime thread, +// never synchronously from within close(). That is what lets us close under the +// mutex here without the terminal callback's subscription_ref re-entering the +// mutex on this same thread (which would self-deadlock). static void moq_source_disconnect_locked(struct moq_source *ctx) { if (ctx->video_track >= 0) { From 4a4d48c403769f71d25cd52c10d14ef71a249141 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 17:21:10 -0700 Subject: [PATCH 5/7] ci: trust obsproject tap before installing clang-format Homebrew now refuses to run executables from an untrusted third-party tap, so the format check's `brew install obsproject/tools/clang-format@19` was skipped ("Skipping obsproject/tools because it is not trusted") and the step failed with a broken pipe before any file was checked. Explicitly tap and `brew trust obsproject/tools` first. Co-Authored-By: Claude Opus 4.8 --- .github/actions/run-clang-format/action.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/actions/run-clang-format/action.yaml b/.github/actions/run-clang-format/action.yaml index 3f3f8e5..99b9fe7 100644 --- a/.github/actions/run-clang-format/action.yaml +++ b/.github/actions/run-clang-format/action.yaml @@ -51,6 +51,10 @@ runs: if (( ${+RUNNER_DEBUG} )) setopt XTRACE print ::group::Install clang-format-19 + # Homebrew now refuses to run executables from an untrusted tap, so + # explicitly trust obsproject/tools before installing clang-format from it. + brew tap obsproject/tools + brew trust obsproject/tools brew install --quiet obsproject/tools/clang-format@19 print ::endgroup:: From 9e9334f7893e58281202978e110e29d3fa810b12 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 17:36:58 -0700 Subject: [PATCH 6/7] Format moq-output.h with clang-format Bring the header into compliance with the repo's clang-format@19 config so the format check passes once the file is touched. Pure formatting, no behavior change. Co-Authored-By: Claude Opus 4.8 --- src/moq-output.h | 73 ++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 40 deletions(-) diff --git a/src/moq-output.h b/src/moq-output.h index a46e6a5..45c5a1b 100644 --- a/src/moq-output.h +++ b/src/moq-output.h @@ -6,46 +6,39 @@ #include #include "logger.h" -class MoQOutput -{ - public: - MoQOutput(obs_data_t *settings, obs_output_t *output); - ~MoQOutput(); - - bool Start(); - void Stop(bool signal = true); - void Data(struct encoder_packet *packet); - - inline size_t GetTotalBytes() - { - return total_bytes_sent; - } - - inline int GetConnectTime() - { - return connect_time_ms; - } - - private: - void VideoInit(obs_encoder_t *encoder); - void VideoData(struct encoder_packet *packet); - void AudioInit(obs_encoder_t *encoder); - void AudioData(struct encoder_packet *packet); - - obs_output_t *output; - - std::string server_url; - std::string path; - - size_t total_bytes_sent; - int connect_time_ms; - std::chrono::steady_clock::time_point connect_start; - - int origin; - int session; - int broadcast; - std::map video_tracks; - std::map audio_tracks; +class MoQOutput { +public: + MoQOutput(obs_data_t *settings, obs_output_t *output); + ~MoQOutput(); + + bool Start(); + void Stop(bool signal = true); + void Data(struct encoder_packet *packet); + + inline size_t GetTotalBytes() { return total_bytes_sent; } + + inline int GetConnectTime() { return connect_time_ms; } + +private: + void VideoInit(obs_encoder_t *encoder); + void VideoData(struct encoder_packet *packet); + void AudioInit(obs_encoder_t *encoder); + void AudioData(struct encoder_packet *packet); + + obs_output_t *output; + + std::string server_url; + std::string path; + + size_t total_bytes_sent; + int connect_time_ms; + std::chrono::steady_clock::time_point connect_start; + + int origin; + int session; + int broadcast; + std::map video_tracks; + std::map audio_tracks; }; void register_moq_output(); From f1d39ac9dec82f62636eb029e6cfbd6ef95f9aba Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 18:00:02 -0700 Subject: [PATCH 7/7] Fix MoQ output teardown use-after-free; review fixes Code review surfaced that the source-side teardown rework left the sibling MoQOutput with the same class of bug the 0.3.x bump makes live: - Use-after-free: ~MoQOutput -> Stop() calls moq_session_close, which only signals; libmoq delivers the session's terminal status callback (code <= 0) asynchronously on its runtime thread afterward and dereferences `self` (server_url, connect_time_ms, connect_start). OBS deletes the object as soon as ~MoQOutput returns, so the late callback touched freed memory. 0.3.x's auto-reconnect also fires the callback repeatedly mid-stream, widening the window. Fix by mirroring the source's pattern: count outstanding session subscriptions (pre-incremented before moq_session_connect, released by the terminal callback) and have the destructor wait for the count to reach zero, with the same bounded backstop so a missing terminal warns instead of hanging. - Data race: connect_time_ms is written by the callback (runtime thread) and read by GetConnectTime() (OBS thread); make it std::atomic. Also (source): restore the blank-video-on-catalog-error behavior that the terminal-callback refactor dropped, so an invalid/failed broadcast still clears the preview (except during our own teardown). Co-Authored-By: Claude Opus 4.8 --- src/moq-output.cpp | 31 +++++++++++++++++++++++++++++-- src/moq-output.h | 17 ++++++++++++++++- src/moq-source.cpp | 9 +++++++-- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/moq-output.cpp b/src/moq-output.cpp index 5c42a4f..f669101 100644 --- a/src/moq-output.cpp +++ b/src/moq-output.cpp @@ -15,7 +15,8 @@ MoQOutput::MoQOutput(obs_data_t *, obs_output_t *output) connect_time_ms(0), origin(moq_origin_create()), session(0), - broadcast(moq_publish_create()) + broadcast(moq_publish_create()), + outstanding_sessions(0) { } @@ -25,6 +26,14 @@ MoQOutput::~MoQOutput() moq_origin_close(origin); Stop(); + + // Wait for any outstanding session terminal callback to fire before `this` + // is freed, so a late callback on the libmoq runtime thread can't touch freed + // memory. Bounded so a missing terminal degrades to a warning, not a hang. + std::unique_lock lock(session_mutex); + if (!session_cv.wait_for(lock, std::chrono::seconds(2), [this] { return outstanding_sessions == 0; })) + LOG_WARNING("Output teardown timed out with %d MoQ session callback(s) outstanding", + outstanding_sessions); } bool MoQOutput::Start() @@ -85,18 +94,36 @@ bool MoQOutput::Start() auto elapsed = std::chrono::steady_clock::now() - self->connect_start; self->connect_time_ms = static_cast( std::chrono::duration_cast(elapsed).count()); - LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code, + LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms.load(), code, self->server_url.c_str()); } else { + // Terminal callback (0 = clean close, < 0 = fatal): the session task + // has ended and will not touch `self` again. Release the lifetime + // reference the destructor waits on. This is the last access to `self`. LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str()); + std::lock_guard lock(self->session_mutex); + if (--self->outstanding_sessions == 0) + self->session_cv.notify_all(); } }; + // Pre-account for the session subscription before handing `this` to libmoq: + // the connection can fail and fire its terminal callback before connect + // returns, and the destructor must wait for that callback. + { + std::lock_guard lock(session_mutex); + outstanding_sessions++; + } + // Start establishing a session with the MoQ server // NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest). session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this); if (session < 0) { LOG_ERROR("Failed to initialize MoQ server: %d", session); + // No subscription was created, so no terminal will fire; undo the ref. + std::lock_guard lock(session_mutex); + if (--outstanding_sessions == 0) + session_cv.notify_all(); return false; } diff --git a/src/moq-output.h b/src/moq-output.h index 45c5a1b..15bb525 100644 --- a/src/moq-output.h +++ b/src/moq-output.h @@ -1,8 +1,11 @@ #pragma once #include +#include #include +#include #include +#include #include #include "logger.h" @@ -31,12 +34,24 @@ class MoQOutput { std::string path; size_t total_bytes_sent; - int connect_time_ms; + // Written by the session status callback (libmoq runtime thread), read by + // GetConnectTime() (OBS thread); atomic to avoid a data race. + std::atomic connect_time_ms; std::chrono::steady_clock::time_point connect_start; int origin; int session; int broadcast; + + // Session subscription lifetime. libmoq delivers a terminal status callback + // (code <= 0) asynchronously on its runtime thread after moq_session_close, + // and may touch `this` until then. outstanding_sessions counts sessions whose + // terminal callback hasn't fired; the destructor waits for it to reach zero + // so a late callback can't touch freed memory. + std::mutex session_mutex; + std::condition_variable session_cv; + int outstanding_sessions; + std::map video_tracks; std::map audio_tracks; }; diff --git a/src/moq-source.cpp b/src/moq-source.cpp index c0473db..f5870e1 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -379,10 +379,15 @@ static void on_catalog(void *user_data, int32_t catalog) subscription_ref ref(ctx, catalog <= 0); if (catalog <= 0) { - if (catalog < 0) + if (catalog < 0) { LOG_ERROR("Catalog subscription error: %d", catalog); - else + // Surface the failure (e.g. invalid broadcast) by blanking, as the + // old catalog-fetch-failed path did - but not during our own teardown. + if (!ctx->shutting_down.load()) + moq_source_blank_video(ctx); + } else { LOG_DEBUG("Catalog subscription closed cleanly"); + } return; }