diff --git a/.github/actions/run-clang-format/action.yaml b/.github/actions/run-clang-format/action.yaml index 3f3f8e5..99b9fe7 100644 --- a/.github/actions/run-clang-format/action.yaml +++ b/.github/actions/run-clang-format/action.yaml @@ -51,6 +51,10 @@ runs: if (( ${+RUNNER_DEBUG} )) setopt XTRACE print ::group::Install clang-format-19 + # Homebrew now refuses to run executables from an untrusted tap, so + # explicitly trust obsproject/tools before installing clang-format from it. + brew tap obsproject/tools + brew trust obsproject/tools brew install --quiet obsproject/tools/clang-format@19 print ::endgroup:: diff --git a/CMakePresets.json b/CMakePresets.json index b47a12a..6ebb63d 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -14,7 +14,7 @@ "ENABLE_QT": true, "CMAKE_EXPORT_COMPILE_COMMANDS": true, "BUILD_PLUGIN": true, - "MOQ_VERSION": "0.2.14", + "MOQ_VERSION": "0.3.6", "MOQ_ARCHIVE": "tar.gz" } }, diff --git a/src/moq-output.cpp b/src/moq-output.cpp index e08af33..f669101 100644 --- a/src/moq-output.cpp +++ b/src/moq-output.cpp @@ -15,7 +15,8 @@ MoQOutput::MoQOutput(obs_data_t *, obs_output_t *output) connect_time_ms(0), origin(moq_origin_create()), session(0), - broadcast(moq_publish_create()) + broadcast(moq_publish_create()), + outstanding_sessions(0) { } @@ -25,6 +26,14 @@ MoQOutput::~MoQOutput() moq_origin_close(origin); Stop(); + + // Wait for any outstanding session terminal callback to fire before `this` + // is freed, so a late callback on the libmoq runtime thread can't touch freed + // memory. Bounded so a missing terminal degrades to a warning, not a hang. + std::unique_lock lock(session_mutex); + if (!session_cv.wait_for(lock, std::chrono::seconds(2), [this] { return outstanding_sessions == 0; })) + LOG_WARNING("Output teardown timed out with %d MoQ session callback(s) outstanding", + outstanding_sessions); } bool MoQOutput::Start() @@ -75,25 +84,46 @@ bool MoQOutput::Start() connect_start = std::chrono::steady_clock::now(); - // Create a callback to log when the session is connected or closed - auto session_connect_callback = [](void *user_data, int error_code) { + // Create a callback to log when the session is connected or closed. + // libmoq status codes (>= 0.3.0): > 0 = (re)connected (epoch), 0 = closed + // cleanly (terminal), < 0 = fatal/reconnect-gave-up (terminal). + auto session_connect_callback = [](void *user_data, int code) { auto self = static_cast(user_data); - if (error_code == 0) { + if (code > 0) { auto elapsed = std::chrono::steady_clock::now() - self->connect_start; - self->connect_time_ms = static_cast(std::chrono::duration_cast(elapsed).count()); - LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms, + self->connect_time_ms = static_cast( + std::chrono::duration_cast(elapsed).count()); + LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms.load(), code, self->server_url.c_str()); } else { - LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str()); + // Terminal callback (0 = clean close, < 0 = fatal): the session task + // has ended and will not touch `self` again. Release the lifetime + // reference the destructor waits on. This is the last access to `self`. + LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str()); + std::lock_guard lock(self->session_mutex); + if (--self->outstanding_sessions == 0) + self->session_cv.notify_all(); } }; + // Pre-account for the session subscription before handing `this` to libmoq: + // the connection can fail and fire its terminal callback before connect + // returns, and the destructor must wait for that callback. + { + std::lock_guard lock(session_mutex); + outstanding_sessions++; + } + // Start establishing a session with the MoQ server // NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest). session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this); if (session < 0) { LOG_ERROR("Failed to initialize MoQ server: %d", session); + // No subscription was created, so no terminal will fire; undo the ref. + std::lock_guard lock(session_mutex); + if (--outstanding_sessions == 0) + session_cv.notify_all(); return false; } diff --git a/src/moq-output.h b/src/moq-output.h index a46e6a5..15bb525 100644 --- a/src/moq-output.h +++ b/src/moq-output.h @@ -1,51 +1,59 @@ #pragma once #include +#include #include +#include #include +#include #include #include "logger.h" -class MoQOutput -{ - public: - MoQOutput(obs_data_t *settings, obs_output_t *output); - ~MoQOutput(); - - bool Start(); - void Stop(bool signal = true); - void Data(struct encoder_packet *packet); - - inline size_t GetTotalBytes() - { - return total_bytes_sent; - } - - inline int GetConnectTime() - { - return connect_time_ms; - } - - private: - void VideoInit(obs_encoder_t *encoder); - void VideoData(struct encoder_packet *packet); - void AudioInit(obs_encoder_t *encoder); - void AudioData(struct encoder_packet *packet); - - obs_output_t *output; - - std::string server_url; - std::string path; - - size_t total_bytes_sent; - int connect_time_ms; - std::chrono::steady_clock::time_point connect_start; - - int origin; - int session; - int broadcast; - std::map video_tracks; - std::map audio_tracks; +class MoQOutput { +public: + MoQOutput(obs_data_t *settings, obs_output_t *output); + ~MoQOutput(); + + bool Start(); + void Stop(bool signal = true); + void Data(struct encoder_packet *packet); + + inline size_t GetTotalBytes() { return total_bytes_sent; } + + inline int GetConnectTime() { return connect_time_ms; } + +private: + void VideoInit(obs_encoder_t *encoder); + void VideoData(struct encoder_packet *packet); + void AudioInit(obs_encoder_t *encoder); + void AudioData(struct encoder_packet *packet); + + obs_output_t *output; + + std::string server_url; + std::string path; + + size_t total_bytes_sent; + // Written by the session status callback (libmoq runtime thread), read by + // GetConnectTime() (OBS thread); atomic to avoid a data race. + std::atomic connect_time_ms; + std::chrono::steady_clock::time_point connect_start; + + int origin; + int session; + int broadcast; + + // Session subscription lifetime. libmoq delivers a terminal status callback + // (code <= 0) asynchronously on its runtime thread after moq_session_close, + // and may touch `this` until then. outstanding_sessions counts sessions whose + // terminal callback hasn't fired; the destructor waits for it to reach zero + // so a late callback can't touch freed memory. + std::mutex session_mutex; + std::condition_variable session_cv; + int outstanding_sessions; + + std::map video_tracks; + std::map audio_tracks; }; void register_moq_output(); diff --git a/src/moq-source.cpp b/src/moq-source.cpp index 824b189..f5870e1 100644 --- a/src/moq-source.cpp +++ b/src/moq-source.cpp @@ -5,6 +5,8 @@ #include #include +#include +#include #ifdef _WIN32 #define strncasecmp _strnicmp @@ -28,28 +30,23 @@ static AVCodecID codec_string_to_id(const char *codec, size_t len) } // H.264/AVC - if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) || - (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) { + if ((len >= 4 && strncasecmp(codec, "h264", 4) == 0) || (len >= 3 && strncasecmp(codec, "avc", 3) == 0)) { return AV_CODEC_ID_H264; } // HEVC/H.265 - if ((len >= 4 && strncasecmp(codec, "hevc", 4) == 0) || - (len >= 4 && strncasecmp(codec, "h265", 4) == 0) || - (len >= 4 && strncasecmp(codec, "hev1", 4) == 0) || - (len >= 4 && strncasecmp(codec, "hvc1", 4) == 0)) { + if ((len >= 4 && strncasecmp(codec, "hevc", 4) == 0) || (len >= 4 && strncasecmp(codec, "h265", 4) == 0) || + (len >= 4 && strncasecmp(codec, "hev1", 4) == 0) || (len >= 4 && strncasecmp(codec, "hvc1", 4) == 0)) { return AV_CODEC_ID_HEVC; } // VP9 - if ((len >= 3 && strncasecmp(codec, "vp9", 3) == 0) || - (len >= 4 && strncasecmp(codec, "vp09", 4) == 0)) { + if ((len >= 3 && strncasecmp(codec, "vp9", 3) == 0) || (len >= 4 && strncasecmp(codec, "vp09", 4) == 0)) { return AV_CODEC_ID_VP9; } // AV1 - if ((len >= 3 && strncasecmp(codec, "av1", 3) == 0) || - (len >= 4 && strncasecmp(codec, "av01", 4) == 0)) { + if ((len >= 3 && strncasecmp(codec, "av1", 3) == 0) || (len >= 4 && strncasecmp(codec, "av01", 4) == 0)) { return AV_CODEC_ID_AV1; } @@ -71,9 +68,20 @@ struct moq_source { // Shutdown flag - set when destroy begins, callbacks should exit early std::atomic shutting_down; + // Lifetime reference count, guarded by mutex. ctx must outlive every libmoq + // subscription that was handed `ctx` as user_data: each delivers exactly one + // terminal callback (status <= 0), the documented last touch of user_data + // (libmoq >= 0.3.0). We hold one reference for the OBS-owned source plus one + // per outstanding subscription (session, catalog, video track); a + // subscription's reference is released by its terminal callback (see + // subscription_ref). destroy drops the owner reference and waits for the + // count to reach zero before freeing. + int refs; + pthread_cond_t refs_zero; // signaled when refs reaches 0 + // Session handles (all negative = invalid) - std::atomic generation; // Increments on reconnect - bool reconnect_in_progress; // True while reconnect is happening + std::atomic generation; // Increments on reconnect + bool reconnect_in_progress; // True while reconnect is happening int32_t origin; int32_t session; int32_t consume; @@ -82,12 +90,12 @@ struct moq_source { // Decoder state AVCodecContext *codec_ctx; - AVCodecID current_codec_id; // Currently configured codec - enum AVPixelFormat current_pix_fmt; // Current pixel format for sws_ctx + AVCodecID current_codec_id; // Currently configured codec + enum AVPixelFormat current_pix_fmt; // Current pixel format for sws_ctx struct SwsContext *sws_ctx; bool got_keyframe; - uint32_t frames_waiting_for_keyframe; // Count of skipped frames while waiting - uint32_t consecutive_decode_errors; // Count of consecutive decode failures + uint32_t frames_waiting_for_keyframe; // Count of skipped frames while waiting + uint32_t consecutive_decode_errors; // Count of consecutive decode failures // Output frame buffer struct obs_source_frame frame; @@ -97,6 +105,41 @@ struct moq_source { pthread_mutex_t mutex; }; +// RAII helper that releases a subscription's lifetime reference when its async +// callback returns. libmoq (>= 0.3.0) hands `ctx` to each subscription as +// user_data and guarantees exactly one terminal callback (status code <= 0), +// after which user_data is never touched again. Each callback constructs one of +// these with `terminal` set on that terminal code; on scope exit it drops the +// matching reference and wakes moq_source_destroy when the last one is gone. +// Releasing on scope exit (not entry) keeps ctx valid for the whole callback +// body, including a terminal callback that still reads ctx before returning. +// +// Because libmoq runs all callbacks on a single runtime thread, a subscription's +// reference is held continuously from registration through its terminal +// callback, so ctx is always valid on entry to any of its callbacks - no +// shutting_down pre-check is needed for safety. +namespace { +struct subscription_ref { + struct moq_source *ctx; + bool terminal; + + subscription_ref(struct moq_source *c, bool is_terminal) : ctx(c), terminal(is_terminal) {} + + ~subscription_ref() + { + if (!terminal) + return; + pthread_mutex_lock(&ctx->mutex); + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); + pthread_mutex_unlock(&ctx->mutex); + } + + subscription_ref(const subscription_ref &) = delete; + subscription_ref &operator=(const subscription_ref &) = delete; +}; +} // namespace + // Forward declarations static void moq_source_update(void *data, obs_data_t *settings); static void moq_source_destroy(void *data); @@ -124,6 +167,11 @@ static void *moq_source_create(obs_data_t *settings, obs_source_t *source) // Initialize shutdown flag ctx->shutting_down = false; + // One lifetime reference for the OBS-owned source itself; each subscription + // adds its own while outstanding. + ctx->refs = 1; + pthread_cond_init(&ctx->refs_zero, NULL); + // Initialize handles to invalid values ctx->generation = 0; ctx->reconnect_in_progress = false; @@ -163,32 +211,40 @@ static void moq_source_destroy(void *data) { struct moq_source *ctx = (struct moq_source *)data; - // Set shutdown flag first - callbacks will check this and exit early pthread_mutex_lock(&ctx->mutex); ctx->shutting_down = true; + + // Close every subscription. Each was handed `ctx` as user_data and now + // delivers its terminal callback, which releases the matching reference. + // Closing via the handle makes that terminal fire promptly (libmoq's close + // path wins over any pending update), so the wait below is real quiescence, + // not a timing guess. moq_source_disconnect_locked(ctx); - pthread_mutex_unlock(&ctx->mutex); - // Give MoQ callbacks time to drain - they check shutting_down and exit early. - // This prevents use-after-free when async callbacks fire after ctx is freed. - // - // LIMITATION: This 100ms sleep is a timing-based workaround, not a synchronization - // guarantee. If a callback is mid-execution when shutting_down is set AND takes - // longer than 100ms to complete (after the mutex unlock), there is still a - // potential race condition. In practice, our callbacks are fast (< 1ms typically) - // and this delay provides sufficient margin. However, a more robust solution - // would use reference counting: - // - Increment refcount when entering a callback - // - Decrement when exiting - // - Wait for refcount to reach zero before freeing ctx - // This could be implemented using std::shared_ptr or a manual atomic refcount - // with a condition variable for waiting. - os_sleep_ms(100); + // Drop the owner reference, then wait for the outstanding subscriptions to + // deliver their terminal callbacks before freeing ctx. The generous bounded + // wait is a backstop against a subscription that never terminates (a libmoq + // bug or an unaccounted handle): far better to log and proceed than to hang + // OBS on source deletion. In normal operation the terminals arrive within + // milliseconds and the timeout is never reached. + if (--ctx->refs > 0) { + struct timespec deadline; + timespec_get(&deadline, TIME_UTC); + deadline.tv_sec += 2; + while (ctx->refs > 0) { + if (pthread_cond_timedwait(&ctx->refs_zero, &ctx->mutex, &deadline) == ETIMEDOUT) { + LOG_WARNING("Teardown timed out with %d MoQ callback(s) still outstanding", ctx->refs); + break; + } + } + } + pthread_mutex_unlock(&ctx->mutex); bfree(ctx->url); bfree(ctx->broadcast); // Note: frame_buffer is already freed by moq_source_disconnect_locked + pthread_cond_destroy(&ctx->refs_zero); pthread_mutex_destroy(&ctx->mutex); bfree(ctx); @@ -204,12 +260,11 @@ static void moq_source_update(void *data, obs_data_t *settings) pthread_mutex_lock(&ctx->mutex); // Check if settings actually changed - bool url_changed = (!ctx->url && url && strlen(url) > 0) || - (ctx->url && !url) || - (ctx->url && url && strcmp(ctx->url, url) != 0); + bool url_changed = (!ctx->url && url && strlen(url) > 0) || (ctx->url && !url) || + (ctx->url && url && strcmp(ctx->url, url) != 0); bool broadcast_changed = (!ctx->broadcast && broadcast && strlen(broadcast) > 0) || - (ctx->broadcast && !broadcast) || - (ctx->broadcast && broadcast && strcmp(ctx->broadcast, broadcast) != 0); + (ctx->broadcast && !broadcast) || + (ctx->broadcast && broadcast && strcmp(ctx->broadcast, broadcast) != 0); bool settings_changed = url_changed || broadcast_changed; // Store the new settings @@ -219,15 +274,14 @@ static void moq_source_update(void *data, obs_data_t *settings) ctx->broadcast = bstrdup(broadcast); // Check if new settings are valid for connection - bool valid = ctx->url && ctx->broadcast && - strlen(ctx->url) > 0 && strlen(ctx->broadcast) > 0; + bool valid = ctx->url && ctx->broadcast && strlen(ctx->url) > 0 && strlen(ctx->broadcast) > 0; pthread_mutex_unlock(&ctx->mutex); // If settings changed and are valid, reconnect if (settings_changed && valid) { - LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", - url ? url : "(null)", broadcast ? broadcast : "(null)"); + LOG_INFO("Settings changed, reconnecting (url=%s, broadcast=%s)", url ? url : "(null)", + broadcast ? broadcast : "(null)"); moq_source_reconnect(ctx); } else if (settings_changed && !valid) { LOG_INFO("Settings changed but invalid - disconnecting"); @@ -264,15 +318,15 @@ static void on_session_status(void *user_data, int32_t code) { struct moq_source *ctx = (struct moq_source *)user_data; - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { - LOG_DEBUG("Ignoring session status callback - shutting down"); - return; - } + // Hold this session subscription's reference for the callback's lifetime. A + // terminal status (<= 0) means the session task has ended and will not touch + // ctx again, so the reference is released when `ref` goes out of scope. + subscription_ref ref(ctx, code <= 0); pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (may have changed) if (ctx->shutting_down.load()) { + // Teardown in progress; nothing to do (a terminal callback still + // releases its reference via `ref`). pthread_mutex_unlock(&ctx->mutex); return; } @@ -283,24 +337,32 @@ static void on_session_status(void *user_data, int32_t code) } uint32_t current_gen = ctx->generation; - if (code == 0) { + // libmoq status codes (>= 0.3.0): + // > 0 : (re)connected, carrying the connection epoch (1 = first connect, + // 2 = first reconnect, ...). The session auto-reconnects internally. + // = 0 : closed cleanly via moq_session_close (terminal) - we initiated it. + // < 0 : reconnect permanently gave up or fatal error (terminal). + if (code > 0) { pthread_mutex_unlock(&ctx->mutex); - LOG_INFO("MoQ session connected successfully (generation %u)", current_gen); - // Now that we're connected, start consuming the broadcast - moq_source_start_consume(ctx, current_gen); - } else { - // Connection failed - clean up the session and origin immediately - LOG_ERROR("MoQ session failed with code: %d (generation %u)", code, current_gen); - - // Clean up failed session/origin to prevent further callbacks - if (ctx->session >= 0) { - moq_session_close(ctx->session); - ctx->session = -1; - } - if (ctx->origin >= 0) { - moq_origin_close(ctx->origin); - ctx->origin = -1; + LOG_INFO("MoQ session connected (generation %u, epoch %d)", current_gen, code); + // Start consuming only on the first connect. On later epochs libmoq has + // re-subscribed our existing consumer automatically (the origin outlives + // the connection), so recreating it would leak handles. + if (code == 1) { + moq_source_start_consume(ctx, current_gen); } + } else if (code == 0) { + // Clean close - we asked for this (disconnect/reconnect/destroy). The + // handle is already being torn down; nothing to do here. + pthread_mutex_unlock(&ctx->mutex); + LOG_DEBUG("MoQ session closed cleanly (generation %u)", current_gen); + } else { + // Terminal error (e.g. auth failure, or reconnect gave up). Tear down + // every subscription, not just the session, so the catalog/video + // subscriptions also fire their terminal callbacks and release their + // references promptly instead of lingering until the source is destroyed. + LOG_ERROR("MoQ session error: %d (generation %u)", code, current_gen); + moq_source_disconnect_locked(ctx); pthread_mutex_unlock(&ctx->mutex); // Blank the video to show error state @@ -312,112 +374,118 @@ static void on_catalog(void *user_data, int32_t catalog) { struct moq_source *ctx = (struct moq_source *)user_data; - LOG_INFO("Catalog callback received: %d", catalog); - - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { - LOG_DEBUG("Ignoring catalog callback - shutting down"); - if (catalog >= 0) - moq_consume_catalog_close(catalog); + // Hold the catalog subscription's reference for the callback's lifetime; + // release it when this is the terminal callback (catalog <= 0). + subscription_ref ref(ctx, catalog <= 0); + + if (catalog <= 0) { + if (catalog < 0) { + LOG_ERROR("Catalog subscription error: %d", catalog); + // Surface the failure (e.g. invalid broadcast) by blanking, as the + // old catalog-fetch-failed path did - but not during our own teardown. + if (!ctx->shutting_down.load()) + moq_source_blank_video(ctx); + } else { + LOG_DEBUG("Catalog subscription closed cleanly"); + } return; } - pthread_mutex_lock(&ctx->mutex); - - // Double-check after acquiring lock (may have changed) - if (ctx->shutting_down.load()) { - pthread_mutex_unlock(&ctx->mutex); - if (catalog >= 0) - moq_consume_catalog_close(catalog); - return; - } + LOG_INFO("Catalog callback received: %d", catalog); - // Check if this callback is still valid (not from a stale connection) + // `catalog` is a catalog *snapshot* id (a different slab from the + // subscription handle stored in ctx->catalog_handle). It must be freed with + // moq_consume_catalog_free on every path below - never closed. + pthread_mutex_lock(&ctx->mutex); + bool stale = ctx->shutting_down.load() || ctx->consume < 0; uint32_t current_gen = ctx->generation; - if (ctx->consume < 0) { - // We've been disconnected, ignore this callback - pthread_mutex_unlock(&ctx->mutex); - if (catalog >= 0) - moq_consume_catalog_close(catalog); - return; - } - pthread_mutex_unlock(&ctx->mutex); - - if (catalog < 0) { - LOG_ERROR("Failed to get catalog: %d", catalog); - // Catalog failed (likely invalid broadcast) - blank video - moq_source_blank_video(ctx); + if (stale) { + // Disconnected or shutting down; ignore this snapshot. + moq_consume_catalog_free(catalog); return; } - // Get video configuration + // Get video configuration from the snapshot. struct moq_video_config video_config; if (moq_consume_video_config(catalog, 0, &video_config) < 0) { LOG_ERROR("Failed to get video config"); - moq_consume_catalog_close(catalog); + moq_consume_catalog_free(catalog); return; } - // Initialize decoder with the video config (takes mutex internally) + // Initialize decoder with the video config (takes mutex internally, and + // copies the codec/description out of the snapshot). if (!moq_source_init_decoder(ctx, &video_config)) { LOG_ERROR("Failed to initialize decoder"); - moq_consume_catalog_close(catalog); + moq_consume_catalog_free(catalog); return; } - // Subscribe to video track with minimal buffering - // Note: moq_consume_video_ordered takes the catalog handle, not the consume handle + // Pre-account for the video track subscription before handing ctx to libmoq, + // so its reference is in place the instant the subscription exists. Undone + // below only if creation fails. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + + // Subscribe to the video track (index 0). This takes the catalog snapshot, + // not the consume handle, and does not retain it - so free the snapshot + // immediately after. int32_t track = moq_consume_video_ordered(catalog, 0, 0, on_video_frame, ctx); + moq_consume_catalog_free(catalog); if (track < 0) { LOG_ERROR("Failed to subscribe to video track: %d", track); - moq_consume_catalog_close(catalog); + pthread_mutex_lock(&ctx->mutex); + // No subscription was created, so no terminal will fire; undo the ref. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); + pthread_mutex_unlock(&ctx->mutex); return; } + // The video track subscription now exists and will deliver a terminal + // on_video_frame (<= 0) that releases the reference added above - even on the + // cleanup path below. pthread_mutex_lock(&ctx->mutex); - if (ctx->generation == current_gen) { + if (ctx->generation == current_gen && !ctx->shutting_down.load()) { + // A catalog update can arrive while a track is already subscribed; close + // the previous one so its terminal callback releases its reference (else + // it would linger until teardown's bounded wait). + int32_t old_track = ctx->video_track; ctx->video_track = track; - ctx->catalog_handle = catalog; + pthread_mutex_unlock(&ctx->mutex); + if (old_track >= 0) + moq_consume_video_close(old_track); + LOG_INFO("Subscribed to video track successfully"); } else { - // Generation changed while we were setting up, clean up the track + // Stale or shutting down: close the track we just created; its terminal + // callback releases the reference added above. pthread_mutex_unlock(&ctx->mutex); moq_consume_video_close(track); - moq_consume_catalog_close(catalog); - return; } - pthread_mutex_unlock(&ctx->mutex); - - LOG_INFO("Subscribed to video track successfully"); } static void on_video_frame(void *user_data, int32_t frame_id) { struct moq_source *ctx = (struct moq_source *)user_data; - if (frame_id < 0) { - LOG_ERROR("Video frame callback with error: %d", frame_id); - return; - } + // Hold the video track subscription's reference for the callback's lifetime + // (which includes the FFmpeg decode in moq_source_decode_frame); release it + // on the terminal callback (frame_id <= 0). + subscription_ref ref(ctx, frame_id <= 0); - // Fast path: check atomic flag before taking lock - if (ctx->shutting_down.load()) { - moq_consume_frame_close(frame_id); + if (frame_id <= 0) { + if (frame_id < 0) + LOG_ERROR("Video track error: %d", frame_id); + else + LOG_DEBUG("Video track closed cleanly"); return; } - // Check if this callback is still valid using generation (not video_track) - // Note: We can't check video_track here because frames may arrive before - // the track handle is stored in on_catalog (race condition) pthread_mutex_lock(&ctx->mutex); - // Double-check after acquiring lock (may have changed) - if (ctx->shutting_down.load()) { - pthread_mutex_unlock(&ctx->mutex); - moq_consume_frame_close(frame_id); - return; - } - if (ctx->consume < 0) { - // We've been disconnected, ignore this callback + if (ctx->shutting_down.load() || ctx->consume < 0) { + // Shutting down or disconnected: drop the frame. pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); return; @@ -433,6 +501,15 @@ static void moq_source_reconnect(struct moq_source *ctx) // Increment generation to invalidate old callbacks pthread_mutex_lock(&ctx->mutex); + // Never start a new connection once teardown has begun: it would register a + // subscription after destroy already closed everything, leaking its + // reference until the bounded wait times out. (OBS serializes update/destroy + // so this is defense-in-depth.) + if (ctx->shutting_down.load()) { + pthread_mutex_unlock(&ctx->mutex); + return; + } + // Check if reconnect is already in progress if (ctx->reconnect_in_progress) { LOG_DEBUG("Reconnect already in progress, skipping"); @@ -453,8 +530,13 @@ static void moq_source_reconnect(struct moq_source *ctx) // Blank video while reconnecting to avoid showing stale frames moq_source_blank_video(ctx); - // Small delay to allow MoQ library to fully clean up previous connection - os_sleep_ms(50); + // No delay needed before reconnecting: libmoq origins and sessions are fully + // independent (each origin is a distinct random instance, each session its own + // task), so the new connection shares no client-side state with the one we just + // closed. moq_origin_close removes the origin synchronously, and moq_session_close + // only signals the old session's task to wind down asynchronously on the libmoq + // runtime thread - nothing the new origin/session can collide with. (The previous + // os_sleep_ms(50) here was a timing band-aid that provided no real guarantee.) // Create origin for consuming (outside mutex since it may block) int32_t new_origin = moq_origin_create(); @@ -467,19 +549,29 @@ static void moq_source_reconnect(struct moq_source *ctx) return; } + // Pre-account for the session subscription before handing ctx to libmoq: the + // connection can fail and fire its terminal on_session_status from the + // runtime thread immediately, and that must not decrement the reference + // before we have added it. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + // Connect to MoQ server (consume will happen in on_session_status callback) - int32_t new_session = moq_session_connect( - url_copy, strlen(url_copy), - 0, // origin_publish - new_origin, // origin_consume - on_session_status, ctx - ); + int32_t new_session = moq_session_connect(url_copy, strlen(url_copy), + 0, // origin_publish + new_origin, // origin_consume + on_session_status, ctx); bfree(url_copy); if (new_session < 0) { LOG_ERROR("Failed to connect to MoQ server: %d", new_session); moq_origin_close(new_origin); pthread_mutex_lock(&ctx->mutex); + // No session subscription was created, so no terminal will fire; undo + // the reference we pre-added above. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); ctx->reconnect_in_progress = false; pthread_mutex_unlock(&ctx->mutex); return; @@ -553,6 +645,13 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g ctx->consume = consume; pthread_mutex_unlock(&ctx->mutex); + // Pre-account for the catalog subscription before handing ctx to libmoq, so + // its reference is in place the instant the subscription exists (see the + // note on subscription_ref). Undone below only if creation fails. + pthread_mutex_lock(&ctx->mutex); + ctx->refs++; + pthread_mutex_unlock(&ctx->mutex); + // Subscribe to catalog updates int32_t catalog_handle = moq_consume_catalog(consume, on_catalog, ctx); if (catalog_handle < 0) { @@ -560,6 +659,9 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g bfree(broadcast_copy); // Failed to get catalog - clean up pthread_mutex_lock(&ctx->mutex); + // No subscription was created, so no terminal will fire; undo the ref. + if (--ctx->refs == 0) + pthread_cond_broadcast(&ctx->refs_zero); if (ctx->generation == expected_gen) { if (ctx->consume >= 0) { moq_consume_close(ctx->consume); @@ -579,11 +681,32 @@ static void moq_source_start_consume(struct moq_source *ctx, uint32_t expected_g return; } + // The catalog subscription now exists and will deliver a terminal on_catalog + // (<= 0) that releases the reference added above. Store the subscription + // handle so disconnect can close it, which makes that terminal fire promptly. + // (This is the real subscription handle, distinct from the catalog snapshot + // ids delivered to on_catalog.) + pthread_mutex_lock(&ctx->mutex); + if (ctx->generation == expected_gen && !ctx->shutting_down.load()) { + ctx->catalog_handle = catalog_handle; + pthread_mutex_unlock(&ctx->mutex); + } else { + // Stale or shutting down: close it; its terminal releases the reference. + pthread_mutex_unlock(&ctx->mutex); + moq_consume_catalog_close(catalog_handle); + } + LOG_INFO("Consuming broadcast: %s", broadcast_copy); bfree(broadcast_copy); } -// NOTE: Caller must hold ctx->mutex when calling this function +// NOTE: Caller must hold ctx->mutex when calling this function. +// +// The moq_*_close calls below only *signal* each subscription to wind down; +// libmoq delivers the terminal callback asynchronously on its runtime thread, +// never synchronously from within close(). That is what lets us close under the +// mutex here without the terminal callback's subscription_ref re-entering the +// mutex on this same thread (which would self-deadlock). static void moq_source_disconnect_locked(struct moq_source *ctx) { if (ctx->video_track >= 0) { @@ -669,7 +792,8 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid // Use codec description as extradata (contains SPS/PPS for H.264, VPS/SPS/PPS for HEVC, etc.) if (config->description && config->description_len > 0) { - new_codec_ctx->extradata = (uint8_t *)av_mallocz(config->description_len + AV_INPUT_BUFFER_PADDING_SIZE); + new_codec_ctx->extradata = + (uint8_t *)av_mallocz(config->description_len + AV_INPUT_BUFFER_PADDING_SIZE); if (new_codec_ctx->extradata) { memcpy(new_codec_ctx->extradata, config->description, config->description_len); new_codec_ctx->extradata_size = static_cast(config->description_len); @@ -711,9 +835,9 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid // dynamically on first decoded frame when we know the actual pixel format ctx->codec_ctx = new_codec_ctx; ctx->current_codec_id = codec_id; - ctx->current_pix_fmt = AV_PIX_FMT_NONE; // Will be set on first frame - ctx->sws_ctx = NULL; // Will be created on first frame with actual pixel format - ctx->frame_buffer = NULL; // Will be allocated on first frame with actual dimensions + ctx->current_pix_fmt = AV_PIX_FMT_NONE; // Will be set on first frame + ctx->sws_ctx = NULL; // Will be created on first frame with actual pixel format + ctx->frame_buffer = NULL; // Will be allocated on first frame with actual dimensions ctx->frame.width = width; ctx->frame.height = height; ctx->frame.linesize[0] = width * 4; @@ -732,8 +856,8 @@ static bool moq_source_init_decoder(struct moq_source *ctx, const struct moq_vid if (config->codec && copy_len > 0) { memcpy(codec_str, config->codec, copy_len); } - LOG_INFO("Decoder initialized: codec=%s, dimensions=%ux%u (may be refined on first frame)", - codec_str, width, height); + LOG_INFO("Decoder initialized: codec=%s, dimensions=%ux%u (may be refined on first frame)", codec_str, width, + height); return true; } @@ -798,10 +922,9 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // Skip non-keyframes until we get the first one if (!ctx->got_keyframe && !frame_data.keyframe) { ctx->frames_waiting_for_keyframe++; - if (ctx->frames_waiting_for_keyframe == 1 || - (ctx->frames_waiting_for_keyframe % 30) == 0) { + if (ctx->frames_waiting_for_keyframe == 1 || (ctx->frames_waiting_for_keyframe % 30) == 0) { LOG_INFO("Waiting for keyframe... (skipped %u frames so far)", - ctx->frames_waiting_for_keyframe); + ctx->frames_waiting_for_keyframe); } pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); @@ -812,7 +935,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) if (frame_data.keyframe) { if (!ctx->got_keyframe) { LOG_INFO("Got keyframe after waiting for %u frames, payload_size=%zu", - ctx->frames_waiting_for_keyframe, frame_data.payload_size); + ctx->frames_waiting_for_keyframe, frame_data.payload_size); // Flush decoder to ensure clean state when starting from keyframe avcodec_flush_buffers(ctx->codec_ctx); } @@ -847,7 +970,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // If too many consecutive errors, flush decoder and wait for next keyframe if (ctx->consecutive_decode_errors >= 5) { LOG_WARNING("Too many send errors (%u), flushing decoder and waiting for keyframe", - ctx->consecutive_decode_errors); + ctx->consecutive_decode_errors); avcodec_flush_buffers(ctx->codec_ctx); ctx->got_keyframe = false; ctx->consecutive_decode_errors = 0; @@ -878,7 +1001,7 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) // If too many consecutive errors, flush decoder and wait for next keyframe if (ctx->consecutive_decode_errors >= 5) { LOG_WARNING("Too many decode errors (%u), flushing decoder and waiting for keyframe", - ctx->consecutive_decode_errors); + ctx->consecutive_decode_errors); avcodec_flush_buffers(ctx->codec_ctx); ctx->got_keyframe = false; ctx->consecutive_decode_errors = 0; @@ -904,18 +1027,18 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) if (need_reinit) { if (dimensions_changed) { - LOG_INFO("Decoded frame dimensions changed: %ux%u -> %dx%d", - ctx->frame.width, ctx->frame.height, frame->width, frame->height); + LOG_INFO("Decoded frame dimensions changed: %ux%u -> %dx%d", ctx->frame.width, + ctx->frame.height, frame->width, frame->height); } if (pix_fmt_changed) { - LOG_INFO("Decoded frame pixel format changed: %d -> %d (%s)", - ctx->current_pix_fmt, decoded_pix_fmt, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_INFO("Decoded frame pixel format changed: %d -> %d (%s)", ctx->current_pix_fmt, + decoded_pix_fmt, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) + : "unknown"); } // Validate that dimensions are positive and reasonable - if (frame->width <= 0 || frame->height <= 0 || - frame->width > 16384 || frame->height > 16384) { + if (frame->width <= 0 || frame->height <= 0 || frame->width > 16384 || frame->height > 16384) { LOG_ERROR("Invalid decoded frame dimensions: %dx%d", frame->width, frame->height); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); @@ -939,15 +1062,14 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) } // Create new scaling context with the actual pixel format from the decoded frame - struct SwsContext *new_sws_ctx = sws_getContext( - frame->width, frame->height, decoded_pix_fmt, - frame->width, frame->height, AV_PIX_FMT_RGBA, - SWS_BILINEAR, NULL, NULL, NULL - ); + struct SwsContext *new_sws_ctx = sws_getContext(frame->width, frame->height, decoded_pix_fmt, + frame->width, frame->height, AV_PIX_FMT_RGBA, + SWS_BILINEAR, NULL, NULL, NULL); if (!new_sws_ctx) { - LOG_ERROR("Failed to create scaling context for %dx%d pix_fmt=%d (%s)", - frame->width, frame->height, decoded_pix_fmt, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_ERROR("Failed to create scaling context for %dx%d pix_fmt=%d (%s)", frame->width, + frame->height, decoded_pix_fmt, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) + : "unknown"); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); moq_consume_frame_close(frame_id); @@ -958,8 +1080,8 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) size_t new_buffer_size = (size_t)frame->width * (size_t)frame->height * 4; uint8_t *new_frame_buffer = (uint8_t *)bmalloc(new_buffer_size); if (!new_frame_buffer) { - LOG_ERROR("Failed to allocate frame buffer for %dx%d (%zu bytes)", - frame->width, frame->height, new_buffer_size); + LOG_ERROR("Failed to allocate frame buffer for %dx%d (%zu bytes)", frame->width, frame->height, + new_buffer_size); sws_freeContext(new_sws_ctx); av_frame_free(&frame); pthread_mutex_unlock(&ctx->mutex); @@ -981,17 +1103,16 @@ static void moq_source_decode_frame(struct moq_source *ctx, int32_t frame_id) ctx->frame.linesize[0] = frame->width * 4; ctx->frame.data[0] = new_frame_buffer; - LOG_INFO("Scaler initialized for %dx%d pix_fmt=%s", - frame->width, frame->height, - av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); + LOG_INFO("Scaler initialized for %dx%d pix_fmt=%s", frame->width, frame->height, + av_get_pix_fmt_name(decoded_pix_fmt) ? av_get_pix_fmt_name(decoded_pix_fmt) : "unknown"); } // Convert YUV420P to RGBA uint8_t *dst_data[4] = {ctx->frame_buffer, NULL, NULL, NULL}; int dst_linesize[4] = {static_cast(ctx->frame.width * 4), 0, 0, 0}; - sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize, - 0, ctx->frame.height, dst_data, dst_linesize); + sws_scale(ctx->sws_ctx, (const uint8_t *const *)frame->data, frame->linesize, 0, ctx->frame.height, dst_data, + dst_linesize); // Update OBS frame timestamp and output ctx->frame.timestamp = frame_data.timestamp_us;