Skip to content

Commit ef253f0

Browse files
ruiren_microsoft and Copilot committed
Address Copilot review feedback: fix deadlock, validation, and safety
- Fix potential deadlock: close resultQueue before joining pushThread in StopInternal, and store the final response in a member variable instead of pushing it to the closed queue. TryGetNext returns it after the queue drains.
- Use TryPush in PushWorkerLoop to prevent the worker from blocking on a full result queue (log a warning on drop instead of deadlocking).
- Validate push_queue_capacity > 0 before Start() to prevent a hang/DoS.
- Add a bounds check for the size_t to int32_t cast in callWithBinary.
- Improve error messages: distinguish not-started from already-stopped.
- Fall back to the raw response.error when the parsed CoreErrorResponse.message is empty.
- Mark CreateLiveTranscriptionSession() as const.
- Add tests: AppendAfterStopThrows, Start_InvalidCapacityThrows.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 25446d9 commit ef253f0

6 files changed

Lines changed: 77 additions & 27 deletions

File tree

sdk/cpp/include/openai/openai_audio_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace foundry_local {
3838
void TranscribeAudioStreaming(const std::filesystem::path& audioFilePath, const StreamCallback& onChunk) const;
3939

4040
/// Create a new live audio transcription session for streaming PCM audio.
41-
std::unique_ptr<LiveAudioTranscriptionSession> CreateLiveTranscriptionSession();
41+
std::unique_ptr<LiveAudioTranscriptionSession> CreateLiveTranscriptionSession() const;
4242

4343
private:
4444
OpenAIAudioClient(gsl::not_null<foundry_local::Internal::IFoundryLocalCore*> core, std::string_view modelId,

sdk/cpp/include/openai/openai_live_audio_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ namespace foundry_local {
9494

9595
std::thread pushThread_;
9696
std::string errorMessage_;
97+
LiveAudioTranscriptionResponse finalResult_;
98+
bool hasFinalResult_ = false;
9799
};
98100

99101
} // namespace foundry_local

sdk/cpp/src/core.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33
//
4-
// Core DLL interop loads Microsoft.AI.Foundry.Local.Core.dll at runtime.
4+
// Core DLL interop loads Microsoft.AI.Foundry.Local.Core.dll at runtime.
55
// Internal header, not part of the public API.
66

77
#pragma once
@@ -109,6 +109,9 @@ namespace foundry_local {
109109
}
110110

111111
if (binaryData && binaryDataLength > 0) {
112+
if (binaryDataLength > static_cast<size_t>(INT32_MAX)) {
113+
throw Exception("Binary data length exceeds maximum supported size (INT32_MAX).", logger);
114+
}
112115
request.BinaryData = binaryData;
113116
request.BinaryDataLength = static_cast<int32_t>(binaryDataLength);
114117
}

sdk/cpp/src/openai_audio_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ namespace foundry_local {
6969
}
7070
}
7171

