Commit 99b091f

Authored by rui-ren, ruiren_microsoft, and Copilot
Add Nemotron-ASR streaming inference to Rust SDK (#613)
## Add Nemotron-ASR streaming inference to Rust SDK

### Description

Ports the C# live audio transcription feature ([PR #485](#485)) to the Rust SDK with full API parity.

The existing `AudioClient` only supports file-based transcription. This PR introduces `LiveAudioTranscriptionSession` that accepts continuous PCM audio chunks (e.g., from a microphone) and returns partial/final transcription results as an async stream.

### What's included

**New files**

- `sdk/rust/src/openai/live_audio_client.rs` — Streaming session with `start()`, `append()`, `get_transcription_stream()`, `stop()`, plus types, cancellation support, and unit tests
- `sdk/rust/tests/integration/live_audio_test.rs` — E2E integration test with synthetic PCM audio
- `samples/rust/live-audio-transcription-example/` — Full sample with real microphone capture (cpal) and resampling

**Modified files**

- `sdk/rust/src/detail/core_interop.rs` — Added `StreamingRequestBuffer` FFI struct and `execute_command_with_binary()` for binary audio data
- `sdk/rust/src/openai/audio_client.rs` — Added `create_live_transcription_session()` factory method
- `sdk/rust/src/detail/model.rs`, `model_variant.rs` — Wired factory method to `Model`
- `sdk/rust/src/openai/mod.rs`, `src/lib.rs` — Module registration and public exports
- `sdk/rust/Cargo.toml` — Added `tokio-util` dependency for `CancellationToken`

### API surface

```rust
let audio_client = model.create_audio_client();
// `mut` is required because the settings fields are assigned below.
let mut session = audio_client.create_live_transcription_session();
session.settings.sample_rate = 16000;
session.settings.channels = 1;
session.settings.language = Some("en".into());

session.start(None).await?;

// Push audio from microphone callback
session.append(&pcm_bytes, None).await?;

// Read results as async stream
use tokio_stream::StreamExt;
let mut stream = session.get_transcription_stream()?;
while let Some(result) = stream.next().await {
    let result = result?;
    println!("{}", result.content[0].text);
}

session.stop(None).await?;
```

### C# API parity

| C# | Rust | Status |
|----|------|--------|
| `CreateLiveTranscriptionSession()` | `create_live_transcription_session()` | ✅ |
| `StartAsync(CancellationToken)` | `start(Option<CancellationToken>)` | ✅ |
| `AppendAsync(ReadOnlyMemory<byte>, CancellationToken)` | `append(&[u8], Option<CancellationToken>)` | ✅ |
| `GetTranscriptionStream(CancellationToken)` | `get_transcription_stream()` | ✅ |
| `StopAsync(CancellationToken)` + cancel-safe cleanup | `stop(Option<CancellationToken>)` + cancel-safe cleanup | ✅ |
| `IAsyncDisposable.DisposeAsync()` | `Drop` with best-effort native stop | ✅ |
| `LiveAudioTranscriptionResponse.Content[0].Text` | `response.content[0].text` | ✅ |
| `LiveAudioTranscriptionResponse.Content[0].Transcript` | `response.content[0].transcript` | ✅ |
| `LiveAudioTranscriptionResponse.IsFinal` | `response.is_final` | ✅ |
| `LiveAudioTranscriptionResponse.StartTime/EndTime` | `response.start_time` / `response.end_time` | ✅ |
| `LiveAudioTranscriptionOptions` (SampleRate, Channels, BitsPerSample, Language, PushQueueCapacity) | `LiveAudioTranscriptionOptions` (sample_rate, channels, bits_per_sample, language, push_queue_capacity) | ✅ |
| `CoreErrorResponse.TryParse()` | `CoreErrorResponse::try_parse()` | ✅ |
| Native commands: `audio_stream_start`, `audio_stream_push`, `audio_stream_stop` | Same commands via `execute_command` / `execute_command_with_binary` | ✅ |

### Design highlights

- **CancellationToken support** — `start/append/stop` accept `Option<CancellationToken>` via `tokio_util::sync::CancellationToken`
- **Cancel-safe stop** — `stop()` always performs native `audio_stream_stop` even if token fires, preventing native session leaks (matches C# `StopAsync` pattern)
- **Response envelope** — `LiveAudioTranscriptionResponse` uses `content: Vec<ContentPart>` matching C#'s `ConversationItem.Content[0].Text/Transcript`
- **Bounded push queue** — Backpressure via bounded channel (capacity=100); prevents unbounded memory growth
- **Push loop on blocking thread** — `execute_command_with_binary` FFI calls run on `spawn_blocking`, keeping async runtime free
- **Settings freeze** — Audio format settings are cloned at `start()` and immutable during the session
- **Drop safety** — Best-effort synchronous `audio_stream_stop` in `Drop` to prevent native session leaks
- **FFI null pointer safety** — Empty binary slices use `std::ptr::null()` to avoid dangling pointer across FFI boundary

### Verified working

- ✅ SDK build succeeds (0 errors, 0 clippy warnings)
- ✅ 13 unit tests passing (JSON deserialization, settings defaults, error parsing, content envelope)
- ✅ E2E pipeline: Microphone (48kHz/2ch/F32) → Resample (16kHz/mono/16-bit) → SDK → Core.dll → onnxruntime-genai.dll → nemotron model
- ✅ Synthetic audio test: 30 chunks (96KB PCM) pushed with clean session lifecycle
- ✅ Live microphone test: real-time capture, session start/stop, no native errors

### Stats

- **14 files changed**, **1,329 additions**, **2 deletions**

---------

Co-authored-by: ruiren_microsoft <ruiren@microsoft.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
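The verified pipeline in the commit message converts microphone input (48 kHz, 2 channels, F32) to 16 kHz mono 16-bit PCM before it reaches `append()`. The sketch below shows one way such a conversion could look. It is not taken from the sample in this commit: the function name is hypothetical, little-endian byte order is an assumption, and the decimation simply averages groups of three frames instead of applying a proper low-pass filter.

```rust
/// Convert interleaved stereo f32 samples captured at 48 kHz into
/// mono 16-bit little-endian PCM at 16 kHz.
///
/// Assumptions (not from the commit): naive 3:1 averaging instead of a
/// real anti-aliasing filter, and little-endian output bytes.
fn stereo_f32_48k_to_mono_i16_16k(interleaved: &[f32]) -> Vec<u8> {
    const DECIMATION: usize = 3; // 48_000 / 16_000

    // Downmix: average the left and right sample of each complete frame.
    let mono: Vec<f32> = interleaved
        .chunks_exact(2)
        .map(|frame| (frame[0] + frame[1]) * 0.5)
        .collect();

    let mut out = Vec::with_capacity(mono.len() / DECIMATION * 2);
    // Decimate: one output sample per group of three mono samples.
    // Trailing samples that do not fill a group are dropped.
    for group in mono.chunks_exact(DECIMATION) {
        let avg = group.iter().sum::<f32>() / DECIMATION as f32;
        // Clamp before scaling so out-of-range input cannot overflow i16.
        let sample = (avg.clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
        out.extend_from_slice(&sample.to_le_bytes());
    }
    out
}
```

The returned byte vector has the shape that `append(&pcm_bytes, None)` in the API surface above expects, provided the session settings are 16000 Hz, 1 channel, and 16 bits per sample.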
1 parent 984892e commit 99b091f

8 files changed

Lines changed: 920 additions & 2 deletions

sdk/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ serde_json = "1"
 thiserror = "2"
 tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }
 tokio-stream = "0.1"
+tokio-util = "0.7"
 futures-core = "0.3"
 reqwest = { version = "0.12", features = ["json"] }
 urlencoding = "2"

sdk/rust/src/detail/core_interop.rs

Lines changed: 84 additions & 0 deletions
@@ -48,6 +48,19 @@ impl ResponseBuffer {
     }
 }

+/// Request buffer with binary payload for `execute_command_with_binary`.
+///
+/// Used for audio streaming — carries both JSON params and raw PCM bytes.
+#[repr(C)]
+struct StreamingRequestBuffer {
+    command: *const i8,
+    command_length: i32,
+    data: *const i8,
+    data_length: i32,
+    binary_data: *const u8,
+    binary_data_length: i32,
+}
+
 /// Signature for `execute_command`.
 type ExecuteCommandFn = unsafe extern "C" fn(*const RequestBuffer, *mut ResponseBuffer);

@@ -63,6 +76,10 @@ type ExecuteCommandWithCallbackFn = unsafe extern "C" fn(
     *mut std::ffi::c_void,
 );

+/// Signature for `execute_command_with_binary`.
+type ExecuteCommandWithBinaryFn =
+    unsafe extern "C" fn(*const StreamingRequestBuffer, *mut ResponseBuffer);
+
 // ── Library name helpers ─────────────────────────────────────────────────────

 #[cfg(target_os = "windows")]
@@ -237,6 +254,8 @@ pub(crate) struct CoreInterop {
         CallbackFn,
         *mut std::ffi::c_void,
     ),
+    execute_command_with_binary:
+        Option<unsafe extern "C" fn(*const StreamingRequestBuffer, *mut ResponseBuffer)>,
 }

 impl std::fmt::Debug for CoreInterop {
@@ -307,12 +326,22 @@ impl CoreInterop {
             *sym
         };

+        // SAFETY: Same as above — symbol must match `ExecuteCommandWithBinaryFn`.
+        // Optional: older native cores may not export this symbol (used for audio streaming).
+        let execute_command_with_binary: Option<ExecuteCommandWithBinaryFn> = unsafe {
+            library
+                .get::<ExecuteCommandWithBinaryFn>(b"execute_command_with_binary\0")
+                .ok()
+                .map(|sym| *sym)
+        };
+
         Ok(Self {
             _library: library,
             #[cfg(target_os = "windows")]
             _dependency_libs,
             execute_command,
             execute_command_with_callback,
+            execute_command_with_binary,
         })
     }

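The hunk above treats `execute_command_with_binary` as an optional symbol: it is stored as `Option<...>` so that older native cores still load, and the missing case is reported only when the feature is used. A minimal, self-contained sketch of that pattern follows. Everything in it is hypothetical (`NativeFn`, `double_it`, `lookup`, `Interop`); the `lookup` function merely stands in for a real shared-library symbol query.

```rust
/// Hypothetical signature standing in for a native entry point.
type NativeFn = extern "C" fn(i32) -> i32;

/// Stand-in for a function the native library exports.
extern "C" fn double_it(x: i32) -> i32 {
    x * 2
}

/// Hypothetical symbol lookup; a real loader would query the shared library.
fn lookup(name: &str) -> Option<NativeFn> {
    match name {
        "double_it" => Some(double_it as NativeFn),
        _ => None,
    }
}

struct Interop {
    /// Loading fails if this symbol is missing.
    required: NativeFn,
    /// Loading succeeds without this symbol; callers get an error on use.
    optional: Option<NativeFn>,
}

impl Interop {
    fn load(optional_name: &str) -> Result<Self, String> {
        let required = lookup("double_it")
            .ok_or_else(|| "required symbol `double_it` not found".to_string())?;
        // A missing optional symbol is recorded as `None`, not an error.
        let optional = lookup(optional_name);
        Ok(Self { required, optional })
    }

    fn call_optional(&self, x: i32) -> Result<i32, String> {
        let f = self
            .optional
            .ok_or_else(|| "optional command is not supported by this native core".to_string())?;
        Ok(f(x))
    }
}
```

Deferring the error to call time keeps features that do not need the newer entry point working against older native libraries.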
@@ -354,6 +383,61 @@ impl CoreInterop {
         Self::process_response(response)
     }

+    /// Execute a command with an additional binary payload.
+    ///
+    /// Used for audio streaming — `binary_data` carries raw PCM bytes
+    /// alongside the JSON parameters.
+    pub fn execute_command_with_binary(
+        &self,
+        command: &str,
+        params: Option<&Value>,
+        binary_data: &[u8],
+    ) -> Result<String> {
+        let native_fn = self.execute_command_with_binary.ok_or_else(|| {
+            FoundryLocalError::CommandExecution {
+                reason: "execute_command_with_binary is not supported by this native core \
+                         (symbol not found)"
+                    .into(),
+            }
+        })?;
+
+        let cmd = CString::new(command).map_err(|e| FoundryLocalError::CommandExecution {
+            reason: format!("Invalid command string: {e}"),
+        })?;
+
+        let data_json = match params {
+            Some(v) => serde_json::to_string(v)?,
+            None => String::new(),
+        };
+        let data_cstr =
+            CString::new(data_json.as_str()).map_err(|e| FoundryLocalError::CommandExecution {
+                reason: format!("Invalid data string: {e}"),
+            })?;
+
+        let request = StreamingRequestBuffer {
+            command: cmd.as_ptr(),
+            command_length: cmd.as_bytes().len() as i32,
+            data: data_cstr.as_ptr(),
+            data_length: data_cstr.as_bytes().len() as i32,
+            binary_data: if binary_data.is_empty() {
+                std::ptr::null()
+            } else {
+                binary_data.as_ptr()
+            },
+            binary_data_length: binary_data.len() as i32,
+        };
+
+        let mut response = ResponseBuffer::new();
+
+        // SAFETY: `request` fields point into `cmd`, `data_cstr`, and
+        // `binary_data` which are all alive for the duration of this call.
+        unsafe {
+            (native_fn)(&request, &mut response);
+        }
+
+        Self::process_response(response)
+    }
+
     /// Execute a command that streams results back via `callback`.
     ///
     /// Each chunk delivered by the native library is decoded as UTF-8 and

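The new method marshals a byte slice into a pointer and an `i32` length, sending a null pointer for an empty slice. The committed code converts lengths with `as i32`, which is fine for small PCM chunks but would wrap silently for payloads above `i32::MAX` bytes. The sketch below shows the same null-for-empty rule with a checked length conversion as one possible variation. It is an illustration under assumptions, not what the commit does: `BinaryView` and `to_binary_view` are hypothetical names.

```rust
use std::marker::PhantomData;

/// Hypothetical (pointer, length) pair laid out for a C caller.
/// The lifetime ties the view to the slice it was built from, so the
/// borrow checker keeps the bytes alive while the view exists.
#[repr(C)]
struct BinaryView<'a> {
    ptr: *const u8,
    len: i32,
    _marker: PhantomData<&'a [u8]>,
}

/// Build a view over `data`, rejecting payloads whose length does not fit in `i32`.
fn to_binary_view(data: &[u8]) -> Result<BinaryView<'_>, String> {
    // Checked conversion: fails instead of wrapping for oversized input.
    let len = i32::try_from(data.len())
        .map_err(|_| format!("payload of {} bytes exceeds i32::MAX", data.len()))?;

    // An empty slice has a non-null but dangling pointer; send null instead
    // so the native side never sees an address it must not dereference.
    let ptr = if data.is_empty() {
        std::ptr::null()
    } else {
        data.as_ptr()
    };

    Ok(BinaryView {
        ptr,
        len,
        _marker: PhantomData,
    })
}
```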
sdk/rust/src/lib.rs

Lines changed: 4 additions & 2 deletions
@@ -31,8 +31,10 @@ pub use async_openai::types::chat::{

 // Re-export OpenAI response types for convenience.
 pub use crate::openai::{
-    AudioTranscriptionResponse, AudioTranscriptionStream, ChatCompletionStream,
-    TranscriptionSegment, TranscriptionWord,
+    AudioTranscriptionResponse, AudioTranscriptionStream, ChatCompletionStream, ContentPart,
+    CoreErrorResponse, LiveAudioTranscriptionOptions, LiveAudioTranscriptionResponse,
+    LiveAudioTranscriptionSession, LiveAudioTranscriptionStream, TranscriptionSegment,
+    TranscriptionWord,
 };
 pub use async_openai::types::chat::{
     ChatChoice, ChatChoiceStream, ChatCompletionMessageToolCall,

sdk/rust/src/openai/audio_client.rs

Lines changed: 10 additions & 0 deletions
@@ -9,6 +9,7 @@ use crate::detail::core_interop::CoreInterop;
 use crate::error::{FoundryLocalError, Result};

 use super::json_stream::JsonStream;
+use super::live_audio_client::LiveAudioTranscriptionSession;

 /// A segment of a transcription, as returned by the OpenAI-compatible API.
 #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@@ -196,6 +197,15 @@ impl AudioClient {
         Ok(AudioTranscriptionStream::new(rx))
     }

+    /// Create a [`LiveAudioTranscriptionSession`] for real-time audio
+    /// streaming transcription.
+    ///
+    /// Configure the session's [`settings`](LiveAudioTranscriptionSession::settings)
+    /// before calling [`start`](LiveAudioTranscriptionSession::start).
+    pub fn create_live_transcription_session(&self) -> LiveAudioTranscriptionSession {
+        LiveAudioTranscriptionSession::new(&self.model_id, Arc::clone(&self.core))
+    }
+
     fn validate_path(path: &str) -> Result<()> {
         if path.trim().is_empty() {
             return Err(FoundryLocalError::Validation {
