Skip to content

Commit 0c91947

Browse files
kixelatedclaude
andauthored
Update libmoq to 0.3.6; rework MoQ source teardown to fix use-after-free (#44)
* Format moq-source.cpp with clang-format Bring the file into compliance with the repo's clang-format@19 config so the format check passes on subsequent changes. Pure formatting, no behavior change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * Fix MoQ source teardown UAF; drop unnecessary reconnect sleep moq_source_destroy freed ctx after a fixed os_sleep_ms(100) that was meant to let in-flight async MoQ callbacks drain (they check shutting_down and exit early). That is a timing guess, not synchronization: the decode callback runs FFmpeg avcodec_send_packet/receive_frame, which can exceed 100ms on a large keyframe or slow/contended hardware, so a callback could touch ctx after it was freed. The code documented this limitation itself. Replace the sleep with real quiescence: - Add an in-flight callback counter (active_callbacks) and a condition variable (callbacks_done) to moq_source. - A callback_guard RAII type increments the counter on callback entry (unless shutdown has begun) and broadcasts when it returns to zero. Each top-level callback (on_session_status, on_catalog, on_video_frame) holds one for its whole stack frame, so ctx cannot be freed while a callback that may touch it is still running. - destroy sets shutting_down, disconnects, then waits on callbacks_done until the count is zero - all under the mutex so the handoff is atomic. Also drop the os_sleep_ms(50) in the reconnect path. libmoq origins and sessions are fully independent (each origin is a distinct random instance, each session its own task): moq_origin_close removes the origin synchronously and moq_session_close only signals the old session's task to wind down on the libmoq runtime thread. The new origin/session share no client-side state with the closed one, so the delay guarded nothing - it was pure latency. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * Update libmoq to 0.3.6 Bumps MOQ_VERSION 0.2.14 -> 0.3.6 (latest released; 0.3.7 is tagged but has no published release artifacts yet). The fetched API is signature-compatible for everything the plugin uses, and the moq_video_config / moq_frame structs are unchanged - the only breaking change is semantic: the session on_status codes were redefined in 0.3.0. Old (0.2.x): 0 = connected, non-zero = failure. New (0.3.x): > 0 = (re)connected carrying the connection epoch (1 = first connect, 2 = first reconnect, ...); 0 = closed cleanly (terminal); < 0 = fatal / reconnect permanently gave up (terminal). Update both session status callbacks accordingly: - moq-source.cpp on_session_status: treat > 0 as connected (was treating the new epoch-1 connect as a failure, which would have broken connecting entirely against 0.3.x). Start consuming only on the first connect; on later epochs libmoq re-subscribes our existing consumer automatically since the origin outlives the connection, so recreating it would leak handles. Handle 0 (clean close) and < 0 (error) as the terminal cases. - moq-output.cpp: same > 0 / <= 0 split for its informational connect log. A nice side effect: 0.3.x sessions auto-reconnect internally with backoff, so transient transport drops now recover without the plugin's manual reconnect (which remains for settings changes). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * Rework source teardown around libmoq terminal callbacks The previous teardown counted in-flight callback *executions* and waited for that count to hit zero. That closes the window where a callback is mid-execution when destroy runs, but not the window where libmoq delivers a callback *after* the count reached zero: each subscription fires one terminal callback (status <= 0) asynchronously after it is closed, which could land after destroy had already freed ctx. libmoq >= 0.3.0 documents a terminal-callback lifetime contract: user_data must stay valid until each subscription's terminal callback. Rework teardown to honor it directly via subscription reference counting: - ctx holds one reference for the OBS-owned source plus one per outstanding subscription handed `ctx` as user_data (session, catalog, video track). The ref is pre-incremented before the libmoq registration call (undone if it fails) and released by that subscription's terminal callback via the subscription_ref RAII guard. Because libmoq runs all callbacks serially on one runtime thread, the ref is held continuously from registration through the terminal callback, so ctx is always valid inside any callback. - destroy sets shutting_down, closes every subscription (which makes each terminal fire promptly - libmoq's biased close path wins over pending updates), drops the owner ref, and waits for the count to reach zero. A generous bounded wait backstops a subscription that never terminates so a broken teardown degrades to a logged warning instead of hanging OBS. Along the way this fixes a latent handle-management bug: the catalog *snapshot* id delivered to on_catalog was being stored and closed with moq_consume_catalog_close (which expects a *subscription* id from a different slab - an id-collision hazard). Now the real subscription handle from moq_consume_catalog is stored and closed, snapshots are freed with moq_consume_catalog_free on every path, and a catalog update closes the previous video track instead of leaking it. Also: a fatal session error now tears down all subscriptions (not just session+origin) so their terminals fire promptly, and reconnect bails if shutting down. The refcount balance and deadlock-freedom were checked by two independent adversarial reviews. Still wants runtime validation: connect/disconnect, scene-switch teardown with a decode in flight, settings-change reconnect, and repeated catalog updates. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * ci: trust obsproject tap before installing clang-format Homebrew now refuses to run executables from an untrusted third-party tap, so the format check's `brew install obsproject/tools/clang-format@19` was skipped ("Skipping obsproject/tools because it is not trusted") and the step failed with a broken pipe before any file was checked. Explicitly tap and `brew trust obsproject/tools` first. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * Format moq-output.h with clang-format Bring the header into compliance with the repo's clang-format@19 config so the format check passes once the file is touched. Pure formatting, no behavior change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * Fix MoQ output teardown use-after-free; review fixes Code review surfaced that the source-side teardown rework left the sibling MoQOutput with the same class of bug the 0.3.x bump makes live: - Use-after-free: ~MoQOutput -> Stop() calls moq_session_close, which only signals; libmoq delivers the session's terminal status callback (code <= 0) asynchronously on its runtime thread afterward and dereferences `self` (server_url, connect_time_ms, connect_start). OBS deletes the object as soon as ~MoQOutput returns, so the late callback touched freed memory. 0.3.x's auto-reconnect also fires the callback repeatedly mid-stream, widening the window. Fix by mirroring the source's pattern: count outstanding session subscriptions (pre-incremented before moq_session_connect, released by the terminal callback) and have the destructor wait for the count to reach zero, with the same bounded backstop so a missing terminal warns instead of hanging. - Data race: connect_time_ms is written by the callback (runtime thread) and read by GetConnectTime() (OBS thread); make it std::atomic<int>. Also (source): restore the blank-video-on-catalog-error behavior that the terminal-callback refactor dropped, so an invalid/failed broadcast still clears the preview (except during our own teardown). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 0fa282c commit 0c91947

