-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: implement parallel tool execution (Gap 2) with backward compatibility #1401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e11d456
86f7c53
99b4dda
74fd40f
5c74d64
11bbf9a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
| import time | ||
| import json | ||
| import xml.etree.ElementTree as ET | ||
| # Gap 2: Tool call execution imports | ||
| from ..tools.call_executor import ToolCall, create_tool_call_executor | ||
| # Display functions - lazy loaded to avoid importing rich at startup | ||
| # These are only needed when output=verbose | ||
| _display_module = None | ||
|
|
@@ -1649,6 +1651,7 @@ def get_response( | |
| task_description: Optional[str] = None, | ||
| task_id: Optional[str] = None, | ||
| execute_tool_fn: Optional[Callable] = None, | ||
| parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution | ||
| stream: bool = True, | ||
| stream_callback: Optional[Callable] = None, | ||
| emit_events: bool = False, | ||
|
|
@@ -1893,26 +1896,47 @@ def _prepare_return_value(text: str) -> Union[str, tuple]: | |
| "tool_calls": serializable_tool_calls, | ||
| }) | ||
|
|
||
| tool_results = [] | ||
| # Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential) | ||
| is_ollama = self._is_ollama_provider() | ||
| tool_calls_batch = [] | ||
|
|
||
| # Prepare batch of ToolCall objects | ||
| for tool_call in tool_calls: | ||
| function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call) | ||
|
|
||
| logging.debug(f"[RESPONSES_API] Executing tool {function_name} with args: {arguments}") | ||
| tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id) | ||
| function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama=is_ollama) | ||
| tool_calls_batch.append(ToolCall( | ||
| function_name=function_name, | ||
| arguments=arguments, | ||
| tool_call_id=tool_call_id, | ||
| is_ollama=is_ollama | ||
| )) | ||
|
|
||
| # Create appropriate executor based on parallel_tool_calls setting | ||
| executor = create_tool_call_executor(parallel=parallel_tool_calls) | ||
|
|
||
| # Execute batch | ||
| tool_results_batch = executor.execute_batch(tool_calls_batch, execute_tool_fn) | ||
|
|
||
| tool_results = [] | ||
| for tool_call_obj, tool_result_obj in zip(tool_calls_batch, tool_results_batch): | ||
| if tool_result_obj.error is not None: | ||
| raise tool_result_obj.error | ||
| tool_result = tool_result_obj.result | ||
|
Comment on lines
+1920
to
+1923
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Don't re-raise per-tool execution errors in batched mode. Lines 1921-1922 raise immediately on the first tool failure, which aborts the remaining tool results and prevents full tool-message emission for the turn. The executor already returns structured error results.

Proposed fix:
-    for tool_call_obj, tool_result_obj in zip(tool_calls_batch, tool_results_batch):
-        if tool_result_obj.error is not None:
-            raise tool_result_obj.error
-        tool_result = tool_result_obj.result
+    for tool_call_obj, tool_result_obj in zip(tool_calls_batch, tool_results_batch):
+        tool_result = tool_result_obj.result

🧰 Tools 🪛 Ruff (0.15.10) [warning] 1920-1920: `zip()` call: add explicit value for parameter `strict=` (B905)

