Skip to content

Commit 9e9148c

Browse files
(realtime capabilities): add support for native transcript synchronization (#1329)
Co-authored-by: Brian Yin <brian.yin@livekit.io>
1 parent 3d2f01d commit 9e9148c

5 files changed

Lines changed: 95 additions & 33 deletions

File tree

.changeset/weak-zoos-attack.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@livekit/agents-plugin-phonic': patch
3+
'@livekit/agents': patch
4+
---
5+
6+
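Add a new realtime model capability, `nativeTranscriptSync`, for models that synchronize transcripts with audio natively. When it is set, the agent-side transcription synchronizer is disabled and text is passed through. The capability is set to `true` for the Phonic plugin.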

agents/src/llm/realtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export interface RealtimeCapabilities {
5454
midSessionInstructionsUpdate?: boolean;
5555
midSessionToolsUpdate?: boolean;
5656
perResponseToolChoice?: boolean;
57+
nativeTranscriptSync?: boolean;
5758
}
5859

5960
export interface InputTranscriptionCompleted {

agents/src/voice/room_io/room_io.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
} from '@livekit/rtc-node';
1919
import type { WritableStreamDefaultWriter } from 'node:stream/web';
2020
import { ATTRIBUTE_PUBLISH_ON_BEHALF, TOPIC_CHAT } from '../../constants.js';
21+
import { RealtimeModel } from '../../llm/index.js';
2122
import { log } from '../../log.js';
2223
import { IdentityTransform } from '../../stream/identity_transform.js';
2324
import { Future, Task, waitForAbort } from '../../utils.js';
@@ -26,11 +27,15 @@ import {
2627
AgentSessionEventTypes,
2728
type AgentStateChangedEvent,
2829
CloseReason,
30+
type ConversationItemAddedEvent,
2931
type UserInputTranscribedEvent,
3032
} from '../events.js';
3133
import type { AudioOutput, TextOutput } from '../io.js';
3234
import type { TextInputCallback } from '../remote_session.js';
33-
import { TranscriptionSynchronizer } from '../transcription/synchronizer.js';
35+
import {
36+
TranscriptionSynchronizer,
37+
defaultTextSyncOptions,
38+
} from '../transcription/synchronizer.js';
3439
import { ParticipantAudioInputStream } from './_input.js';
3540
import {
3641
ParalellTextOutput,
@@ -277,6 +282,16 @@ export class RoomIO {
277282
});
278283
};
279284

285+
private onConversationItemAdded = (ev: ConversationItemAddedEvent) => {
286+
if (ev.item.type !== 'agent_handoff' || !this.transcriptionSynchronizer) {
287+
return;
288+
}
289+
const sessionLlm = this.agentSession.currentAgent?.llm ?? this.agentSession.llm;
290+
const nativeTranscriptSync =
291+
sessionLlm instanceof RealtimeModel && !!sessionLlm.capabilities.nativeTranscriptSync;
292+
this.transcriptionSynchronizer.enabled = !nativeTranscriptSync;
293+
};
294+
280295
private onAgentStateChanged = async (ev: AgentStateChangedEvent) => {
281296
if (this.room.isConnected && this.room.localParticipant) {
282297
await this.room.localParticipant.setAttributes({
@@ -497,9 +512,13 @@ export class RoomIO {
497512
// TODO(AJS-176): check for agent output
498513
const audioOutput = this.participantAudioOutput;
499514
if (this.outputOptions.syncTranscription && audioOutput) {
515+
const sessionLlm = this.agentSession.currentAgent?.llm ?? this.agentSession.llm;
516+
const nativeTranscriptSync =
517+
sessionLlm instanceof RealtimeModel && !!sessionLlm.capabilities.nativeTranscriptSync;
500518
this.transcriptionSynchronizer = new TranscriptionSynchronizer(
501519
audioOutput,
502520
this.agentTranscriptOutput,
521+
{ ...defaultTextSyncOptions, enabled: !nativeTranscriptSync },
503522
);
504523
}
505524
}
@@ -527,6 +546,10 @@ export class RoomIO {
527546

528547
this.agentSession.on(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
529548
this.agentSession.on(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
549+
this.agentSession.on(
550+
AgentSessionEventTypes.ConversationItemAdded,
551+
this.onConversationItemAdded,
552+
);
530553
}
531554

532555
async close() {
@@ -535,6 +558,10 @@ export class RoomIO {
535558
this.room.off(RoomEvent.ParticipantDisconnected, this.onParticipantDisconnected);
536559
this.agentSession.off(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
537560
this.agentSession.off(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
561+
this.agentSession.off(
562+
AgentSessionEventTypes.ConversationItemAdded,
563+
this.onConversationItemAdded,
564+
);
538565

539566
if (this.textStreamHandlerRegistered) {
540567
this.room.unregisterTextStreamHandler(TOPIC_CHAT);

agents/src/voice/transcription/synchronizer.ts

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ interface TextSyncOptions {
2424
hyphenateWord: (word: string) => string[];
2525
splitWords: (words: string) => [string, number, number][];
2626
wordTokenizer: WordTokenizer;
27+
enabled: boolean;
2728
}
2829

2930
interface TextData {
@@ -141,6 +142,7 @@ interface AudioData {
141142
}
142143

143144
class SegmentSynchronizerImpl {
145+
private enabled: boolean;
144146
private textData: TextData;
145147
private audioData: AudioData;
146148
private speed: number;
@@ -172,6 +174,7 @@ class SegmentSynchronizerImpl {
172174
*/
173175
private readonly seedFromPushAudio: boolean = false,
174176
) {
177+
this.enabled = options.enabled;
175178
this.speed = options.speed * STANDARD_SPEECH_RATE; // hyphens per second
176179
this.textData = {
177180
wordStream: options.wordTokenizer.stream(),
@@ -188,13 +191,15 @@ class SegmentSynchronizerImpl {
188191
this.outputStream = new IdentityTransform();
189192
this.outputStreamWriter = this.outputStream.writable.getWriter();
190193

191-
this.mainTask()
192-
.then(() => {
193-
this.outputStreamWriter.close();
194-
})
195-
.catch((error) => {
196-
this.logger.error({ error }, 'mainTask SegmentSynchronizerImpl');
197-
});
194+
if (this.enabled) {
195+
this.mainTask()
196+
.then(() => {
197+
this.outputStreamWriter.close();
198+
})
199+
.catch((error) => {
200+
this.logger.error({ error }, 'mainTask SegmentSynchronizerImpl');
201+
});
202+
}
198203
this.captureTask = this.captureTaskImpl();
199204
}
200205

@@ -269,26 +274,19 @@ class SegmentSynchronizerImpl {
269274
return;
270275
}
271276

272-
// Check if text is a TimedString (has timing information)
273-
let textStr: string;
277+
const textStr = isTimedString(text) ? text.text : text;
274278
let startTime: number | undefined;
275279
let endTime: number | undefined;
276280

277281
if (isTimedString(text)) {
278-
// This is a TimedString
279-
textStr = text.text;
280282
startTime = text.startTime;
281283
endTime = text.endTime;
282284

283-
// Create annotatedRate if it doesn't exist
284285
if (!this.audioData.annotatedRate) {
285286
this.audioData.annotatedRate = new SpeakingRateData();
286287
}
287288

288-
// Add the timing annotation
289289
this.audioData.annotatedRate.addByAnnotation(textStr, startTime, endTime);
290-
} else {
291-
textStr = text;
292290
}
293291

294292
this.textData.wordStream.pushText(textStr);
@@ -302,7 +300,11 @@ class SegmentSynchronizerImpl {
302300
}
303301

304302
this.textData.done = true;
305-
this.textData.wordStream.endInput();
303+
if (!this.enabled) {
304+
this.outputStreamWriter.close();
305+
} else {
306+
this.textData.wordStream.endInput();
307+
}
306308
}
307309

308310
markPlaybackFinished(_playbackPosition: number, interrupted: boolean) {
@@ -485,6 +487,10 @@ class SegmentSynchronizerImpl {
485487
}
486488

487489
this.startFuture.resolve(); // avoid deadlock of mainTaskImpl in case it never started
490+
// Close the writer if endTextInput hasn't already done so (e.g. on interruption)
491+
if (!this.enabled && !this.textData.done) {
492+
this.outputStreamWriter.close();
493+
}
488494
this.textData.wordStream.close();
489495
await this.captureTask;
490496
}
@@ -495,13 +501,15 @@ export interface TranscriptionSynchronizerOptions {
495501
hyphenateWord: (word: string) => string[];
496502
splitWords: (words: string) => [string, number, number][];
497503
wordTokenizer: WordTokenizer;
504+
enabled: boolean;
498505
}
499506

500507
export const defaultTextSyncOptions: TranscriptionSynchronizerOptions = {
501508
speed: 1,
502509
hyphenateWord: basic.hyphenateWord,
503510
splitWords: basic.splitWords,
504511
wordTokenizer: new basic.WordTokenizer(false),
512+
enabled: true,
505513
};
506514

507515
export class TranscriptionSynchronizer {
@@ -510,7 +518,7 @@ export class TranscriptionSynchronizer {
510518

511519
private options: TextSyncOptions;
512520
private rotateSegmentTask: Task<void>;
513-
private _enabled: boolean = true;
521+
private _outputsAttached: boolean = true;
514522
private closed: boolean = false;
515523

516524
/** @internal */
@@ -539,6 +547,7 @@ export class TranscriptionSynchronizer {
539547
hyphenateWord: options.hyphenateWord,
540548
splitWords: options.splitWords,
541549
wordTokenizer: options.wordTokenizer,
550+
enabled: options.enabled,
542551
};
543552

544553
// initial segment/first segment, recreated for each new segment
@@ -548,19 +557,19 @@ export class TranscriptionSynchronizer {
548557
);
549558
}
550559

560+
get outputsAttached(): boolean {
561+
return this._outputsAttached;
562+
}
563+
551564
get enabled(): boolean {
552-
return this._enabled;
565+
return this.options.enabled;
553566
}
554567

555-
set enabled(enabled: boolean) {
556-
if (this._enabled === enabled) {
568+
set enabled(value: boolean) {
569+
if (this.options.enabled === value) {
557570
return;
558571
}
559-
560-
this._enabled = enabled;
561-
if (enabled) {
562-
this._warnedAsymmetricDetach = false;
563-
}
572+
this.options.enabled = value;
564573
this.rotateSegment();
565574
}
566575

@@ -572,7 +581,15 @@ export class TranscriptionSynchronizer {
572581
if (args.textAttached !== undefined) {
573582
this._textAttached = args.textAttached;
574583
}
575-
this.enabled = this._audioAttached && this._textAttached;
584+
const outputsAttached = this._audioAttached && this._textAttached;
585+
if (this._outputsAttached === outputsAttached) {
586+
return;
587+
}
588+
this._outputsAttached = outputsAttached;
589+
if (outputsAttached) {
590+
this._warnedAsymmetricDetach = false;
591+
}
592+
this.rotateSegment();
576593
}
577594

578595
rotateSegment() {
@@ -636,7 +653,7 @@ class SyncedAudioOutput extends AudioOutput {
636653
// TODO(AJS-102): use frame.durationMs once available in rtc-node
637654
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;
638655

639-
if (!this.synchronizer.enabled) {
656+
if (!this.synchronizer.outputsAttached) {
640657
if (
641658
this.synchronizer._audioAttached &&
642659
!this.synchronizer._textAttached &&
@@ -652,6 +669,10 @@ class SyncedAudioOutput extends AudioOutput {
652669
return;
653670
}
654671

672+
if (!this.synchronizer.enabled) {
673+
return;
674+
}
675+
655676
if (this.synchronizer._impl.audioInputEnded) {
656677
this.logger.warn(
657678
'SegmentSynchronizerImpl audio marked as ended in capture audio, rotating segment',
@@ -666,7 +687,7 @@ class SyncedAudioOutput extends AudioOutput {
666687
super.flush();
667688
this.nextInChainAudio.flush();
668689

669-
if (!this.synchronizer.enabled) {
690+
if (!this.synchronizer.outputsAttached || !this.synchronizer.enabled) {
670691
return;
671692
}
672693

@@ -694,14 +715,14 @@ class SyncedAudioOutput extends AudioOutput {
694715
// this is going to be automatically called by the next_in_chain
695716
onPlaybackStarted(createdAt: number): void {
696717
super.onPlaybackStarted(createdAt);
697-
if (this.synchronizer.enabled) {
718+
if (this.synchronizer.outputsAttached && this.synchronizer.enabled) {
698719
this.synchronizer._impl.onPlaybackStarted(createdAt);
699720
}
700721
}
701722

702723
// this is going to be automatically called by the next_in_chain
703724
onPlaybackFinished(ev: PlaybackFinishedEvent) {
704-
if (!this.synchronizer.enabled) {
725+
if (!this.synchronizer.outputsAttached || !this.synchronizer.enabled) {
705726
super.onPlaybackFinished(ev);
706727
return;
707728
}
@@ -745,6 +766,11 @@ class SyncedTextOutput extends TextOutput {
745766
const textStr = isTimedString(text) ? text.text : text;
746767

747768
if (!this.synchronizer.enabled) {
769+
await this.nextInChain.captureText(textStr);
770+
return;
771+
}
772+
773+
if (!this.synchronizer.outputsAttached) {
748774
if (
749775
this.synchronizer._textAttached &&
750776
!this.synchronizer._audioAttached &&
@@ -778,8 +804,9 @@ class SyncedTextOutput extends TextOutput {
778804
// Wait for any pending rotation to complete before accessing _impl
779805
await this.synchronizer.barrier();
780806

781-
if (!this.synchronizer.enabled) {
782-
this.nextInChain.flush(); // passthrough text if the synchronizer is disabled
807+
if (!this.synchronizer.enabled || !this.synchronizer.outputsAttached) {
808+
this.capturing = false;
809+
this.nextInChain.flush();
783810
return;
784811
}
785812

plugins/phonic/src/realtime/realtime_model.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ export class RealtimeModel extends llm.RealtimeModel {
165165
midSessionInstructionsUpdate: true,
166166
midSessionToolsUpdate: true,
167167
perResponseToolChoice: false,
168+
nativeTranscriptSync: true,
168169
});
169170

170171
const apiKey = options.apiKey || process.env.PHONIC_API_KEY;

0 commit comments

Comments
 (0)