Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/actions/run-clang-format/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ runs:
if (( ${+RUNNER_DEBUG} )) setopt XTRACE

print ::group::Install clang-format-19
# Homebrew now refuses to run executables from an untrusted tap, so
# explicitly trust obsproject/tools before installing clang-format from it.
brew tap obsproject/tools
brew trust obsproject/tools
brew install --quiet obsproject/tools/clang-format@19
print ::endgroup::

Expand Down
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"ENABLE_QT": true,
"CMAKE_EXPORT_COMPILE_COMMANDS": true,
"BUILD_PLUGIN": true,
"MOQ_VERSION": "0.2.14",
"MOQ_VERSION": "0.3.6",
"MOQ_ARCHIVE": "tar.gz"
}
},
Expand Down
44 changes: 37 additions & 7 deletions src/moq-output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ MoQOutput::MoQOutput(obs_data_t *, obs_output_t *output)
connect_time_ms(0),
origin(moq_origin_create()),
session(0),
broadcast(moq_publish_create())
broadcast(moq_publish_create()),
outstanding_sessions(0)
{
}

Expand All @@ -25,6 +26,14 @@ MoQOutput::~MoQOutput()
moq_origin_close(origin);

Stop();

// Wait for any outstanding session terminal callback to fire before `this`
// is freed, so a late callback on the libmoq runtime thread can't touch freed
// memory. Bounded so a missing terminal degrades to a warning, not a hang.
std::unique_lock<std::mutex> lock(session_mutex);
if (!session_cv.wait_for(lock, std::chrono::seconds(2), [this] { return outstanding_sessions == 0; }))
LOG_WARNING("Output teardown timed out with %d MoQ session callback(s) outstanding",
outstanding_sessions);
}

bool MoQOutput::Start()
Expand Down Expand Up @@ -75,25 +84,46 @@ bool MoQOutput::Start()

connect_start = std::chrono::steady_clock::now();

