Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/weak-zoos-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@livekit/agents-plugin-phonic': patch
'@livekit/agents': patch
---

support new realtime model capability for native transcript synchronization, set to true for phonic
1 change: 1 addition & 0 deletions agents/src/llm/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface RealtimeCapabilities {
midSessionInstructionsUpdate?: boolean;
midSessionToolsUpdate?: boolean;
perResponseToolChoice?: boolean;
nativeTranscriptSync?: boolean;
}

export interface InputTranscriptionCompleted {
Expand Down
29 changes: 28 additions & 1 deletion agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '@livekit/rtc-node';
import type { WritableStreamDefaultWriter } from 'node:stream/web';
import { ATTRIBUTE_PUBLISH_ON_BEHALF, TOPIC_CHAT } from '../../constants.js';
import { RealtimeModel } from '../../llm/index.js';
import { log } from '../../log.js';
import { IdentityTransform } from '../../stream/identity_transform.js';
import { Future, Task, waitForAbort } from '../../utils.js';
Expand All @@ -26,11 +27,15 @@ import {
AgentSessionEventTypes,
type AgentStateChangedEvent,
CloseReason,
type ConversationItemAddedEvent,
type UserInputTranscribedEvent,
} from '../events.js';
import type { AudioOutput, TextOutput } from '../io.js';
import type { TextInputCallback } from '../remote_session.js';
import { TranscriptionSynchronizer } from '../transcription/synchronizer.js';
import {
TranscriptionSynchronizer,
defaultTextSyncOptions,
} from '../transcription/synchronizer.js';
import { ParticipantAudioInputStream } from './_input.js';
import {
ParalellTextOutput,
Expand Down Expand Up @@ -271,6 +276,16 @@ export class RoomIO {
});
};

private onConversationItemAdded = (ev: ConversationItemAddedEvent) => {
if (ev.item.type !== 'agent_handoff' || !this.transcriptionSynchronizer) {
return;
}
const sessionLlm = this.agentSession.currentAgent?.llm ?? this.agentSession.llm;
const nativeTranscriptSync =
sessionLlm instanceof RealtimeModel && !!sessionLlm.capabilities.nativeTranscriptSync;
this.transcriptionSynchronizer.enabled = !nativeTranscriptSync;
};