72-
std::unique_ptr<LiveAudioTranscriptionSession> OpenAIAudioClient::CreateLiveTranscriptionSession() {
72+
std::unique_ptr<LiveAudioTranscriptionSession> OpenAIAudioClient::CreateLiveTranscriptionSession() const {
7373
return std::make_unique<LiveAudioTranscriptionSession>(core_, modelId_, logger_);
7474
}
7575

sdk/cpp/src/openai_live_audio_client.cpp

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ namespace foundry_local {
4848
state_ = SessionState::Starting;
4949
activeSettings_ = settings_;
5050

51+
// Validate queue capacity early
52+
if (activeSettings_.push_queue_capacity <= 0) {
53+
state_ = SessionState::Created;
54+
throw Exception("push_queue_capacity must be greater than 0.", *logger_);
55+
}
56+
5157
// Build the start command
5258
CoreInteropRequest req("audio_stream_start");
5359
req.AddParam("Model", modelId_);
@@ -77,11 +83,12 @@ namespace foundry_local {
7783
throw Exception("audio_stream_start returned an empty session handle.", *logger_);
7884
}
7985

86+
// Validate queue capacity
87+
const size_t queueCapacity = static_cast<size_t>(activeSettings_.push_queue_capacity);
88+
8089
// Create the queues
81-
pushQueue_ = std::make_unique<Internal::ThreadSafeQueue<AudioChunk>>(
82-
static_cast<size_t>(activeSettings_.push_queue_capacity));
83-
resultQueue_ = std::make_unique<Internal::ThreadSafeQueue<LiveAudioTranscriptionResponse>>(
84-
static_cast<size_t>(activeSettings_.push_queue_capacity));
90+
pushQueue_ = std::make_unique<Internal::ThreadSafeQueue<AudioChunk>>(queueCapacity);
91+
resultQueue_ = std::make_unique<Internal::ThreadSafeQueue<LiveAudioTranscriptionResponse>>(queueCapacity);
8592

8693
state_ = SessionState::Started;
8794

@@ -93,7 +100,11 @@ namespace foundry_local {
93100
{
94101
std::lock_guard<std::mutex> lock(mutex_);
95102
if (state_ != SessionState::Started) {
96-
throw Exception("Session is not started. Call Start() first.", *logger_);
103+
throw Exception(
104+
state_ == SessionState::Stopped
105+
? "Session has already been stopped."
106+
: "Session is not started. Call Start() first.",
107+
*logger_);
97108
}
98109
}
99110

@@ -118,8 +129,16 @@ namespace foundry_local {
118129
return TranscriptionStatus::Result;
119130
case Internal::DequeueStatus::Timeout:
120131
return TranscriptionStatus::Timeout;
121-
case Internal::DequeueStatus::Closed:
132+
case Internal::DequeueStatus::Closed: {
133+
// Return the final result from Stop() if available
134+
std::lock_guard<std::mutex> lock(mutex_);
135+
if (hasFinalResult_) {
136+
result = std::move(finalResult_);
137+
hasFinalResult_ = false;
138+
return TranscriptionStatus::Result;
139+
}
122140
return TranscriptionStatus::Closed;
141+
}
123142
case Internal::DequeueStatus::Error:
124143
return TranscriptionStatus::Error;
125144
default:
@@ -144,9 +163,15 @@ namespace foundry_local {
144163
pushQueue_->Close();
145164
}
146165

166+
// Close the result queue to unblock any blocked Push() in the worker thread,
167+
// preventing a deadlock when joining below.
168+
if (resultQueue_) {
169+
resultQueue_->Close();
170+
}
171+
147172
lock.unlock();
148173

149-
// Wait for the push thread to finish
174+
// Wait for the push thread to finish (safe now — worker is unblocked)
150175
if (pushThread_.joinable()) {
151176
pushThread_.join();
152177
}
@@ -158,23 +183,20 @@ namespace foundry_local {
158183

159184
auto response = core_->call(req.Command(), *logger_, &json);
160185

161-
// Enqueue the final transcription result from the stop response, then close
162-
if (resultQueue_) {
163-
if (response.HasError()) {
186+
// Store the final result or error for retrieval via TryGetNext
187+
if (response.HasError()) {
188+
if (resultQueue_) {
164189
resultQueue_->CloseWithError("audio_stream_stop failed: " + response.error);
165190
}
166-
else {
167-
if (!response.data.empty()) {
168-
try {
169-
auto finalResult = LiveAudioTranscriptionResponse::FromJson(response.data);
170-
resultQueue_->Push(std::move(finalResult));
171-
}
172-
catch (const std::exception& e) {
173-
logger_->Log(LogLevel::Warning,
174-
std::string("Failed to parse final transcription response: ") + e.what());
175-
}
176-
}
177-
resultQueue_->Close();
191+
}
192+
else if (!response.data.empty()) {
193+
try {
194+
finalResult_ = LiveAudioTranscriptionResponse::FromJson(response.data);
195+
hasFinalResult_ = true;
196+
}
197+
catch (const std::exception& e) {
198+
logger_->Log(LogLevel::Warning,
199+
std::string("Failed to parse final transcription response: ") + e.what());
178200
}
179201
}
180202

@@ -204,7 +226,10 @@ namespace foundry_local {
204226

205227
if (response.HasError()) {
206228
auto coreError = CoreErrorResponse::TryParse(response.error);
207-
std::string msg = coreError.has_value() ? coreError->message : response.error;
229+
std::string msg =
230+
(coreError.has_value() && !coreError->message.empty())
231+
? coreError->message
232+
: response.error;
208233

209234
logger_->Log(LogLevel::Error, "audio_stream_push failed: " + msg);
210235
pushQueue_->Close();
@@ -219,7 +244,11 @@ namespace foundry_local {
219244
if (!response.data.empty()) {
220245
try {
221246
auto result = LiveAudioTranscriptionResponse::FromJson(response.data);
222-
resultQueue_->Push(std::move(result));
247+
if (!resultQueue_->TryPush(std::move(result))) {
248+
logger_->Log(
249+
LogLevel::Warning,
250+
"Dropping transcription result because the result queue is full.");
251+
}
223252
}
224253
catch (const std::exception& e) {
225254
logger_->Log(LogLevel::Warning,

sdk/cpp/test/live_audio_test.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,22 @@ TEST_F(LiveAudioSessionTest, AppendBeforeStartThrows) {
210210
EXPECT_THROW(session.Append(data.data(), data.size()), Exception);
211211
}
212212

213+
TEST_F(LiveAudioSessionTest, AppendAfterStopThrows) {
214+
SetUpAllHandlers();
215+
216+
LiveAudioTranscriptionSession session(&core_, "whisper-model", &logger_);
217+
session.Start();
218+
session.Stop();
219+
std::vector<uint8_t> data = {0, 1, 2, 3};
220+
EXPECT_THROW(session.Append(data.data(), data.size()), Exception);
221+
}
222+
223+
TEST_F(LiveAudioSessionTest, Start_InvalidCapacityThrows) {
224+
LiveAudioTranscriptionSession session(&core_, "whisper-model", &logger_);
225+
session.Settings().push_queue_capacity = 0;
226+
EXPECT_THROW(session.Start(), Exception);
227+
}
228+
213229
TEST_F(LiveAudioSessionTest, StopParseFinalResponse) {
214230
SetUpStartHandlers();
215231
SetUpPushHandler();

0 commit comments

Comments
 (0)