-
Notifications
You must be signed in to change notification settings - Fork 309
Expand file tree
/
Copy pathLiveAudioSession.cs
More file actions
385 lines (335 loc) · 14.7 KB
/
LiveAudioSession.cs
File metadata and controls
385 lines (335 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
// --------------------------------------------------------------------------------------------------------------------
// <copyright company="Microsoft">
// Copyright (c) Microsoft. All rights reserved.
// </copyright>
// --------------------------------------------------------------------------------------------------------------------
namespace Microsoft.AI.Foundry.Local.OpenAI;
using System.Runtime.CompilerServices;
using System.Globalization;
using System.Threading.Channels;
using Microsoft.AI.Foundry.Local;
using Microsoft.AI.Foundry.Local.Detail;
using Microsoft.Extensions.Logging;
/// <summary>
/// Session for real-time audio streaming ASR (Automatic Speech Recognition).
/// Audio data from a microphone (or other source) is pushed in as PCM chunks,
/// and transcription results are returned as an async stream.
///
/// Created via <see cref="OpenAIAudioClient.CreateLiveTranscriptionSession"/>.
///
/// Thread safety: AppendAsync can be called from any thread (including high-frequency
/// audio callbacks). Pushes are internally serialized via a bounded channel to prevent
/// unbounded memory growth and ensure ordering.
/// </summary>
public sealed class LiveAudioTranscriptionSession : IAsyncDisposable
{
// Model identifier sent to the native core as the "Model" parameter when a session starts.
private readonly string _modelId;
// Native interop layer and logger, both taken from the process-wide FoundryLocalManager instance.
private readonly ICoreInterop _coreInterop = FoundryLocalManager.Instance.CoreInterop;
private readonly ILogger _logger = FoundryLocalManager.Instance.Logger;
// Session state — written only while holding _lock (StartAsync/StopAsync).
// Note: AppendAsync and DisposeAsync read _started/_stopped without taking the lock.
private readonly AsyncLock _lock = new();
private string? _sessionHandle;
private bool _started;
private bool _stopped;
// Output channel: the push loop writes transcription results parsed from push responses,
// StopAsync writes the final result, and the user reads via GetStream.
private Channel<LiveAudioTranscriptionResponse>? _outputChannel;
// Internal push queue: user writes audio chunks, background loop drains to native core.
// Bounded to prevent unbounded memory growth if native core is slower than real-time.
private Channel<ReadOnlyMemory<byte>>? _pushChannel;
// Task running PushLoopAsync; StopAsync awaits it so queued audio is drained before the native stop.
private Task? _pushLoopTask;
// Dedicated CTS for the push loop — decoupled from StartAsync's caller token.
// Cancelled in StopAsync after the push loop has been awaited; disposed in StopAsync/DisposeAsync.
private CancellationTokenSource? _sessionCts;
// Snapshot of settings captured at StartAsync — prevents mutation after session starts.
private LiveAudioTranscriptionOptions? _activeSettings;
/// <summary>
/// Audio format and buffering options for a streaming session.
/// Set these before calling <see cref="StartAsync"/>: the session works from a copy
/// taken at start, so later changes do not affect a session that is already running.
/// </summary>
public record LiveAudioTranscriptionOptions
{
    /// <summary>Sample rate of the PCM audio, in Hz. Default: 16000.</summary>
    public int SampleRate { get; set; } = 16000;

    /// <summary>Channel count of the PCM audio. Default: 1 (mono).</summary>
    public int Channels { get; set; } = 1;

    /// <summary>Bit depth of each audio sample. Default: 16.</summary>
    public int BitsPerSample { get; set; } = 16;

    /// <summary>Optional BCP-47 language hint (e.g., "en", "zh"). Left unset when <c>null</c>.</summary>
    public string? Language { get; set; }

    /// <summary>
    /// Upper bound on the number of audio chunks held in the internal push queue.
    /// When the queue is full, AppendAsync waits asynchronously for space.
    /// Default: 100 (~3 seconds of audio at typical chunk sizes).
    /// </summary>
    public int PushQueueCapacity { get; set; } = 100;

    /// <summary>Returns an independent copy of these options (record <c>with</c> copy).</summary>
    internal LiveAudioTranscriptionOptions Snapshot()
    {
        return this with { };
    }
}
/// <summary>
/// Options applied to the next streaming session. <see cref="StartAsync"/> takes a snapshot of
/// these values, so edits made afterwards only take effect for a session started later.
/// </summary>
public LiveAudioTranscriptionOptions Settings { get; } = new LiveAudioTranscriptionOptions();
/// <summary>
/// Creates a session for the given model. No native resources are acquired here;
/// the native stream is opened by <see cref="StartAsync"/>.
/// </summary>
/// <param name="modelId">Model identifier sent to the native core as the "Model" parameter on start.</param>
internal LiveAudioTranscriptionSession(string modelId) => _modelId = modelId;
/// <summary>
/// Start a real-time audio streaming session.
/// Must be called before <see cref="AppendAsync"/> or <see cref="GetStream"/>.
/// Takes a snapshot of <see cref="Settings"/>, opens the native audio stream and launches the
/// background loop that forwards queued audio to the native core.
/// </summary>
/// <param name="ct">
/// Cancellation token. It is handed to the task that runs the native start call, so it can cancel
/// the start only while that call has not begun executing.
/// </param>
/// <exception cref="FoundryLocalException">
/// A session is already started, the native core reported an error, or it returned no session handle.
/// </exception>
public async Task StartAsync(CancellationToken ct = default)
{
    using var disposable = await _lock.LockAsync().ConfigureAwait(false);

    if (_started)
    {
        throw new FoundryLocalException("Streaming session already started. Call StopAsync first.");
    }

    // Freeze settings: the session works from this copy, not from the mutable Settings instance.
    var settings = Settings.Snapshot();

    // Build both channels before publishing either one, so an invalid PushQueueCapacity
    // (BoundedChannelOptions rejects values below 1) cannot leave a half-initialised session behind.
    var outputChannel = Channel.CreateUnbounded<LiveAudioTranscriptionResponse>(
        new UnboundedChannelOptions
        {
            SingleWriter = true, // the push loop writes; StopAsync writes only after the loop has finished
            SingleReader = true,
            AllowSynchronousContinuations = true
        });
    var pushChannel = Channel.CreateBounded<ReadOnlyMemory<byte>>(
        new BoundedChannelOptions(settings.PushQueueCapacity)
        {
            SingleReader = true, // only the push loop reads
            SingleWriter = false, // multiple threads may push audio data
            FullMode = BoundedChannelFullMode.Wait
        });

    _activeSettings = settings;
    _outputChannel = outputChannel;
    _pushChannel = pushChannel;

    var request = new CoreInteropRequest
    {
        Params = new Dictionary<string, string>
        {
            { "Model", _modelId },
            { "SampleRate", settings.SampleRate.ToString(CultureInfo.InvariantCulture) },
            { "Channels", settings.Channels.ToString(CultureInfo.InvariantCulture) },
            { "BitsPerSample", settings.BitsPerSample.ToString(CultureInfo.InvariantCulture) },
        }
    };
    if (settings.Language != null)
    {
        request.Params["Language"] = settings.Language;
    }

    string sessionHandle;
    try
    {
        // StartAudioStream uses existing execute_command entry point — synchronous P/Invoke
        var response = await Task.Run(
                () => _coreInterop.StartAudioStream(request), ct)
            .ConfigureAwait(false);

        if (response.Error != null)
        {
            throw new FoundryLocalException(
                $"Error starting audio stream session: {response.Error}", _logger);
        }

        sessionHandle = response.Data
            ?? throw new FoundryLocalException("Native core did not return a session handle.", _logger);
    }
    catch
    {
        // Whatever went wrong (native error, missing handle, cancellation, interop exception),
        // close both channels so a GetStream consumer ends instead of waiting forever on a
        // channel that no push loop will ever write to or complete.
        outputChannel.Writer.TryComplete();
        pushChannel.Writer.TryComplete();
        throw;
    }

    _sessionHandle = sessionHandle;
    _started = true;
    _stopped = false;

    _sessionCts?.Dispose();
    _sessionCts = new CancellationTokenSource();

    // Capture the token now rather than reading the _sessionCts field from inside the lambda,
    // which runs later on a thread-pool thread.
    var sessionToken = _sessionCts.Token;
#pragma warning disable IDISP013 // Await in using — Task.Run is intentionally fire-and-forget here
    _pushLoopTask = Task.Run(() => PushLoopAsync(sessionToken), CancellationToken.None);
#pragma warning restore IDISP013
}
/// <summary>
/// Push a chunk of raw PCM audio data to the streaming session.
/// Can be called from any thread (including audio device callbacks).
/// The chunk is copied and queued; a background loop forwards queued chunks to the native core in order.
/// If the queue is full, the call waits asynchronously until space is available.
/// </summary>
/// <param name="pcmData">Raw PCM audio bytes matching the configured format.</param>
/// <param name="ct">Cancellation token; cancels waiting for queue space.</param>
/// <exception cref="FoundryLocalException">
/// There is no active session, or the session stopped accepting audio while the chunk was being queued.
/// </exception>
public async ValueTask AppendAsync(ReadOnlyMemory<byte> pcmData, CancellationToken ct = default)
{
    var pushChannel = _pushChannel;
    if (!_started || _stopped || pushChannel is null)
    {
        throw new FoundryLocalException("No active streaming session. Call StartAsync first.");
    }

    // Copy the data to avoid issues if the caller reuses the buffer (e.g. NAudio reuses e.Buffer)
    var copy = pcmData.ToArray();

    try
    {
        await pushChannel.Writer.WriteAsync(copy, ct).ConfigureAwait(false);
    }
    catch (ChannelClosedException closedEx)
    {
        // The state check above is done without the session lock, so the queue can be closed
        // between the check and the write. Report that with the same exception type
        // used for "no active session" instead of leaking the channel's own exception.
        throw new FoundryLocalException(
            "Streaming session is no longer accepting audio data.", closedEx, _logger);
    }
}
/// <summary>
/// Internal loop that drains the push queue and sends chunks to native core one at a time.
/// Transcription text returned by a push is written to the output channel.
/// On a native push error or an unexpected exception the loop ends and the output channel is
/// completed with the failure. The push queue is closed on every exit path, so producers blocked
/// on (or later writing to) the queue fail with <see cref="ChannelClosedException"/> instead of
/// waiting forever on a bounded queue that nothing drains any more.
/// </summary>
/// <param name="ct">Token that aborts the loop; cancellation is treated as a clean exit.</param>
private async Task PushLoopAsync(CancellationToken ct)
{
    // StartAsync assigns the push channel before it launches this loop.
    var pushChannel = _pushChannel!;
    try
    {
        await foreach (var audioData in pushChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
        {
            var request = new CoreInteropRequest
            {
                Params = new Dictionary<string, string> { { "SessionHandle", _sessionHandle! } }
            };
            var response = _coreInterop.PushAudioData(request, audioData);

            if (response.Error != null)
            {
                var errorInfo = CoreErrorResponse.TryParse(response.Error);
                var fatalEx = new FoundryLocalException(
                    $"Push failed (code={errorInfo?.Code ?? "UNKNOWN"}): {response.Error}",
                    _logger);
                _logger.LogError("Terminating push loop due to push failure: {Error}",
                    response.Error);
                _outputChannel?.Writer.TryComplete(fatalEx);
                // Propagate the failure to producers as well: it becomes the inner exception
                // of the ChannelClosedException their writes fail with.
                pushChannel.Writer.TryComplete(fatalEx);
                return;
            }

            // Parse transcription result from push response and surface it
            if (!string.IsNullOrEmpty(response.Data))
            {
                try
                {
                    var transcription = LiveAudioTranscriptionResponse.FromJson(response.Data!);
                    if (!string.IsNullOrEmpty(transcription.Content?[0]?.Text))
                    {
                        _outputChannel?.Writer.TryWrite(transcription);
                    }
                }
                catch (Exception parseEx)
                {
                    // Non-fatal: log and continue if response isn't a transcription result
                    _logger.LogDebug(parseEx, "Could not parse push response as transcription result");
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Expected on cancellation — push loop exits cleanly
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Push loop terminated with unexpected error");
        var loopFailure = new FoundryLocalException("Push loop terminated unexpectedly.", ex, _logger);
        _outputChannel?.Writer.TryComplete(loopFailure);
        pushChannel.Writer.TryComplete(loopFailure);
    }
    finally
    {
        // No-op when the queue is already completed (normal drain or the failure paths above);
        // covers the cancellation exit so the queue is never left open without a reader.
        pushChannel.Writer.TryComplete();
    }
}
/// <summary>
/// Returns the transcription results of the session as an asynchronous sequence.
/// Results are yielded as they are written to the session's output channel, and the sequence
/// ends when that channel is completed.
/// Because this is an async iterator, the "session exists" check runs when enumeration begins,
/// not when this method is called.
/// </summary>
/// <param name="ct">Token that cancels the enumeration.</param>
/// <returns>Async enumerable of transcription results.</returns>
public async IAsyncEnumerable<LiveAudioTranscriptionResponse> GetStream(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var channel = _outputChannel
        ?? throw new FoundryLocalException("No active streaming session. Call StartAsync first.");

    // Wait until data is available (or the channel completes), then drain everything
    // that is currently readable before waiting again.
    var reader = channel.Reader;
    while (await reader.WaitToReadAsync(ct).ConfigureAwait(false))
    {
        while (reader.TryRead(out var result))
        {
            yield return result;
        }
    }
}
/// <summary>
/// Signal end-of-audio and stop the streaming session.
/// Any remaining buffered audio in the push queue will be drained to native core first.
/// Final results are delivered through <see cref="GetStream"/> before it completes.
/// Does nothing if the session was never started or has already been stopped.
/// </summary>
/// <param name="ct">
/// Cancellation token. It is passed only to the task that runs the native stop call; acquiring the
/// session lock and waiting for the push queue to drain do not observe it. If the stop call is
/// cancelled through this token, the native session is still stopped on a best-effort basis and
/// the cancellation is then rethrown.
/// </param>
/// <exception cref="FoundryLocalException">The native core reported an error while stopping the session.</exception>
/// <exception cref="OperationCanceledException">The stop call was cancelled through <paramref name="ct"/>.</exception>
public async Task StopAsync(CancellationToken ct = default)
{
using var disposable = await _lock.LockAsync().ConfigureAwait(false);
if (!_started || _stopped)
{
return; // already stopped or never started
}
// Mark the session as stopping first: AppendAsync checks this flag and rejects new audio.
_stopped = true;
// 1. Complete the push channel so the push loop drains remaining items and exits
_pushChannel?.Writer.TryComplete();
// 2. Wait for the push loop to finish draining
if (_pushLoopTask != null)
{
await _pushLoopTask.ConfigureAwait(false);
}
// 3. Cancel the session CTS (no-op if push loop already exited)
_sessionCts?.Cancel();
// 4. Tell native core to flush and finalize.
// This MUST happen even if ct is cancelled — otherwise native session leaks.
var request = new CoreInteropRequest
{
Params = new Dictionary<string, string> { { "SessionHandle", _sessionHandle! } }
};
ICoreInterop.Response? response = null;
try
{
response = await Task.Run(
() => _coreInterop.StopAudioStream(request), ct)
.ConfigureAwait(false);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// ct fired, but we MUST still stop the native session to avoid a leak.
_logger.LogWarning("StopAsync cancelled — performing best-effort native session stop.");
try
{
// Retry without the token so the stop call cannot be cancelled again.
response = await Task.Run(
() => _coreInterop.StopAudioStream(request))
.ConfigureAwait(false);
}
catch (Exception cleanupEx)
{
// Logged and swallowed: the caller sees the original cancellation rethrown below.
_logger.LogError(cleanupEx, "Best-effort native session stop failed.");
}
throw; // Re-throw the cancellation after cleanup
}
finally
{
// Runs on success, cancellation and failure alike, so session state is always reset
// and the output channel is always completed.
// Parse final transcription from stop response before completing the channel
if (response?.Data != null)
{
try
{
var finalResult = LiveAudioTranscriptionResponse.FromJson(response.Data);
if (!string.IsNullOrEmpty(finalResult.Content?[0]?.Text))
{
_outputChannel?.Writer.TryWrite(finalResult);
}
}
catch (Exception parseEx)
{
// Non-fatal: the stop response may not carry a transcription result.
_logger.LogDebug(parseEx, "Could not parse stop response as transcription result");
}
}
// Reset state; clearing _started is what allows StartAsync to be called again.
_sessionHandle = null;
_started = false;
_sessionCts?.Dispose();
_sessionCts = null;
// Complete the output channel AFTER writing final result
_outputChannel?.Writer.TryComplete();
}
// Reached only when no exception propagated above: surface a native stop error to the caller.
if (response?.Error != null)
{
throw new FoundryLocalException(
$"Error stopping audio stream session: {response.Error}", _logger);
}
}
// 0 while the session is live, 1 once DisposeAsync has begun. Makes disposal run exactly once.
private int _disposed;

/// <summary>
/// Dispose the streaming session. Calls <see cref="StopAsync"/>, which stops the session if it is
/// still active and otherwise returns immediately.
/// Safe to call multiple times: only the first call performs cleanup, later calls return at once.
/// Never throws; cleanup errors are logged.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (Interlocked.Exchange(ref _disposed, 1) != 0)
    {
        return; // already disposed (or disposal in progress) — do not dispose the lock twice
    }

    try
    {
        // Always go through StopAsync rather than checking _started/_stopped here without the lock.
        // StopAsync takes the session lock, so this waits for a StartAsync/StopAsync that is still
        // running before the lock and CTS are disposed below; when there is nothing to stop it
        // returns right after acquiring the lock.
        await StopAsync().ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // DisposeAsync must never throw — log and swallow
        _logger.LogWarning(ex, "Error during DisposeAsync cleanup.");
    }
    finally
    {
        _sessionCts?.Dispose();
        _lock.Dispose();
    }
}
}