Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions core/src/agents/base_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ export abstract class BaseAgent {
* @returns An AsyncGenerator that yields the events generated by the agent.
*/
async *runLive(
parentContext: InvocationContext, // eslint-disable-line @typescript-eslint/no-unused-vars
parentContext: InvocationContext,
): AsyncGenerator<Event, void, void> {
const span = tracer.startSpan(`invoke_agent ${this.name}`);
const ctx = trace.setSpan(context.active(), span);
Expand All @@ -226,10 +226,33 @@ export abstract class BaseAgent {
ctx,
this,
async function* () {
// TODO(b/425992518): Implement live mode.
const context = this.createInvocationContext(parentContext);

const beforeAgentCallbackEvent =
await this.handleBeforeAgentCallback(context);
if (beforeAgentCallbackEvent) {
yield beforeAgentCallbackEvent;
}

if (context.endInvocation || parentContext.abortSignal?.aborted) {
return;
}

for await (const event of this.runLiveImpl(context)) {
yield event;
}

if (context.endInvocation || parentContext.abortSignal?.aborted) {
return;
}

const afterAgentCallbackEvent =
await this.handleAfterAgentCallback(context);
if (afterAgentCallbackEvent) {
yield afterAgentCallbackEvent;
}
},
);
throw new Error('Live mode is not implemented yet.');
} finally {
span.end();
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/agents/invocation_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface InvocationContextParams {
activeStreamingTools?: Record<string, ActiveStreamingTool>;
pluginManager: PluginManager;
abortSignal?: AbortSignal;
liveSessionResumptionHandle?: string;
}

/**
Expand Down Expand Up @@ -191,6 +192,14 @@ export class InvocationContext {
*/
readonly abortSignal?: AbortSignal;

/**
* Most recent Gemini Live session resumption handle. Updated from
* `sessionResumptionUpdate` events on the active connection and replayed
* via `liveConnectConfig.sessionResumption` when reconnecting so the
* server can restore in-flight state without client-side history replay.
*/
liveSessionResumptionHandle?: string;

/**
* @param params The parameters for creating an invocation context.
*/
Expand All @@ -210,6 +219,7 @@ export class InvocationContext {
this.activeStreamingTools = params.activeStreamingTools;
this.pluginManager = params.pluginManager;
this.abortSignal = params.abortSignal;
this.liveSessionResumptionHandle = params.liveSessionResumptionHandle;
}

/**
Expand Down
27 changes: 24 additions & 3 deletions core/src/agents/live_request_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,38 @@ export class LiveRequestQueue {
/**
* Retrieves a request from the queue. If the queue is empty, it will
* wait until a request is available.
*
* @param abortSignal Optional signal. If it aborts while this call is
* waiting, the pending waiter is removed from the queue and the
* returned promise rejects -- so a torn-down consumer does not strand
* a waiter that would later consume (and drop) a request.
* @returns A promise that resolves with the next available request.
*/
async get(): Promise<LiveRequest> {
async get(abortSignal?: AbortSignal): Promise<LiveRequest> {
if (this.queue.length > 0) {
return this.queue.shift()!;
}
if (this.isClosed) {
return {close: true};
}
return new Promise<LiveRequest>((resolve) => {
this.resolveFnFifoQueue.push(resolve);
if (abortSignal?.aborted) {
throw new Error('LiveRequestQueue.get() was aborted.');
}
return new Promise<LiveRequest>((resolve, reject) => {
let resolveFn: PromiseResolveFn;
const onAbort = () => {
const index = this.resolveFnFifoQueue.indexOf(resolveFn);
if (index !== -1) {
this.resolveFnFifoQueue.splice(index, 1);
}
reject(new Error('LiveRequestQueue.get() was aborted.'));
};
resolveFn = (req: LiveRequest) => {
abortSignal?.removeEventListener('abort', onAbort);
resolve(req);
};
this.resolveFnFifoQueue.push(resolveFn);
abortSignal?.addEventListener('abort', onAbort, {once: true});
});
}

Expand Down
Loading