// Create a callback to log when the session is connected or closed
auto session_connect_callback = [](void *user_data, int error_code) {
// Create a callback to log when the session is connected or closed.
// libmoq status codes (>= 0.3.0): > 0 = (re)connected (epoch), 0 = closed
// cleanly (terminal), < 0 = fatal/reconnect-gave-up (terminal).
auto session_connect_callback = [](void *user_data, int code) {
auto self = static_cast<MoQOutput *>(user_data);

if (error_code == 0) {
if (code > 0) {
auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
self->connect_time_ms = static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms,
self->connect_time_ms = static_cast<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms.load(), code,
self->server_url.c_str());
} else {
LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str());
// Terminal callback (0 = clean close, < 0 = fatal): the session task
// has ended and will not touch `self` again. Release the lifetime
// reference the destructor waits on. This is the last access to `self`.
LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
std::lock_guard<std::mutex> lock(self->session_mutex);
if (--self->outstanding_sessions == 0)
self->session_cv.notify_all();
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment on lines +93 to 107

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: Reset connect_time_ms to zero when the session closes.

When code <= 0 (session closed or failed), the callback does not reset connect_time_ms to zero. The UI determines connection state by checking obs_output_get_connect_time_ms(output) > 0 (see moq-dock.cpp:326), so leaving connect_time_ms at its previous positive value will cause the UI to incorrectly display "● Connected" even after the session has closed.

🐛 Proposed fix to reset connection time on close
 	if (code > 0) {
 		auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
 		self->connect_time_ms = static_cast<int>(
 			std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
 		LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code,
 			 self->server_url.c_str());
 	} else {
+		self->connect_time_ms = 0;
 		LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (code > 0) {
auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
self->connect_time_ms = static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session established (%d ms): %s", self->connect_time_ms,
self->connect_time_ms = static_cast<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code,
self->server_url.c_str());
} else {
LOG_INFO("MoQ session closed (%d): %s", error_code, self->server_url.c_str());
LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
}
if (code > 0) {
auto elapsed = std::chrono::steady_clock::now() - self->connect_start;
self->connect_time_ms = static_cast<int>(
std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
LOG_INFO("MoQ session connected (%d ms, epoch %d): %s", self->connect_time_ms, code,
self->server_url.c_str());
} else {
self->connect_time_ms = 0;
LOG_INFO("MoQ session closed (%d): %s", code, self->server_url.c_str());
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/moq-output.cpp` around lines 84 - 92, The connect_time_ms field is not
being reset when the session closes (code <= 0), which causes the UI to
incorrectly display the connection as active based on checking if
connect_time_ms is greater than zero. In the else block that handles the session
closed condition (where code <= 0), add a statement to reset
self->connect_time_ms to zero before or after the LOG_INFO call. This ensures
the UI will correctly reflect that the connection is no longer active.

};

// Pre-account for the session subscription before handing `this` to libmoq:
// the connection can fail and fire its terminal callback before connect
// returns, and the destructor must wait for that callback.
{
std::lock_guard<std::mutex> lock(session_mutex);
outstanding_sessions++;
}

// Start establishing a session with the MoQ server
// NOTE: You could publish the same broadcasts to multiple sessions if you want (redundant ingest).
session = moq_session_connect(server_url.data(), server_url.size(), origin, 0, session_connect_callback, this);
if (session < 0) {
LOG_ERROR("Failed to initialize MoQ server: %d", session);
// No subscription was created, so no terminal will fire; undo the ref.
std::lock_guard<std::mutex> lock(session_mutex);
if (--outstanding_sessions == 0)
session_cv.notify_all();
return false;
}

Expand Down
88 changes: 48 additions & 40 deletions src/moq-output.h
Original file line number Diff line number Diff line change
@@ -1,51 +1,59 @@
#pragma once
#include <obs-module.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <string>
#include "logger.h"

class MoQOutput
{
public:
MoQOutput(obs_data_t *settings, obs_output_t *output);
~MoQOutput();

bool Start();
void Stop(bool signal = true);
void Data(struct encoder_packet *packet);

inline size_t GetTotalBytes()
{
return total_bytes_sent;
}

inline int GetConnectTime()
{
return connect_time_ms;
}

private:
void VideoInit(obs_encoder_t *encoder);
void VideoData(struct encoder_packet *packet);
void AudioInit(obs_encoder_t *encoder);
void AudioData(struct encoder_packet *packet);

obs_output_t *output;

std::string server_url;
std::string path;

size_t total_bytes_sent;
int connect_time_ms;
std::chrono::steady_clock::time_point connect_start;

int origin;
int session;
int broadcast;
std::map<obs_encoder_t *, int> video_tracks;
std::map<obs_encoder_t *, int> audio_tracks;
class MoQOutput {
public:
MoQOutput(obs_data_t *settings, obs_output_t *output);
~MoQOutput();

bool Start();
void Stop(bool signal = true);
void Data(struct encoder_packet *packet);

inline size_t GetTotalBytes() { return total_bytes_sent; }

inline int GetConnectTime() { return connect_time_ms; }

private:
void VideoInit(obs_encoder_t *encoder);
void VideoData(struct encoder_packet *packet);
void AudioInit(obs_encoder_t *encoder);
void AudioData(struct encoder_packet *packet);

obs_output_t *output;

std::string server_url;
std::string path;

size_t total_bytes_sent;
// Written by the session status callback (libmoq runtime thread), read by
// GetConnectTime() (OBS thread); atomic to avoid a data race.
std::atomic<int> connect_time_ms;
std::chrono::steady_clock::time_point connect_start;

int origin;
int session;
int broadcast;

// Session subscription lifetime. libmoq delivers a terminal status callback
// (code <= 0) asynchronously on its runtime thread after moq_session_close,
// and may touch `this` until then. outstanding_sessions counts sessions whose
// terminal callback hasn't fired; the destructor waits for it to reach zero
// so a late callback can't touch freed memory.
std::mutex session_mutex;
std::condition_variable session_cv;
int outstanding_sessions;

std::map<obs_encoder_t *, int> video_tracks;
std::map<obs_encoder_t *, int> audio_tracks;
};

void register_moq_output();
Loading
Loading