-
Notifications
You must be signed in to change notification settings - Fork 17
Update libmoq to 0.3.6; rework MoQ source teardown to fix use-after-free #44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
13c1c11
ffcd5c4
609cd9a
ea3b517
4a4d48c
9e9334f
f1d39ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -15,7 +15,8 @@ MoQOutput::MoQOutput(obs_data_t *, obs_output_t *output) | |||||||||||||||||||||||||||||||||||||||||||||
| connect_time_ms(0), | ||||||||||||||||||||||||||||||||||||||||||||||
| origin(moq_origin_create()), | ||||||||||||||||||||||||||||||||||||||||||||||
| session(0), | ||||||||||||||||||||||||||||||||||||||||||||||
| broadcast(moq_publish_create()) | ||||||||||||||||||||||||||||||||||||||||||||||
| broadcast(moq_publish_create()), | ||||||||||||||||||||||||||||||||||||||||||||||
| outstanding_sessions(0) | ||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -25,6 +26,14 @@ MoQOutput::~MoQOutput() | |||||||||||||||||||||||||||||||||||||||||||||
| moq_origin_close(origin); | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Stop(); | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Wait for any outstanding session terminal callback to fire before `this` | ||||||||||||||||||||||||||||||||||||||||||||||
| // is freed, so a late callback on the libmoq runtime thread can't touch freed | ||||||||||||||||||||||||||||||||||||||||||||||
| // memory. Bounded so a missing terminal degrades to a warning, not a hang. | ||||||||||||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(session_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||
| if (!session_cv.wait_for(lock, std::chrono::seconds(2), [this] { return outstanding_sessions == 0; })) | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_WARNING("Output teardown timed out with %d MoQ session callback(s) outstanding", | ||||||||||||||||||||||||||||||||||||||||||||||
| outstanding_sessions); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| bool MoQOutput::Start() | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -75,25 +84,46 @@ bool MoQOutput::Start() | |||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| connect_start = std::chrono::steady_clock::now(); | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Create a callback to log when the session is connected or closed | ||||||||||||||||||||||||||||||||||||||||||||||
| auto session_connect_callback = [](void *user_data, int error_code) { | ||||||||||||||||||||||||||||||||||||||||||||||
| // Create a callback to log when the session is connected or closed. | ||||||||||||||||||||||||||||||||||||||||||||||
| // libmoq status codes (>= 0.3.0): > 0 = (re)connected (epoch), 0 = closed | ||||||||||||||||||||||||||||||||||||||||||||||
| // cleanly (terminal), < 0 = fatal/reconnect-gave-up (terminal). | ||||||||||||||||||||||||||||||||||||||||||||||
| auto session_connect_callback = [](void *user_data, int code) { | ||||||||||||||||||||||||||||||||||||||||||||||
| auto self = static_cast<MoQOutput *>(user_data); | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| if (error_code == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||
| if (code > 0) { | ||||||||||||||||||||||||||||||||||||||||||||||
| auto elapsed = std::chrono::steady_clock::now() - self->connect_start; | ||||||||||||||||||||||||||||||||||||||||||||||
| self->connect_time_ms = static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count()); | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms, | ||||||||||||||||||||||||||||||||||||||||||||||
| self->connect_time_ms = static_cast<int>( | ||||||||||||||||||||||||||||||||||||||||||||||
| std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count()); | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms.load(), code, | ||||||||||||||||||||||||||||||||||||||||||||||
| self->server_url.c_str()); | ||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str()); | ||||||||||||||||||||||||||||||||||||||||||||||
| // Terminal callback (0 = clean close, < 0 = fatal): the session task | ||||||||||||||||||||||||||||||||||||||||||||||
| // has ended and will not touch `self` again. Release the lifetime | ||||||||||||||||||||||||||||||||||||||||||||||
| // reference the destructor waits on. This is the last access to `self`. | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str()); | ||||||||||||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> lock(self->session_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||
| if (--self->outstanding_sessions == 0) | ||||||||||||||||||||||||||||||||||||||||||||||
| self->session_cv.notify_all(); | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+93
to
107
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Reset When 🐛 Proposed fix to reset connection time on close if (code > 0) {
auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
self->connect_time_ms = static_cast<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code,
self->server_url.c_str());
} else {
+ self->connect_time_ms = 0;
LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Pre-account for the session subscription before handing `this` to libmoq: | ||||||||||||||||||||||||||||||||||||||||||||||
| // the connection can fail and fire its terminal callback before connect | ||||||||||||||||||||||||||||||||||||||||||||||
| // returns, and the destructor must wait for that callback. | ||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> lock(session_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||
| outstanding_sessions++; | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Start establishing a session with the MoQ server | ||||||||||||||||||||||||||||||||||||||||||||||
| // NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest). | ||||||||||||||||||||||||||||||||||||||||||||||
| session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this); | ||||||||||||||||||||||||||||||||||||||||||||||
| if (session < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||
| LOG_ERROR("Failed to initialize MoQ server: %d", session); | ||||||||||||||||||||||||||||||||||||||||||||||
| // No subscription was created, so no terminal will fire; undo the ref. | ||||||||||||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> lock(session_mutex); | ||||||||||||||||||||||||||||||||||||||||||||||
| if (--outstanding_sessions == 0) | ||||||||||||||||||||||||||||||||||||||||||||||
| session_cv.notify_all(); | ||||||||||||||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,51 +1,59 @@ | ||
| #pragma once | ||
| #include <obs-module.h> | ||
|
|
||
| #include <atomic> | ||
| #include <chrono> | ||
| #include <condition_variable> | ||
| #include <map> | ||
| #include <mutex> | ||
| #include <string> | ||
| #include "logger.h" | ||
|
|
||
| class MoQOutput | ||
| { | ||
| public: | ||
| MoQOutput(obs_data_t *settings, obs_output_t *output); | ||
| ~MoQOutput(); | ||
|
|
||
| bool Start(); | ||
| void Stop(bool signal = true); | ||
| void Data(struct encoder_packet *packet); | ||
|
|
||
| inline size_t GetTotalBytes() | ||
| { | ||
| return total_bytes_sent; | ||
| } | ||
|
|
||
| inline int GetConnectTime() | ||
| { | ||
| return connect_time_ms; | ||
| } | ||
|
|
||
| private: | ||
| void VideoInit(obs_encoder_t *encoder); | ||
| void VideoData(struct encoder_packet *packet); | ||
| void AudioInit(obs_encoder_t *encoder); | ||
| void AudioData(struct encoder_packet *packet); | ||
|
|
||
| obs_output_t *output; | ||
|
|
||
| std::string server_url; | ||
| std::string path; | ||
|
|
||
| size_t total_bytes_sent; | ||
| int connect_time_ms; | ||
| std::chrono::steady_clock::time_point connect_start; | ||
|
|
||
| int origin; | ||
| int session; | ||
| int broadcast; | ||
| std::map<obs_encoder_t *, int> video_tracks; | ||
| std::map<obs_encoder_t *, int> audio_tracks; | ||
| class MoQOutput { | ||
| public: | ||
| MoQOutput(obs_data_t *settings, obs_output_t *output); | ||
| ~MoQOutput(); | ||
|
|
||
| bool Start(); | ||
| void Stop(bool signal = true); | ||
| void Data(struct encoder_packet *packet); | ||
|
|
||
| inline size_t GetTotalBytes() { return total_bytes_sent; } | ||
|
|
||
| inline int GetConnectTime() { return connect_time_ms; } | ||
|
|
||
| private: | ||
| void VideoInit(obs_encoder_t *encoder); | ||
| void VideoData(struct encoder_packet *packet); | ||
| void AudioInit(obs_encoder_t *encoder); | ||
| void AudioData(struct encoder_packet *packet); | ||
|
|
||
| obs_output_t *output; | ||
|
|
||
| std::string server_url; | ||
| std::string path; | ||
|
|
||
| size_t total_bytes_sent; | ||
| // Written by the session status callback (libmoq runtime thread), read by | ||
| // GetConnectTime() (OBS thread); atomic to avoid a data race. | ||
| std::atomic<int> connect_time_ms; | ||
| std::chrono::steady_clock::time_point connect_start; | ||
|
|
||
| int origin; | ||
| int session; | ||
| int broadcast; | ||
|
|
||
| // Session subscription lifetime. libmoq delivers a terminal status callback | ||
| // (code <= 0) asynchronously on its runtime thread after moq_session_close, | ||
| // and may touch `this` until then. outstanding_sessions counts sessions whose | ||
| // terminal callback hasn't fired; the destructor waits for it to reach zero | ||
| // so a late callback can't touch freed memory. | ||
| std::mutex session_mutex; | ||
| std::condition_variable session_cv; | ||
| int outstanding_sessions; | ||
|
|
||
| std::map<obs_encoder_t *, int> video_tracks; | ||
| std::map<obs_encoder_t *, int> audio_tracks; | ||
| }; | ||
|
|
||
| void register_moq_output(); |
Uh oh!
There was an error while loading. Please reload this page.