5 files changed

Lines changed: 381 additions & 218 deletions

File tree

.github/actions/run-clang-format/action.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ runs:
5151
if (( ${+RUNNER_DEBUG} )) setopt XTRACE
5252
5353
print ::group::Install clang-format-19
54+
# Homebrew now refuses to run executables from an untrusted tap, so
55+
# explicitly trust obsproject/tools before installing clang-format from it.
56+
brew tap obsproject/tools
57+
brew trust obsproject/tools
5458
brew install --quiet obsproject/tools/clang-format@19
5559
print ::endgroup::
5660

CMakePresets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"ENABLE_QT": true,
1515
"CMAKE_EXPORT_COMPILE_COMMANDS": true,
1616
"BUILD_PLUGIN": true,
17-
"MOQ_VERSION": "0.2.14",
17+
"MOQ_VERSION": "0.3.6",
1818
"MOQ_ARCHIVE": "tar.gz"
1919
}
2020
},

src/moq-output.cpp

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ MoQOutput::MoQOutput(obs_data_t *, obs_output_t *output)
1515
connect_time_ms(0),
1616
origin(moq_origin_create()),
1717
session(0),
18-
broadcast(moq_publish_create())
18+
broadcast(moq_publish_create()),
19+
outstanding_sessions(0)
1920
{
2021
}
2122

@@ -25,6 +26,14 @@ MoQOutput::~MoQOutput()
2526
moq_origin_close(origin);
2627

2728
Stop();
29+
30+
// Wait for any outstanding session terminal callback to fire before `this`
31+
// is freed, so a late callback on the libmoq runtime thread can't touch freed
32+
// memory. Bounded so a missing terminal degrades to a warning, not a hang.
33+
std::unique_lock<std::mutex> lock(session_mutex);
34+
if (!session_cv.wait_for(lock, std::chrono::seconds(2), [this] { return outstanding_sessions == 0; }))
35+
LOG_WARNING("Output teardown timed out with %d MoQ session callback(s) outstanding",
36+
outstanding_sessions);
2837
}
2938

3039
bool MoQOutput::Start()
@@ -75,25 +84,46 @@ bool MoQOutput::Start()
7584

7685
connect_start = std::chrono::steady_clock::now();
7786