private onAgentStateChanged = async (ev: AgentStateChangedEvent) => {
if (this.room.isConnected && this.room.localParticipant) {
await this.room.localParticipant.setAttributes({
Expand Down Expand Up @@ -489,9 +504,13 @@ export class RoomIO {
// TODO(AJS-176): check for agent output
const audioOutput = this.participantAudioOutput;
if (this.outputOptions.syncTranscription && audioOutput) {
const sessionLlm = this.agentSession.currentAgent?.llm ?? this.agentSession.llm;
const nativeTranscriptSync =
sessionLlm instanceof RealtimeModel && !!sessionLlm.capabilities.nativeTranscriptSync;
this.transcriptionSynchronizer = new TranscriptionSynchronizer(
audioOutput,
this.agentTranscriptOutput,
{ ...defaultTextSyncOptions, enabled: !nativeTranscriptSync },
);
}
}
Expand Down Expand Up @@ -519,6 +538,10 @@ export class RoomIO {

this.agentSession.on(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
this.agentSession.on(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
this.agentSession.on(
AgentSessionEventTypes.ConversationItemAdded,
this.onConversationItemAdded,
);
}

async close() {
Expand All @@ -527,6 +550,10 @@ export class RoomIO {
this.room.off(RoomEvent.ParticipantDisconnected, this.onParticipantDisconnected);
this.agentSession.off(AgentSessionEventTypes.UserInputTranscribed, this.onUserInputTranscribed);
this.agentSession.off(AgentSessionEventTypes.AgentStateChanged, this.onAgentStateChanged);
this.agentSession.off(
AgentSessionEventTypes.ConversationItemAdded,
this.onConversationItemAdded,
);

if (this.textStreamHandlerRegistered) {
this.room.unregisterTextStreamHandler(TOPIC_CHAT);
Expand Down
91 changes: 59 additions & 32 deletions agents/src/voice/transcription/synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ interface TextSyncOptions {
hyphenateWord: (word: string) => string[];
splitWords: (words: string) => [string, number, number][];
wordTokenizer: WordTokenizer;
enabled: boolean;
}

interface TextData {
Expand Down Expand Up @@ -140,6 +141,7 @@ interface AudioData {
}

class SegmentSynchronizerImpl {
private enabled: boolean;
private textData: TextData;
private audioData: AudioData;
private speed: number;
Expand Down Expand Up @@ -169,6 +171,7 @@ class SegmentSynchronizerImpl {
*/
private readonly seedFromPushAudio: boolean = false,
) {
this.enabled = options.enabled;
this.speed = options.speed * STANDARD_SPEECH_RATE; // hyphens per second
this.textData = {
wordStream: options.wordTokenizer.stream(),
Expand All @@ -185,13 +188,15 @@ class SegmentSynchronizerImpl {
this.outputStream = new IdentityTransform();
this.outputStreamWriter = this.outputStream.writable.getWriter();

this.mainTask()
.then(() => {
this.outputStreamWriter.close();
})
.catch((error) => {
this.logger.error({ error }, 'mainTask SegmentSynchronizerImpl');
});
if (this.enabled) {
this.mainTask()
.then(() => {
this.outputStreamWriter.close();
})
.catch((error) => {
this.logger.error({ error }, 'mainTask SegmentSynchronizerImpl');
});
}
this.captureTask = this.captureTaskImpl();
}

Expand Down Expand Up @@ -266,26 +271,19 @@ class SegmentSynchronizerImpl {
return;
}

// Check if text is a TimedString (has timing information)
let textStr: string;
const textStr = isTimedString(text) ? text.text : text;
let startTime: number | undefined;
let endTime: number | undefined;

if (isTimedString(text)) {
// This is a TimedString
textStr = text.text;
startTime = text.startTime;
endTime = text.endTime;

// Create annotatedRate if it doesn't exist
if (!this.audioData.annotatedRate) {
this.audioData.annotatedRate = new SpeakingRateData();
}

// Add the timing annotation
this.audioData.annotatedRate.addByAnnotation(textStr, startTime, endTime);
} else {
textStr = text;
}

this.textData.wordStream.pushText(textStr);
Expand All @@ -299,7 +297,11 @@ class SegmentSynchronizerImpl {
}

this.textData.done = true;
this.textData.wordStream.endInput();
if (!this.enabled) {
this.outputStreamWriter.close();
} else {
this.textData.wordStream.endInput();
}
}

markPlaybackFinished(_playbackPosition: number, interrupted: boolean) {
Expand Down Expand Up @@ -457,6 +459,10 @@ class SegmentSynchronizerImpl {
}

this.startFuture.resolve(); // avoid deadlock of mainTaskImpl in case it never started
// Close the writer if endTextInput hasn't already done so (e.g. on interruption)
if (!this.enabled && !this.textData.done) {
this.outputStreamWriter.close();
}
this.textData.wordStream.close();
await this.captureTask;
}
Expand All @@ -467,13 +473,15 @@ export interface TranscriptionSynchronizerOptions {
hyphenateWord: (word: string) => string[];
splitWords: (words: string) => [string, number, number][];
wordTokenizer: WordTokenizer;
enabled: boolean;
}

export const defaultTextSyncOptions: TranscriptionSynchronizerOptions = {
speed: 1,
hyphenateWord: basic.hyphenateWord,
splitWords: basic.splitWords,
wordTokenizer: new basic.WordTokenizer(false),
enabled: true,
};

export class TranscriptionSynchronizer {
Expand All @@ -482,7 +490,7 @@ export class TranscriptionSynchronizer {

private options: TextSyncOptions;
private rotateSegmentTask: Task<void>;
private _enabled: boolean = true;
private _outputsAttached: boolean = true;
private closed: boolean = false;

/** @internal */
Expand Down Expand Up @@ -511,6 +519,7 @@ export class TranscriptionSynchronizer {
hyphenateWord: options.hyphenateWord,
splitWords: options.splitWords,
wordTokenizer: options.wordTokenizer,
enabled: options.enabled,
};

// initial segment/first segment, recreated for each new segment
Expand All @@ -520,19 +529,19 @@ export class TranscriptionSynchronizer {
);
}

get outputsAttached(): boolean {
return this._outputsAttached;
}

get enabled(): boolean {
return this._enabled;
return this.options.enabled;
}

set enabled(enabled: boolean) {
if (this._enabled === enabled) {
set enabled(value: boolean) {
if (this.options.enabled === value) {
return;
}

this._enabled = enabled;
if (enabled) {
this._warnedAsymmetricDetach = false;
}
this.options.enabled = value;
this.rotateSegment();
}

Expand All @@ -544,7 +553,15 @@ export class TranscriptionSynchronizer {
if (args.textAttached !== undefined) {
this._textAttached = args.textAttached;
}
this.enabled = this._audioAttached && this._textAttached;
const outputsAttached = this._audioAttached && this._textAttached;
if (this._outputsAttached === outputsAttached) {
return;
}
this._outputsAttached = outputsAttached;
if (outputsAttached) {
this._warnedAsymmetricDetach = false;
}
this.rotateSegment();
}

rotateSegment() {
Expand Down Expand Up @@ -608,7 +625,7 @@ class SyncedAudioOutput extends AudioOutput {
// TODO(AJS-102): use frame.durationMs once available in rtc-node
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;

if (!this.synchronizer.enabled) {
if (!this.synchronizer.outputsAttached) {
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
if (
this.synchronizer._audioAttached &&
!this.synchronizer._textAttached &&
Expand All @@ -624,6 +641,10 @@ class SyncedAudioOutput extends AudioOutput {
return;
}

if (!this.synchronizer.enabled) {
return;
}

if (this.synchronizer._impl.audioInputEnded) {
this.logger.warn(
'SegmentSynchronizerImpl audio marked as ended in capture audio, rotating segment',
Expand All @@ -638,7 +659,7 @@ class SyncedAudioOutput extends AudioOutput {
super.flush();
this.nextInChainAudio.flush();

if (!this.synchronizer.enabled) {
if (!this.synchronizer.outputsAttached || !this.synchronizer.enabled) {
return;
}

Expand Down Expand Up @@ -666,14 +687,14 @@ class SyncedAudioOutput extends AudioOutput {
// this is going to be automatically called by the next_in_chain
onPlaybackStarted(createdAt: number): void {
super.onPlaybackStarted(createdAt);
if (this.synchronizer.enabled) {
if (this.synchronizer.outputsAttached && this.synchronizer.enabled) {
this.synchronizer._impl.onPlaybackStarted(createdAt);
}
}

// this is going to be automatically called by the next_in_chain
onPlaybackFinished(ev: PlaybackFinishedEvent) {
if (!this.synchronizer.enabled) {
if (!this.synchronizer.outputsAttached || !this.synchronizer.enabled) {
super.onPlaybackFinished(ev);
return;
}
Expand Down Expand Up @@ -717,6 +738,11 @@ class SyncedTextOutput extends TextOutput {
const textStr = isTimedString(text) ? text.text : text;

if (!this.synchronizer.enabled) {
await this.nextInChain.captureText(textStr);
return;
}

if (!this.synchronizer.outputsAttached) {
if (
this.synchronizer._textAttached &&
!this.synchronizer._audioAttached &&
Expand Down Expand Up @@ -750,8 +776,9 @@ class SyncedTextOutput extends TextOutput {
// Wait for any pending rotation to complete before accessing _impl
await this.synchronizer.barrier();

if (!this.synchronizer.enabled) {
this.nextInChain.flush(); // passthrough text if the synchronizer is disabled
if (!this.synchronizer.enabled || !this.synchronizer.outputsAttached) {
this.capturing = false;
this.nextInChain.flush();
return;
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

Expand Down
1 change: 1 addition & 0 deletions plugins/phonic/src/realtime/realtime_model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export class RealtimeModel extends llm.RealtimeModel {
midSessionInstructionsUpdate: true,
midSessionToolsUpdate: true,
perResponseToolChoice: false,
nativeTranscriptSync: true,
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
});

const apiKey = options.apiKey || process.env.PHONIC_API_KEY;
Expand Down
Loading