-
Notifications
You must be signed in to change notification settings - Fork 40
added an optional environment parameter to agent.run_batch #205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,7 +29,7 @@ | |
| Thought, | ||
| TrainingText, | ||
| ) | ||
| from tapeagents.llms import LLM, LLMCall, LLMEvent, LLMOutput, LLMStream, TrainableLLM | ||
| from tapeagents.llms import LLM, LLMCall, LLMEvent, LLMOutput, LLMStream | ||
| from tapeagents.observe import observe_llm_call | ||
| from tapeagents.tool_calling import ToolSpec | ||
| from tapeagents.view import TapeViewStack | ||
|
|
@@ -718,39 +718,78 @@ def _run_implementation(): | |
|
|
||
| return AgentStream(_run_implementation()) | ||
|
|
||
| def run_batch(self: Agent[TapeType], tapes: list[TapeType]) -> list[Tape]: | ||
| """Run agent in parallel on tapes using batched LLM calls. | ||
|
|
||
| This is faster than running agents in thread and having the LLM server batch the calls. | ||
|
|
||
| def run_batch(self: Agent[TapeType], tapes: list[Tape], environment = None) -> list[Tape]: | ||
| """ | ||
| Run agent in parallel on tapes using batched LLM calls with optional environment integration. | ||
|
|
||
| Args: | ||
| tapes: List of tapes to process in parallel | ||
| environment: Optional environment with a react(tape) method | ||
|
|
||
| Returns: | ||
| List of processed tapes | ||
| """ | ||
| if len(self.llms) > 1: | ||
| raise NotImplementedError("For run_agent_batch the agent must have only one LLM for now") | ||
| if not isinstance(self.llm, TrainableLLM): | ||
| raise NotImplementedError("For run_agent_batch the LLM must be TrainableLLM") | ||
| raise NotImplementedError("For batch processing the agent must have only one LLM for now") | ||
|
|
||
| llm = self.llms.get("default") or next(iter(self.llms.values())) | ||
| if not hasattr(llm, "batch_generate"): | ||
| raise NotImplementedError("The LLM must support batch_generate method") | ||
|
|
||
| # Check environment has react method if provided | ||
| if environment is not None and (not hasattr(environment, "react") or not callable(environment.react)): | ||
| raise ValueError("Environment must have a callable react method") | ||
|
|
||
| original_tapes = list(tapes) | ||
| n_iterations = 0 | ||
| active_indices = set(range(len(tapes))) | ||
| while n_iterations < self.max_iterations: | ||
| prompts = [] | ||
| current_subagents = [self.delegate(tapes[i]) for i in active_indices] | ||
| prompts = [subagent.make_prompt(tape) for subagent, tape in zip(current_subagents, tapes)] | ||
| llm_calls = self.llm.batch_generate(prompts) | ||
| for i in active_indices: | ||
| # Run the equivalent of agent.run_iteration | ||
| pending_tape_indices = set(range(len(tapes))) | ||
|
|
||
| while n_iterations < self.max_iterations and pending_tape_indices: | ||
| # Create batch structure with tape indices, subagents, and prompts | ||
| batch = [] | ||
| for tape_idx in list(pending_tape_indices): | ||
| subagent = self.delegate(tapes[tape_idx]) | ||
| prompt = subagent.make_prompt(tapes[tape_idx]) | ||
| batch.append((tape_idx, subagent, prompt)) | ||
|
|
||
| if not batch: | ||
| break | ||
|
|
||
| # Batch LLM calls | ||
| prompts = [item[2] for item in batch] | ||
| llm_responses = llm.batch_generate(prompts) | ||
|
|
||
| # Process results for each tape in the batch | ||
| for batch_pos, (tape_idx, subagent, _) in enumerate(batch): | ||
| llm_call = llm_responses[batch_pos] | ||
|
|
||
| # Create LLM stream | ||
| llm_stream = LLMStream( | ||
| (LLMEvent(output=output) for output in (llm_calls[i].output,)), llm_calls[i].prompt | ||
| (LLMEvent(output=llm_call.output) for _ in range(1)), | ||
| llm_call.prompt | ||
| ) | ||
| for step in self.generate_steps(tapes[i], llm_stream): | ||
| step.metadata.agent = current_subagents[i].full_name | ||
|
|
||
| # Process steps directly using generate_steps | ||
| for step in subagent.generate_steps(tapes[tape_idx], llm_stream): | ||
| if isinstance(step, AgentStep): | ||
| step.metadata.prompt_id = llm_calls[i].prompt.id | ||
| tapes[i] = tapes[i].append(step) | ||
| if self.should_stop(tapes[i]): | ||
| active_indices.remove(i) | ||
| if self.store_llm_calls: | ||
| step.metadata.other["llm_call"] = llm_calls[i] | ||
| step.metadata.prompt_id = llm_call.prompt.id | ||
| tapes[tape_idx] = tapes[tape_idx].append(step) | ||
|
|
||
| # Store LLM call in metadata if needed | ||
| if subagent.store_llm_calls: | ||
| step.metadata.other["llm_call"] = llm_call | ||
|
|
||
| # Apply environment reactions if environment is provided | ||
| if environment is not None: | ||
| tapes[tape_idx] = environment.react(tapes[tape_idx]) | ||
|
Comment on lines
+783
to
+784
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Stop Check After Environment React
Tell me moreWhat is the issue?The environment reaction is applied after each step generation, but it's unclear if steps after the environment reaction should lead to agent termination. Why this mattersIf the environment's reaction creates a state that should stop the agent, this isn't currently checked, potentially leading to unnecessary continued processing. Suggested change ∙ Feature PreviewAdd a stop check after environment reaction: # Apply environment reactions if environment is provided
if environment is not None:
tapes[tape_idx] = environment.react(tapes[tape_idx])
# Check if agent should stop after environment reaction
if subagent.should_stop(tapes[tape_idx]):
pending_tape_indices.remove(tape_idx)
continueProvide feedback to improve future suggestions💬 Looking for more details? Reply to this comment to chat with Korbit.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if this is really relevant, is it possible for the agent to stop immediately after an environment reaction? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You bring up a good point, we may need to consider cases where the |
||
|
|
||
| # Check if agent should stop for this tape | ||
| if subagent.should_stop(tapes[tape_idx]): | ||
| pending_tape_indices.remove(tape_idx) | ||
|
|
||
| n_iterations += 1 | ||
|
|
||
| # Update metadata for all tapes | ||
| for i in range(len(tapes)): | ||
| updated_metadata = original_tapes[i].metadata.model_validate( | ||
| dict( | ||
|
|
@@ -760,6 +799,7 @@ def run_batch(self: Agent[TapeType], tapes: list[TapeType]) -> list[Tape]: | |
| ) | ||
| ) | ||
| tapes[i] = tapes[i].model_copy(update=dict(metadata=updated_metadata)) | ||
|
|
||
| return tapes | ||
|
|
||
| def reuse(self, tape: TapeType) -> tuple[TapeType, list[LLMCall]]: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-deterministic LLM Selection
Tell me more
What is the issue?
The LLM selection logic is not deterministic when there's no 'default' LLM. Using next(iter()) on a dictionary can return any key-value pair.
Why this matters
This non-deterministic behavior could lead to inconsistent results across different runs, especially when batch processing multiple tapes.
Suggested change ∙ Feature Preview
Replace with explicit LLM selection logic that guarantees consistent behavior:
Provide feedback to improve future suggestions
💬 Looking for more details? Reply to this comment to chat with Korbit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is false because I check if the length of the llm array is more than 1, so this is unnecessary. this is deterministic.