78-
// Create a callback to log when the session is connected or closed
79-
auto session_connect_callback = [](void *user_data, int error_code) {
87+
// Create a callback to log when the session is connected or closed.
88+
// libmoq status codes (>= 0.3.0): > 0 = (re)connected (epoch), 0 = closed
89+
// cleanly (terminal), < 0 = fatal/reconnect-gave-up (terminal).
90+
auto session_connect_callback = [](void *user_data, int code) {
8091
auto self = static_cast<MoQOutput *>(user_data);
8192

82-
if (error_code == 0) {
93+
if (code > 0) {
8394
auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
84-
self->connect_time_ms = static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
85-
LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms,
95+
self->connect_time_ms = static_cast<int>(
96+
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
97+
LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms.load(), code,
8698
self->server_url.c_str());
8799
} else {
88-
LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str());
100+
// Terminal callback (0 = clean close, < 0 = fatal): the session task
101+
// has ended and will not touch `self` again. Release the lifetime
102+
// reference the destructor waits on. This is the last access to `self`.
103+
LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
104+
std::lock_guard<std::mutex> lock(self->session_mutex);
105+
if (--self->outstanding_sessions == 0)
106+
self->session_cv.notify_all();
89107
}
90108
};
91109

110+
// Pre-account for the session subscription before handing `this` to libmoq:
111+
// the connection can fail and fire its terminal callback before connect
112+
// returns, and the destructor must wait for that callback.
113+
{
114+
std::lock_guard<std::mutex> lock(session_mutex);
115+
outstanding_sessions++;
116+
}
117+
92118
// Start establishing a session with the MoQ server
93119
// NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest).
94120
session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this);
95121
if (session < 0) {
96122
LOG_ERROR("Failed to initialize MoQ server: %d", session);
123+
// No subscription was created, so no terminal will fire; undo the ref.
124+
std::lock_guard<std::mutex> lock(session_mutex);
125+
if (--outstanding_sessions == 0)
126+
session_cv.notify_all();
97127
return false;
98128
}
99129

src/moq-output.h

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,59 @@
11
#pragma once
22
#include <obs-module.h>
33

4+
#include <atomic>
45
#include <chrono>
6+
#include <condition_variable>
57
#include <map>
8+
#include <mutex>
69
#include <string>
710
#include "logger.h"
811

9-
class MoQOutput
10-
{
11-
public:
12-
MoQOutput(obs_data_t *settings, obs_output_t *output);
13-
~MoQOutput();
14-
15-
bool Start();
16-
void Stop(bool signal = true);
17-
void Data(struct encoder_packet *packet);
18-
19-
inline size_t GetTotalBytes()
20-
{
21-
return total_bytes_sent;
22-
}
23-
24-
inline int GetConnectTime()
25-
{
26-
return connect_time_ms;
27-
}
28-
29-
private:
30-
void VideoInit(obs_encoder_t *encoder);
31-
void VideoData(struct encoder_packet *packet);
32-
void AudioInit(obs_encoder_t *encoder);
33-
void AudioData(struct encoder_packet *packet);
34-
35-
obs_output_t *output;
36-
37-
std::string server_url;
38-
std::string path;
39-
40-
size_t total_bytes_sent;
41-
int connect_time_ms;
42-
std::chrono::steady_clock::time_point connect_start;
43-
44-
int origin;
45-
int session;
46-
int broadcast;
47-
std::map<obs_encoder_t *, int> video_tracks;
48-
std::map<obs_encoder_t *, int> audio_tracks;
12+
class MoQOutput {
13+
public:
14+
MoQOutput(obs_data_t *settings, obs_output_t *output);
15+
~MoQOutput();
16+
17+
bool Start();
18+
void Stop(bool signal = true);
19+
void Data(struct encoder_packet *packet);
20+
21+
inline size_t GetTotalBytes() { return total_bytes_sent; }
22+
23+
inline int GetConnectTime() { return connect_time_ms; }
24+
25+
private:
26+
void VideoInit(obs_encoder_t *encoder);
27+
void VideoData(struct encoder_packet *packet);
28+
void AudioInit(obs_encoder_t *encoder);
29+
void AudioData(struct encoder_packet *packet);
30+
31+
obs_output_t *output;
32+
33+
std::string server_url;
34+
std::string path;
35+
36+
size_t total_bytes_sent;
37+
// Written by the session status callback (libmoq runtime thread), read by
38+
// GetConnectTime() (OBS thread); atomic to avoid a data race.
39+
std::atomic<int> connect_time_ms;
40+
std::chrono::steady_clock::time_point connect_start;
41+
42+
int origin;
43+
int session;
44+
int broadcast;
45+
46+
// Session subscription lifetime. libmoq delivers a terminal status callback
47+
// (code <= 0) asynchronously on its runtime thread after moq_session_close,
48+
// and may touch `this` until then. outstanding_sessions counts sessions whose
49+
// terminal callback hasn't fired; the destructor waits for it to reach zero
50+
// so a late callback can't touch freed memory.
51+
std::mutex session_mutex;
52+
std::condition_variable session_cv;
53+
int outstanding_sessions;
54+
55+
std::map<obs_encoder_t *, int> video_tracks;
56+
std::map<obs_encoder_t *, int> audio_tracks;
4957
};
5058

5159
void register_moq_output();

0 commit comments

Comments
 (0)