Skip to content

Commit 1b962a3

Browse files
committed
feat(worker): simulation mode (skip HTTP server, disable load limit, agent name override)
Add a `--simulation` CLI flag (on the `start` command, set by `lk simulate`) that threads through to the worker options. In simulation mode: skip worker HTTP server creation/start/close so concurrent runs don't collide on a fixed port, and disable the worker load limit (loadThreshold = Infinity) so runs can saturate the agent. Independently of the flag, resolve agentName from LIVEKIT_AGENT_NAME_OVERRIDE first whenever it is set, so the worker registers under the agent name that `lk simulate` dispatches to. Ported from livekit/agents#6055.
1 parent 0047ffa commit 1b962a3

4 files changed

Lines changed: 62 additions & 31 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/agents': patch
3+
---
4+
5+
Simulation fixes: when started with `--simulation`, do not start the worker HTTP server and disable the worker load limit so runs can saturate the agent; also honor `LIVEKIT_AGENT_NAME_OVERRIDE` whenever it is set, so the worker registers under the name `lk simulate` dispatches to.

agents/src/cli.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const runServer = async (args: CliArgs) => {
3030

3131
// though `production` is defined in ServerOptions, it will always be overridden by CLI.
3232
const { production: _, ...opts } = args.opts; // eslint-disable-line @typescript-eslint/no-unused-vars
33-
const server = new AgentServer(new ServerOptions({ production: args.production, ...opts }));
33+
const server = new AgentServer(new ServerOptions({ ...opts, production: args.production }));
3434

3535
if (args.room) {
3636
server.event.once('worker_registered', () => {
@@ -136,6 +136,13 @@ export const runApp = (opts: ServerOptions) => {
136136
.command('start')
137137
.description('Start the worker in production mode')
138138
.addOption(logLevelOption('info'))
139+
.addOption(
140+
new Option(
141+
'--simulation',
142+
'Run under an agent simulation: the worker load limit is disabled so runs ' +
143+
'can saturate the agent, and the worker HTTP server is not started. Set by `lk simulate`.',
144+
).hideHelp(),
145+
)
139146
.action((...[, command]) => {
140147
const globalOptions = program.optsWithGlobals();
141148
const commandOptions = command.opts();
@@ -144,6 +151,7 @@ export const runApp = (opts: ServerOptions) => {
144151
opts.apiSecret = globalOptions.apiSecret || opts.apiSecret;
145152
opts.logLevel = commandOptions.logLevel;
146153
opts.workerToken = globalOptions.workerToken || opts.workerToken;
154+
opts.simulation = commandOptions.simulation || opts.simulation;
147155
runServer({
148156
opts,
149157
production: true,

agents/src/worker.ts

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ export class ServerOptions {
144144
port: number;
145145
logLevel: string;
146146
production: boolean;
147+
simulation: boolean;
147148
jobMemoryWarnMB: number;
148149
jobMemoryLimitMB: number;
149150

@@ -169,6 +170,7 @@ export class ServerOptions {
169170
port = undefined,
170171
logLevel = 'info',
171172
production = false,
173+
simulation = false,
172174
jobMemoryWarnMB = 500,
173175
jobMemoryLimitMB = 0,
174176
}: {
@@ -210,6 +212,7 @@ export class ServerOptions {
210212
port?: number;
211213
logLevel?: string;
212214
production?: boolean;
215+
simulation?: boolean;
213216
jobMemoryWarnMB?: number;
214217
jobMemoryLimitMB?: number;
215218
}) {
@@ -219,14 +222,19 @@ export class ServerOptions {
219222
}
220223
this.requestFunc = requestFunc;
221224
this.loadFunc = loadFunc;
222-
this.loadThreshold = loadThreshold || Default.loadThreshold(production);
225+
this.loadThreshold = simulation ? Infinity : loadThreshold || Default.loadThreshold(production);
223226
this.numIdleProcesses = numIdleProcesses || Default.numIdleProcesses(production);
224227
this.shutdownProcessTimeout = shutdownProcessTimeout;
225228
this.initializeProcessTimeout = initializeProcessTimeout;
226229
this.permissions = permissions;
227230
// agentNameIsEnv may be passed explicitly when ServerOptions is re-constructed (e.g.
228231
// cli.ts spreads an existing ServerOptions instance), so prefer it when defined.
229-
if (agentName) {
232+
if (process.env.LIVEKIT_AGENT_NAME_OVERRIDE) {
233+
// Highest priority: `lk simulate` sets this to force the worker to register
234+
// under the agent name it dispatches to, overriding any configured agentName.
235+
this.agentName = process.env.LIVEKIT_AGENT_NAME_OVERRIDE;
236+
this.agentNameIsEnv = agentNameIsEnv ?? true;
237+
} else if (agentName) {
230238
this.agentName = agentName;
231239
this.agentNameIsEnv = agentNameIsEnv ?? false;
232240
} else if (process.env.LIVEKIT_AGENT_NAME) {
@@ -246,6 +254,7 @@ export class ServerOptions {
246254
this.port = port || Default.port(production);
247255
this.logLevel = logLevel;
248256
this.production = production;
257+
this.simulation = simulation;
249258
this.jobMemoryWarnMB = jobMemoryWarnMB;
250259
this.jobMemoryLimitMB = jobMemoryLimitMB;
251260
}
@@ -283,7 +292,7 @@ export class AgentServer {
283292

284293
event = new EventEmitter();
285294
#session: WebSocket | undefined = undefined;
286-
#httpServer: HTTPServer;
295+
#httpServer?: HTTPServer;
287296
#logger = log().child({ version });
288297
#inferenceExecutor?: InferenceProcExecutor;
289298

@@ -336,35 +345,39 @@ export class AgentServer {
336345

337346
this.#opts = opts;
338347

339-
const healthCheck = () => {
340-
// Check if inference executor exists and is not alive
341-
if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) {
342-
return { healthy: false, message: 'inference process not running' };
343-
}
348+
// Simulations run ephemeral workers side by side; a health endpoint on a fixed port would make
349+
// concurrent runs collide.
350+
if (!opts.simulation) {
351+
const healthCheck = () => {
352+
// Check if inference executor exists and is not alive
353+
if (this.#inferenceExecutor && !this.#inferenceExecutor.isAlive) {
354+
return { healthy: false, message: 'inference process not running' };
355+
}
344356

345-
// Only healthy when fully connected with an active WebSocket
346-
if (
347-
this.#closed ||
348-
this.#connecting ||
349-
!this.#session ||
350-
this.#session.readyState !== WebSocket.OPEN
351-
) {
352-
return { healthy: false, message: 'not connected to livekit' };
353-
}
357+
// Only healthy when fully connected with an active WebSocket
358+
if (
359+
this.#closed ||
360+
this.#connecting ||
361+
!this.#session ||
362+
this.#session.readyState !== WebSocket.OPEN
363+
) {
364+
return { healthy: false, message: 'not connected to livekit' };
365+
}
354366

355-
return { healthy: true, message: 'OK' };
356-
};
367+
return { healthy: true, message: 'OK' };
368+
};
357369

358-
const getWorkerInfo = () => ({
359-
agent_name: opts.agentName,
360-
agent_name_is_env: opts.agentNameIsEnv,
361-
worker_type: JobType[opts.serverType],
362-
active_jobs: this.activeJobs.length,
363-
sdk_version: version,
364-
project_type: PROJECT_TYPE,
365-
});
370+
const getWorkerInfo = () => ({
371+
agent_name: opts.agentName,
372+
agent_name_is_env: opts.agentNameIsEnv,
373+
worker_type: JobType[opts.serverType],
374+
active_jobs: this.activeJobs.length,
375+
sdk_version: version,
376+
project_type: PROJECT_TYPE,
377+
});
366378

367-
this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo);
379+
this.#httpServer = new HTTPServer(opts.host, opts.port, healthCheck, getWorkerInfo);
380+
}
368381
}
369382

370383
/** @throws {@link WorkerError} if worker failed to connect or already running */
@@ -431,7 +444,11 @@ export class AgentServer {
431444
}
432445
};
433446

434-
await ThrowsPromise.all([workerWS(), this.#httpServer.run()]);
447+
const tasks = [workerWS()];
448+
if (this.#httpServer) {
449+
tasks.push(this.#httpServer.run());
450+
}
451+
await ThrowsPromise.all(tasks);
435452
this.#close.resolve();
436453
}
437454

@@ -875,7 +892,7 @@ export class AgentServer {
875892

876893
await this.#inferenceExecutor?.close();
877894
await this.#procPool.close();
878-
await this.#httpServer.close();
895+
await this.#httpServer?.close();
879896
await ThrowsPromise.allSettled(this.#tasks);
880897

881898
this.#session?.close();

turbo.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"LLAMA_API_KEY",
4343
"LIVEKIT_AGENT_ID",
4444
"LIVEKIT_AGENT_NAME",
45+
"LIVEKIT_AGENT_NAME_OVERRIDE",
4546
"LOG_LEVEL",
4647
"OCTOAI_TOKEN",
4748
"OPENAI_API_KEY",

0 commit comments

Comments
 (0)