Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 66 additions & 26 deletions tapeagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Thought,
TrainingText,
)
from tapeagents.llms import LLM, LLMCall, LLMEvent, LLMOutput, LLMStream, TrainableLLM
from tapeagents.llms import LLM, LLMCall, LLMEvent, LLMOutput, LLMStream
from tapeagents.observe import observe_llm_call
from tapeagents.tool_calling import ToolSpec
from tapeagents.view import TapeViewStack
Expand Down Expand Up @@ -718,39 +718,78 @@ def _run_implementation():

return AgentStream(_run_implementation())

def run_batch(self: Agent[TapeType], tapes: list[TapeType]) -> list[Tape]:
"""Run agent in parallel on tapes using batched LLM calls.

This is faster than running agents in thread and having the LLM server batch the calls.

def run_batch(self: Agent[TapeType], tapes: list[Tape], environment = None) -> list[Tape]:
"""
Run agent in parallel on tapes using batched LLM calls with optional environment integration.

Args:
tapes: List of tapes to process in parallel
environment: Optional environment with a react(tape) method

Returns:
List of processed tapes
"""
if len(self.llms) > 1:
raise NotImplementedError("For run_agent_batch the agent must have only one LLM for now")
if not isinstance(self.llm, TrainableLLM):
raise NotImplementedError("For run_agent_batch the LLM must be TrainableLLM")
raise NotImplementedError("For batch processing the agent must have only one LLM for now")

llm = self.llms.get("default") or next(iter(self.llms.values()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-deterministic LLM Selection category Functionality

Tell me more
What is the issue?

The LLM selection logic is not deterministic when there's no 'default' LLM. Using next(iter()) on a dictionary can return any key-value pair.

Why this matters

This non-deterministic behavior could lead to inconsistent results across different runs, especially when batch processing multiple tapes.

Suggested change ∙ Feature Preview

Replace with explicit LLM selection logic that guarantees consistent behavior:

if 'default' in self.llms:
    llm = self.llms['default']
elif len(self.llms) == 1:
    llm = next(iter(self.llms.values()))
else:
    raise ValueError("No default LLM specified and multiple LLMs present")
Provide feedback to improve future suggestions

Nice Catch Incorrect Not in Scope Not in coding standard Other

💬 Looking for more details? Reply to this comment to chat with Korbit.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is incorrect: I already check whether the `self.llms` dict holds more than one LLM and raise if it does, so this extra branch is unnecessary. The selection is deterministic.

if not hasattr(llm, "batch_generate"):
raise NotImplementedError("The LLM must support batch_generate method")

# Check environment has react method if provided
if environment is not None and (not hasattr(environment, "react") or not callable(environment.react)):
raise ValueError("Environment must have a callable react method")

original_tapes = list(tapes)
n_iterations = 0
active_indices = set(range(len(tapes)))
while n_iterations < self.max_iterations:
prompts = []
current_subagents = [self.delegate(tapes[i]) for i in active_indices]
prompts = [subagent.make_prompt(tape) for subagent, tape in zip(current_subagents, tapes)]
llm_calls = self.llm.batch_generate(prompts)
for i in active_indices:
# Run the equivalent of agent.run_iteration
pending_tape_indices = set(range(len(tapes)))

while n_iterations < self.max_iterations and pending_tape_indices:
# Create batch structure with tape indices, subagents, and prompts
batch = []
for tape_idx in list(pending_tape_indices):
subagent = self.delegate(tapes[tape_idx])
prompt = subagent.make_prompt(tapes[tape_idx])
batch.append((tape_idx, subagent, prompt))

if not batch:
break

# Batch LLM calls
prompts = [item[2] for item in batch]
llm_responses = llm.batch_generate(prompts)

# Process results for each tape in the batch
for batch_pos, (tape_idx, subagent, _) in enumerate(batch):
llm_call = llm_responses[batch_pos]

# Create LLM stream
llm_stream = LLMStream(
(LLMEvent(output=output) for output in (llm_calls[i].output,)), llm_calls[i].prompt
(LLMEvent(output=llm_call.output) for _ in range(1)),
llm_call.prompt
)
for step in self.generate_steps(tapes[i], llm_stream):
step.metadata.agent = current_subagents[i].full_name

# Process steps directly using generate_steps
for step in subagent.generate_steps(tapes[tape_idx], llm_stream):
if isinstance(step, AgentStep):
step.metadata.prompt_id = llm_calls[i].prompt.id
tapes[i] = tapes[i].append(step)
if self.should_stop(tapes[i]):
active_indices.remove(i)
if self.store_llm_calls:
step.metadata.other["llm_call"] = llm_calls[i]
step.metadata.prompt_id = llm_call.prompt.id
tapes[tape_idx] = tapes[tape_idx].append(step)

# Store LLM call in metadata if needed
if subagent.store_llm_calls:
step.metadata.other["llm_call"] = llm_call

# Apply environment reactions if environment is provided
if environment is not None:
tapes[tape_idx] = environment.react(tapes[tape_idx])
Comment on lines +783 to +784
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Stop Check After Environment React category Functionality

Tell me more
What is the issue?

The environment reaction is applied after each step generation, but it's unclear if steps after the environment reaction should lead to agent termination.

Why this matters

If the environment's reaction creates a state that should stop the agent, this isn't currently checked, potentially leading to unnecessary continued processing.

Suggested change ∙ Feature Preview

Add a stop check after environment reaction:

# Apply environment reactions if environment is provided
if environment is not None:
    tapes[tape_idx] = environment.react(tapes[tape_idx])
    # Check if agent should stop after environment reaction
    if subagent.should_stop(tapes[tape_idx]):
        pending_tape_indices.remove(tape_idx)
        continue
Provide feedback to improve future suggestions

Nice Catch Incorrect Not in Scope Not in coding standard Other

💬 Looking for more details? Reply to this comment to chat with Korbit.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is really relevant. Is it possible for the agent to stop immediately after an environment reaction?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You bring up a good point: we may need to consider cases where the agent should stop immediately after an environment reaction but currently doesn't. I would suggest adding this logic as @korbit-ai advised. Although the conditions under which the agent would need to stop after an environment reaction are still ambiguous, I believe adding this check makes our code more robust. What do you think?


# Check if agent should stop for this tape
if subagent.should_stop(tapes[tape_idx]):
pending_tape_indices.remove(tape_idx)

n_iterations += 1

# Update metadata for all tapes
for i in range(len(tapes)):
updated_metadata = original_tapes[i].metadata.model_validate(
dict(
Expand All @@ -760,6 +799,7 @@ def run_batch(self: Agent[TapeType], tapes: list[TapeType]) -> list[Tape]:
)
)
tapes[i] = tapes[i].model_copy(update=dict(metadata=updated_metadata))

return tapes

def reuse(self, tape: TapeType) -> tuple[TapeType, list[LLMCall]]:
Expand Down