🤖 Prompt for AI Agents |
||
| tool_results.append(tool_result) | ||
| accumulated_tool_results.append(tool_result) | ||
|
Comment on lines
+1919
to
1925
|
||
|
|
||
| logging.debug(f"[RESPONSES_API] Executed tool {tool_result_obj.function_name} with result: {tool_result}") | ||
|
|
||
| if verbose: | ||
| display_message = f"Agent {agent_name} called function '{function_name}' with arguments: {arguments}\n" | ||
| display_message = f"Agent {agent_name} called function '{tool_call_obj.function_name}' with arguments: {tool_call_obj.arguments}\n" | ||
| display_message += f"Function returned: {tool_result}" if tool_result else "Function returned no output" | ||
|
Comment on lines
1929
to
1931
|
||
| _get_display_functions()['display_tool_call'](display_message, console=self.console) | ||
|
|
||
| result_str = json.dumps(tool_result) if tool_result else "empty" | ||
| _get_display_functions()['execute_sync_callback']( | ||
| 'tool_call', | ||
| message=f"Calling function: {function_name}", | ||
| tool_name=function_name, | ||
| tool_input=arguments, | ||
| message=f"Calling function: {tool_call_obj.function_name}", | ||
| tool_name=tool_call_obj.function_name, | ||
| tool_input=tool_call_obj.arguments, | ||
| tool_output=result_str[:200] if result_str else None, | ||
|
Comment on lines
1935
to
1940
|
||
| ) | ||
|
|
||
|
|
@@ -1927,7 +1951,7 @@ def _prepare_return_value(text: str) -> Union[str, tuple]: | |
| content = json.dumps(tool_result) | ||
| messages.append({ | ||
| "role": "tool", | ||
| "tool_call_id": tool_call_id, | ||
| "tool_call_id": tool_result_obj.tool_call_id, | ||
| "content": content, | ||
| }) | ||
|
|
||
|
|
@@ -3142,6 +3166,7 @@ def get_response_stream( | |
| task_description: Optional[str] = None, | ||
| task_id: Optional[str] = None, | ||
| execute_tool_fn: Optional[Callable] = None, | ||
| parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution | ||
| **kwargs | ||
| ): | ||
| """Generator that yields real-time response chunks from the LLM. | ||
|
|
@@ -3167,6 +3192,7 @@ def get_response_stream( | |
| task_description: Optional task description for logging | ||
| task_id: Optional task ID for logging | ||
| execute_tool_fn: Optional function for executing tools | ||
| parallel_tool_calls: If True, execute batched LLM tool calls in parallel (default False) | ||
| **kwargs: Additional parameters | ||
|
|
||
| Yields: | ||
|
|
@@ -3301,26 +3327,44 @@ def get_response_stream( | |
| "tool_calls": serializable_tool_calls | ||
| }) | ||
|
|
||
| # Execute tool calls and add results to conversation | ||
| # Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential) | ||
| is_ollama = self._is_ollama_provider() | ||
| tool_calls_batch = [] | ||
|
|
||
| # Prepare batch of ToolCall objects | ||
| for tool_call in tool_calls: | ||
| is_ollama = self._is_ollama_provider() | ||
| function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama) | ||
|
|
||
| try: | ||
| # Execute the tool (pass tool_call_id for event correlation) | ||
| tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id) | ||
|
|
||
| # Add tool result to messages | ||
| tool_message = self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama) | ||
| messages.append(tool_message) | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Tool execution error for {function_name}: {e}") | ||
| # Add error message to conversation | ||
| error_message = self._create_tool_message( | ||
| function_name, f"Error executing tool: {e}", tool_call_id, is_ollama | ||
| tool_calls_batch.append(ToolCall( | ||
| function_name=function_name, | ||
| arguments=arguments, | ||
| tool_call_id=tool_call_id, | ||
| is_ollama=is_ollama | ||
| )) | ||
|
|
||
| # Create appropriate executor based on parallel_tool_calls setting | ||
| executor = create_tool_call_executor(parallel=parallel_tool_calls) | ||
|
|
||
| # Execute batch and add results to conversation | ||
| tool_results = executor.execute_batch(tool_calls_batch, execute_tool_fn) | ||
|
|
||
| for tool_result in tool_results: | ||
| if tool_result.error is None: | ||
| # Successful execution | ||
| tool_message = self._create_tool_message( | ||
| tool_result.function_name, | ||
| tool_result.result, | ||
| tool_result.tool_call_id, | ||
| tool_result.is_ollama | ||
| ) | ||
| else: | ||
| # Error during execution (already logged by executor) | ||
| tool_message = self._create_tool_message( | ||
| tool_result.function_name, | ||
| tool_result.result, # Contains error message | ||
| tool_result.tool_call_id, | ||
| tool_result.is_ollama | ||
| ) | ||
| messages.append(error_message) | ||
| messages.append(tool_message) | ||
|
Comment on lines
+3350
to
+3367
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

This branch will raise `NameError`: `error_message` is no longer assigned in the refactored error path, but `messages.append(error_message)` is still present — and `messages.append(tool_message)` follows immediately, so the error result would be appended twice. Remove the stale `messages.append(error_message)` line. (NOTE: reconstructed from a truncated review comment — confirm against the PR.)

🤖 Prompt for AI Agents |
||
|
|
||
| # Continue conversation after tool execution - get follow-up response | ||
| try: | ||
|
|
@@ -5462,4 +5506,4 @@ def _generate_tool_definition(self, function_or_name) -> Optional[Dict]: | |
| } | ||
| } | ||
| logging.debug(f"Generated tool definition: {tool_def}") | ||
| return tool_def | ||
| return tool_def | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`parallel_tool_calls` is still a no-op for the main Chat Completions path. This new flag is only consumed in the Responses API / streaming branches. The later tool loop in
`get_response()` still calls `execute_tool_fn(...)` serially, so Anthropic/Gemini/Ollama and any non-Responses provider won't get parallel execution even when this is `True`.

🤖 Prompt for AI Agents