@@ -39,7 +39,7 @@ import (
3939)
4040
4141func init () {
42- AgentCommands [0 ].Commands = append (AgentCommands [0 ].Commands , simulateCommand , exportScenariosCommand )
42+ AgentCommands [0 ].Commands = append (AgentCommands [0 ].Commands , simulateCommand )
4343}
4444
4545var (
@@ -71,6 +71,10 @@ var simulateCommand = &cli.Command{
7171 Aliases : []string {"n" },
7272 Usage : "Number of scenarios to generate" ,
7373 },
74+ & cli.IntFlag {
75+ Name : "concurrency" ,
76+ Usage : "Max simulations running in parallel (default: server-side limit)" ,
77+ },
7478 & cli.StringFlag {
7579 Name : "scenarios" ,
7680 Usage : "Path to a scenarios `FILE` (yaml). If omitted, scenarios are generated from the agent's source" ,
@@ -83,78 +87,33 @@ var simulateCommand = &cli.Command{
8387 },
8488}
8589
86- var exportScenariosCommand = & cli.Command {
87- Name : "export-scenarios" ,
88- Usage : "Export a simulation run's scenarios to a scenarios.yaml" ,
89- ArgsUsage : "<simulation-run-id>" ,
90- Before : func (ctx context.Context , cmd * cli.Command ) (context.Context , error ) {
91- pc , err := loadProjectDetails (cmd )
92- if err != nil {
93- return nil , err
94- }
95- simulateProjectConfig = pc
96- return nil , nil
97- },
98- Action : runExportScenarios ,
99- Flags : []cli.Flag {
100- & cli.StringFlag {
101- Name : "output" ,
102- Aliases : []string {"o" },
103- Usage : "Write to `FILE` instead of stdout" ,
104- },
105- },
106- }
107-
108- func runExportScenarios (ctx context.Context , cmd * cli.Command ) error {
109- runID := cmd .Args ().First ()
110- if runID == "" {
111- return fmt .Errorf ("a simulation run ID is required" )
112- }
113-
114- pc := simulateProjectConfig
115- client := lksdk .NewAgentSimulationClient (serverURL , pc .APIKey , pc .APISecret )
116- resp , err := client .GetSimulationRun (ctx , & livekit.SimulationRun_Get_Request {SimulationRunId : runID })
117- if err != nil {
118- return fmt .Errorf ("failed to get simulation run: %w" , err )
119- }
120-
121- group := resp .GetRun ().GetScenarioGroup ()
90+ // writeGeneratedScenariosTemp writes a generated run's scenarios to a temp
91+ // scenarios.yaml; "" when the run carries none.
92+ func writeGeneratedScenariosTemp (run * livekit.SimulationRun ) (string , error ) {
93+ group := run .GetScenarioGroup ()
12294 if group == nil || len (group .GetScenarios ()) == 0 {
123- return fmt . Errorf ( "simulation run %q has no scenarios to export " , runID )
95+ return " " , nil
12496 }
125-
12697 out , err := scenarioGroupToYAML (group )
12798 if err != nil {
128- return err
99+ return "" , err
129100 }
130-
131- if path := cmd .String ("output" ); path != "" {
132- // Never overwrite: refuse if the file already exists so the caller picks
133- // another name (O_EXCL makes the check atomic).
134- f , err := os .OpenFile (path , os .O_WRONLY | os .O_CREATE | os .O_EXCL , 0o644 )
135- if os .IsExist (err ) {
136- return fmt .Errorf ("%s already exists; refusing to overwrite (choose a different --output path)" , path )
137- }
138- if err != nil {
139- return fmt .Errorf ("failed to write %s: %w" , path , err )
140- }
141- _ , werr := f .Write (out )
142- if cerr := f .Close (); werr == nil {
143- werr = cerr
144- }
145- if werr != nil {
146- return fmt .Errorf ("failed to write %s: %w" , path , werr )
147- }
148- fmt .Printf ("Wrote %d scenarios to %s\n " , len (group .GetScenarios ()), path )
149- return nil
101+ f , err := os .CreateTemp ("" , "scenarios-*.yaml" )
102+ if err != nil {
103+ return "" , err
104+ }
105+ _ , werr := f .Write (out )
106+ if cerr := f .Close (); werr == nil {
107+ werr = cerr
108+ }
109+ if werr != nil {
110+ return "" , werr
150111 }
151- _ , err = os .Stdout .Write (out )
152- return err
112+ return f .Name (), nil
153113}
154114
155- // scenarioGroupToYAML renders a ScenarioGroup as a scenarios.yaml document — the
156- // inverse of loadScenarioGroup, decoding each scenario's JSON userdata string
157- // back into a nested mapping.
115+ // scenarioGroupToYAML renders a ScenarioGroup as a scenarios.yaml document, the
116+ // inverse of loadScenarioGroup.
158117func scenarioGroupToYAML (group * livekit.ScenarioGroup ) ([]byte , error ) {
159118 f := scenariosFile {Name : group .GetName ()}
160119 for _ , s := range group .GetScenarios () {
@@ -176,9 +135,8 @@ func scenarioGroupToYAML(group *livekit.ScenarioGroup) ([]byte, error) {
176135 return yaml .Marshal (f )
177136}
178137
179- // scenariosFile mirrors a scenarios.yaml (the source of truth for scenarios).
180- // It maps field-for-field onto livekit.ScenarioGroup; `userdata` is written as a
181- // nested mapping here and JSON-encoded into the proto's string field.
138+ // scenariosFile mirrors a scenarios.yaml; `userdata` is a nested mapping here
139+ // and JSON-encoded into the proto's string field.
182140type scenariosFile struct {
183141 Name string `yaml:"name"`
184142 Scenarios []yamlScenario `yaml:"scenarios"`
@@ -192,12 +150,12 @@ type yamlScenario struct {
192150 Userdata map [string ]any `yaml:"userdata"`
193151}
194152
195- // simulateConfig holds all parameters needed to run a simulation in either TUI or CI mode.
196153type simulateConfig struct {
197154 ctx context.Context
198155 client * lksdk.AgentSimulationClient
199156 pc * config.ProjectConfig
200157 numSimulations int32
158+ concurrency int32
201159 mode simulateMode
202160 agentName string
203161 projectDir string
@@ -207,16 +165,13 @@ type simulateConfig struct {
207165 scenariosPath string // path to the --scenarios file (empty when generating from source)
208166}
209167
210- // simulateMode represents how scenarios are sourced.
211168type simulateMode int
212169
213170const (
214171 modeScenarios simulateMode = iota
215172 modeGenerateFromSource
216173)
217174
218- // loadScenarioGroup reads a scenarios.yaml into a livekit.ScenarioGroup, JSON-encoding
219- // each scenario's nested `userdata` mapping into the proto's string field.
220175func loadScenarioGroup (path string ) (* livekit.ScenarioGroup , error ) {
221176 data , err := os .ReadFile (path )
222177 if err != nil {
@@ -261,6 +216,7 @@ func runSimulate(ctx context.Context, cmd *cli.Command) error {
261216 pc := simulateProjectConfig
262217
263218 numSimulations := int32 (cmd .Int ("num-simulations" ))
219+ concurrency := int32 (cmd .Int ("concurrency" ))
264220 agentName := generateAgentName ()
265221
266222 projectDir , projectType , err := agentfs .DetectProjectRoot ("." )
@@ -276,9 +232,8 @@ func runSimulate(ctx context.Context, cmd *cli.Command) error {
276232 return err
277233 }
278234
279- // The scenarios file must be specified explicitly via --scenarios; we never
280- // auto-discover one. When provided, those scenarios are the source of truth;
281- // otherwise scenarios are generated from the agent's source.
235+ // never auto-discovered: an explicit --scenarios file is the source of
236+ // truth, otherwise scenarios are generated from the agent's source
282237 scenariosPath := cmd .String ("scenarios" )
283238
284239 var scenarioGroup * livekit.ScenarioGroup
@@ -296,8 +251,6 @@ func runSimulate(ctx context.Context, cmd *cli.Command) error {
296251 mode = modeGenerateFromSource
297252 }
298253
299- // Generating from source uploads the agent's code to LiveKit Cloud, so make
300- // the user agree to it explicitly before anything is sent.
301254 if mode == modeGenerateFromSource {
302255 if err := confirmSourceUpload (cmd , projectDir ); err != nil {
303256 return err
@@ -311,6 +264,7 @@ func runSimulate(ctx context.Context, cmd *cli.Command) error {
311264 client : simClient ,
312265 pc : pc ,
313266 numSimulations : numSimulations ,
267+ concurrency : concurrency ,
314268 mode : mode ,
315269 agentName : agentName ,
316270 projectDir : projectDir ,
@@ -333,8 +287,8 @@ func isInteractive() bool {
333287 return isatty .IsTerminal (os .Stdin .Fd ()) && isatty .IsTerminal (os .Stdout .Fd ())
334288}
335289
336- // confirmSourceUpload makes the user explicitly agree that their agent's source
337- // code will be uploaded to LiveKit Cloud before generating scenarios from it .
290+ // confirmSourceUpload makes the user agree before their agent's source is
291+ // uploaded to LiveKit Cloud.
338292func confirmSourceUpload (cmd * cli.Command , projectDir string ) error {
339293 if cmd .Bool ("yes" ) {
340294 return nil
@@ -369,6 +323,51 @@ func confirmSourceUpload(cmd *cli.Command, projectDir string) error {
369323
370324// --- Shared lifecycle functions used by both TUI and CI modes ---
371325
326+ // agentLauncher owns the agent subprocess lifecycle around the TUI. Stop kills
327+ // the worker even when the TUI quits mid-start; a leaked worker keeps its port
328+ // bound and breaks the next run.
329+ type agentLauncher struct {
330+ done chan struct {}
331+ proc * AgentProcess
332+ err error
333+ }
334+
335+ func launchSimulationAgent (c * simulateConfig ) * agentLauncher {
336+ l := & agentLauncher {done : make (chan struct {})}
337+ go func () {
338+ l .proc , l .err = startSimulationAgent (c , nil )
339+ close (l .done )
340+ }()
341+ return l
342+ }
343+
344+ func (l * agentLauncher ) Wait () (* AgentProcess , error ) {
345+ <- l .done
346+ return l .proc , l .err
347+ }
348+
349+ // Stop kills the agent once the start attempt finishes (bounded wait) and
350+ // returns it for post-exit reporting.
351+ func (l * agentLauncher ) Stop () * AgentProcess {
352+ select {
353+ case <- l .done :
354+ case <- time .After (10 * time .Second ):
355+ return nil
356+ }
357+ if l .proc != nil {
358+ l .proc .Kill ()
359+ }
360+ return l .proc
361+ }
362+
363+ // ForceStop kills the agent immediately, without the SIGINT grace.
364+ func (l * agentLauncher ) ForceStop () {
365+ <- l .done
366+ if l .proc != nil {
367+ l .proc .ForceKill ()
368+ }
369+ }
370+
372371func startSimulationAgent (c * simulateConfig , forwardOutput io.Writer ) (* AgentProcess , error ) {
373372 return startAgent (AgentStartConfig {
374373 Dir : c .projectDir ,
@@ -381,11 +380,12 @@ func startSimulationAgent(c *simulateConfig, forwardOutput io.Writer) (*AgentPro
381380 "--api-secret" , c .pc .APISecret ,
382381 "--log-level" , "DEBUG" ,
383382 "--log-format" , "colored" ,
383+ // disable the worker load limit so the run can saturate the agent
384+ "--simulation" ,
384385 },
385386 Env : []string {
386- // force the agent to register under the dispatch name regardless of any
387- // agent_name hardcoded in the user's code (see LIVEKIT_AGENT_NAME_OVERRIDE
388- // precedence in livekit-agents worker.py).
387+ // register under the dispatch name regardless of any agent_name
388+ // hardcoded in the user's code
389389 "LIVEKIT_AGENT_NAME_OVERRIDE=" + c .agentName ,
390390 "LIVEKIT_URL=" + c .pc .URL ,
391391 "LIVEKIT_API_KEY=" + c .pc .APIKey ,
@@ -401,9 +401,10 @@ func createSimulationRun(ctx context.Context, c *simulateConfig) (string, *livek
401401 AgentName : c .agentName ,
402402 NumSimulations : c .numSimulations ,
403403 }
404+ if c .concurrency > 0 {
405+ req .Concurrency = & c .concurrency
406+ }
404407 if c .mode == modeScenarios {
405- // Run the scenarios from the yaml. When unset, the server generates
406- // num_simulations scenarios from the uploaded source.
407408 req .ScenarioGroup = c .scenarioGroup
408409 }
409410
0 commit comments