diff --git a/src/praisonai-agents/praisonaiagents/agent/agent.py b/src/praisonai-agents/praisonaiagents/agent/agent.py index 6473cc73d..ca6153a3e 100644 --- a/src/praisonai-agents/praisonaiagents/agent/agent.py +++ b/src/praisonai-agents/praisonaiagents/agent/agent.py @@ -768,6 +768,14 @@ def __init__( alternative="use 'execution=ExecutionConfig(rate_limiter=obj)' instead", stacklevel=3 ) + if parallel_tool_calls is not None: + warn_deprecated_param( + "parallel_tool_calls", + since="1.0.0", + removal="2.0.0", + alternative="use 'execution=ExecutionConfig(parallel_tool_calls=True)' instead", + stacklevel=3 + ) if verification_hooks is not None: warn_deprecated_param( "verification_hooks", @@ -943,6 +951,8 @@ def __init__( allow_code_execution = True if _exec_config.code_mode != "safe": code_execution_mode = _exec_config.code_mode + # Get parallel_tool_calls from ExecutionConfig + parallel_tool_calls = _exec_config.parallel_tool_calls # Budget guard extraction _max_budget = getattr(_exec_config, 'max_budget', None) _on_budget_exceeded = getattr(_exec_config, 'on_budget_exceeded', 'stop') or 'stop' @@ -950,6 +960,8 @@ def __init__( max_iter, max_rpm, max_execution_time, max_retry_limit = 20, None, None, 2 _max_budget = None _on_budget_exceeded = 'stop' + # Default parallel_tool_calls when no ExecutionConfig provided + parallel_tool_calls = False # ───────────────────────────────────────────────────────────────────── # Resolve TEMPLATES param - FAST PATH @@ -1440,6 +1452,8 @@ def __init__( self.self_reflect = True if self_reflect is None else self_reflect self.instructions = instructions + # Gap 2: Store parallel tool calls setting for ToolCallExecutor selection + self.parallel_tool_calls = parallel_tool_calls # Check for model name in environment variable if not provided self._using_custom_llm = False # Flag to track if final result has been displayed to prevent duplicates diff --git a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py index 9630a1513..7c21268ea 100644 --- a/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py +++ b/src/praisonai-agents/praisonaiagents/agent/chat_mixin.py @@ -1250,6 +1250,7 @@ def _chat_impl(self, prompt, temperature, tools, output_json, output_pydantic, r task_description=task_description, task_id=task_id, execute_tool_fn=self.execute_tool, + parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False), reasoning_steps=reasoning_steps, stream=stream ) @@ -1719,6 +1720,7 @@ async def _achat_impl(self, prompt, temperature, tools, output_json, output_pyda task_description=task_description, task_id=task_id, execute_tool_fn=self.execute_tool_async, + parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False), reasoning_steps=reasoning_steps, stream=stream ) @@ -2248,7 +2250,8 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]: task_name=kwargs.get('task_name'), task_description=kwargs.get('task_description'), task_id=kwargs.get('task_id'), - execute_tool_fn=self.execute_tool + execute_tool_fn=self.execute_tool, + parallel_tool_calls=getattr(self.execution, "parallel_tool_calls", False) ): response_content += chunk yield chunk diff --git a/src/praisonai-agents/praisonaiagents/config/feature_configs.py b/src/praisonai-agents/praisonaiagents/config/feature_configs.py index 9ed543a11..d9d0c60d1 100644 --- a/src/praisonai-agents/praisonaiagents/config/feature_configs.py +++ b/src/praisonai-agents/praisonaiagents/config/feature_configs.py @@ -735,6 +735,11 @@ class ExecutionConfig: # Action when budget exceeded: "stop" (default) raises BudgetExceededError, # "warn" logs warning but continues, or callable(total_cost, max_budget). on_budget_exceeded: Any = "stop" + + # Parallel tool execution (Gap 2): Enable parallel execution of batched LLM tool calls + # When True, multiple tool calls from LLM are executed concurrently instead of sequentially + # Default False preserves existing behavior for backward compatibility + parallel_tool_calls: bool = False def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" @@ -749,6 +754,7 @@ def to_dict(self) -> Dict[str, Any]: "context_compaction": self.context_compaction, "max_context_tokens": self.max_context_tokens, "max_budget": self.max_budget, + "parallel_tool_calls": self.parallel_tool_calls, } diff --git a/src/praisonai-agents/praisonaiagents/llm/llm.py b/src/praisonai-agents/praisonaiagents/llm/llm.py index c761285cd..024f10a11 100644 --- a/src/praisonai-agents/praisonaiagents/llm/llm.py +++ b/src/praisonai-agents/praisonaiagents/llm/llm.py @@ -15,6 +15,8 @@ import time import json import xml.etree.ElementTree as ET +# Gap 2: Tool call execution imports +from ..tools.call_executor import ToolCall, create_tool_call_executor # Display functions - lazy loaded to avoid importing rich at startup # These are only needed when output=verbose _display_module = None @@ -1649,6 +1651,7 @@ def get_response( task_description: Optional[str] = None, task_id: Optional[str] = None, execute_tool_fn: Optional[Callable] = None, + parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution stream: bool = True, stream_callback: Optional[Callable] = None, emit_events: bool = False, @@ -1893,26 +1896,47 @@ def _prepare_return_value(text: str) -> Union[str, tuple]: "tool_calls": serializable_tool_calls, }) - tool_results = [] + # Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential) + is_ollama = self._is_ollama_provider() + tool_calls_batch = [] + + # Prepare batch of ToolCall objects for tool_call in tool_calls: - function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call) - - logging.debug(f"[RESPONSES_API] Executing tool {function_name} with args: {arguments}") - tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id) + function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama=is_ollama) + tool_calls_batch.append(ToolCall( + function_name=function_name, + arguments=arguments, + tool_call_id=tool_call_id, + is_ollama=is_ollama + )) + + # Create appropriate executor based on parallel_tool_calls setting + executor = create_tool_call_executor(parallel=parallel_tool_calls) + + # Execute batch + tool_results_batch = executor.execute_batch(tool_calls_batch, execute_tool_fn) + + tool_results = [] + for tool_call_obj, tool_result_obj in zip(tool_calls_batch, tool_results_batch): + if tool_result_obj.error is not None: + raise tool_result_obj.error + tool_result = tool_result_obj.result tool_results.append(tool_result) accumulated_tool_results.append(tool_result) + logging.debug(f"[RESPONSES_API] Executed tool {tool_result_obj.function_name} with result: {tool_result}") + if verbose: - display_message = f"Agent {agent_name} called function '{function_name}' with arguments: {arguments}\n" + display_message = f"Agent {agent_name} called function '{tool_call_obj.function_name}' with arguments: {tool_call_obj.arguments}\n" display_message += f"Function returned: {tool_result}" if tool_result else "Function returned no output" _get_display_functions()['display_tool_call'](display_message, console=self.console) result_str = json.dumps(tool_result) if tool_result else "empty" _get_display_functions()['execute_sync_callback']( 'tool_call', - message=f"Calling function: {function_name}", - tool_name=function_name, - tool_input=arguments, + message=f"Calling function: {tool_call_obj.function_name}", + tool_name=tool_call_obj.function_name, + tool_input=tool_call_obj.arguments, tool_output=result_str[:200] if result_str else None, ) @@ -1927,7 +1951,7 @@ def _prepare_return_value(text: str) -> Union[str, tuple]: content = json.dumps(tool_result) messages.append({ "role": "tool", - "tool_call_id": tool_call_id, + "tool_call_id": tool_result_obj.tool_call_id, "content": content, }) @@ -3142,6 +3166,7 @@ def get_response_stream( task_description: Optional[str] = None, task_id: Optional[str] = None, execute_tool_fn: Optional[Callable] = None, + parallel_tool_calls: bool = False, # Gap 2: Enable parallel tool execution **kwargs ): """Generator that yields real-time response chunks from the LLM. @@ -3167,6 +3192,7 @@ def get_response_stream( task_description: Optional task description for logging task_id: Optional task ID for logging execute_tool_fn: Optional function for executing tools + parallel_tool_calls: If True, execute batched LLM tool calls in parallel (default False) **kwargs: Additional parameters Yields: @@ -3301,26 +3327,44 @@ def get_response_stream( "tool_calls": serializable_tool_calls }) - # Execute tool calls and add results to conversation + # Execute tool calls using ToolCallExecutor (Gap 2: parallel or sequential) + is_ollama = self._is_ollama_provider() + tool_calls_batch = [] + + # Prepare batch of ToolCall objects for tool_call in tool_calls: - is_ollama = self._is_ollama_provider() function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama) - - try: - # Execute the tool (pass tool_call_id for event correlation) - tool_result = execute_tool_fn(function_name, arguments, tool_call_id=tool_call_id) - - # Add tool result to messages - tool_message = self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama) - messages.append(tool_message) - - except Exception as e: - logging.error(f"Tool execution error for {function_name}: {e}") - # Add error message to conversation - error_message = self._create_tool_message( - function_name, f"Error executing tool: {e}", tool_call_id, is_ollama + tool_calls_batch.append(ToolCall( + function_name=function_name, + arguments=arguments, + tool_call_id=tool_call_id, + is_ollama=is_ollama + )) + + # Create appropriate executor based on parallel_tool_calls setting + executor = create_tool_call_executor(parallel=parallel_tool_calls) + + # Execute batch and add results to conversation + tool_results = executor.execute_batch(tool_calls_batch, execute_tool_fn) + + for tool_result in tool_results: + if tool_result.error is None: + # Successful execution + tool_message = self._create_tool_message( + tool_result.function_name, + tool_result.result, + tool_result.tool_call_id, + tool_result.is_ollama + ) + else: + # Error during execution (already logged by executor) + tool_message = self._create_tool_message( + tool_result.function_name, + tool_result.result, # Contains error message + tool_result.tool_call_id, + tool_result.is_ollama ) - messages.append(error_message) + messages.append(tool_message) # Continue conversation after tool execution - get follow-up response try: @@ -5462,4 +5506,4 @@ def _generate_tool_definition(self, function_or_name) -> Optional[Dict]: } } logging.debug(f"Generated tool definition: {tool_def}") - return tool_def \ No newline at end of file + return tool_def diff --git a/src/praisonai-agents/praisonaiagents/tools/call_executor.py b/src/praisonai-agents/praisonaiagents/tools/call_executor.py new file mode 100644 index 000000000..0d17f342b --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/tools/call_executor.py @@ -0,0 +1,199 @@ +""" +Tool Call Executor protocols for parallel and sequential tool execution. + +This module implements Gap 2 from Issue #1392: enables parallel execution +of batched LLM tool calls while maintaining backward compatibility. + +Design principles: +- Protocol-driven: ToolCallExecutor defines interface, concrete implementations provide behavior +- Opt-in: parallel_tool_calls=False by default (zero regression risk) +- Respects existing per-tool timeout infrastructure +- Thread-safe with bounded workers +""" + +import concurrent.futures +import contextvars +import logging +from typing import Any, Callable, Dict, List, Optional, Protocol +from dataclasses import dataclass +from ..trace.context_events import copy_context_to_callable + +logger = logging.getLogger(__name__) + + +@dataclass +class ToolCall: + """Represents a single tool call from LLM.""" + function_name: str + arguments: Dict[str, Any] + tool_call_id: str + is_ollama: bool = False + + +@dataclass +class ToolResult: + """Result of executing a single tool call.""" + function_name: str + arguments: Dict[str, Any] + result: Any + tool_call_id: str + is_ollama: bool + error: Optional[Exception] = None + + +class ToolCallExecutor(Protocol): + """Protocol for executing batched tool calls.""" + + def execute_batch( + self, + tool_calls: List[ToolCall], + execute_tool_fn: Callable[[str, Dict[str, Any], Optional[str]], Any] + ) -> List[ToolResult]: + """ + Execute a batch of tool calls and return results in original order. + + Args: + tool_calls: List of tool calls to execute + execute_tool_fn: Function to execute individual tools + + Returns: + List of ToolResult in same order as input tool_calls + """ + ... + + +class SequentialToolCallExecutor: + """ + Sequential tool call executor - maintains current behavior. + + Executes tool calls one after another, preserving exact current semantics. + """ + + def execute_batch( + self, + tool_calls: List[ToolCall], + execute_tool_fn: Callable[[str, Dict[str, Any], Optional[str]], Any] + ) -> List[ToolResult]: + """Execute tool calls sequentially - current behavior.""" + results = [] + + for tool_call in tool_calls: + try: + result = execute_tool_fn( + tool_call.function_name, + tool_call.arguments, + tool_call.tool_call_id + ) + results.append(ToolResult( + function_name=tool_call.function_name, + arguments=tool_call.arguments, + result=result, + tool_call_id=tool_call.tool_call_id, + is_ollama=tool_call.is_ollama + )) + except Exception as e: + logger.error(f"Tool execution error for {tool_call.function_name}: {e}") + results.append(ToolResult( + function_name=tool_call.function_name, + arguments=tool_call.arguments, + result=f"Error executing tool: {e}", + tool_call_id=tool_call.tool_call_id, + is_ollama=tool_call.is_ollama, + error=e + )) + + return results + + +class ParallelToolCallExecutor: + """ + Parallel tool call executor with bounded concurrency. + + Executes tool calls concurrently using thread pool while respecting: + - Per-tool timeout (from existing infrastructure) + - Bounded max_workers to prevent resource exhaustion + - Result ordering (matches input order) + """ + + def __init__(self, max_workers: int = 5): + """ + Initialize parallel executor. + + Args: + max_workers: Maximum concurrent tool executions (default 5) + """ + self.max_workers = max_workers + + def execute_batch( + self, + tool_calls: List[ToolCall], + execute_tool_fn: Callable[[str, Dict[str, Any], Optional[str]], Any] + ) -> List[ToolResult]: + """Execute tool calls in parallel using thread pool.""" + if not tool_calls: + return [] + + # Single tool call - no need for parallelism overhead + if len(tool_calls) == 1: + sequential_executor = SequentialToolCallExecutor() + return sequential_executor.execute_batch(tool_calls, execute_tool_fn) + + def _execute_single_tool(tool_call: ToolCall) -> ToolResult: + """Execute a single tool call with error handling.""" + try: + result = execute_tool_fn( + tool_call.function_name, + tool_call.arguments, + tool_call.tool_call_id + ) + return ToolResult( + function_name=tool_call.function_name, + arguments=tool_call.arguments, + result=result, + tool_call_id=tool_call.tool_call_id, + is_ollama=tool_call.is_ollama + ) + except Exception as e: + logger.error(f"Tool execution error for {tool_call.function_name}: {e}") + return ToolResult( + function_name=tool_call.function_name, + arguments=tool_call.arguments, + result=f"Error executing tool: {e}", + tool_call_id=tool_call.tool_call_id, + is_ollama=tool_call.is_ollama, + error=e + ) + + # Use ThreadPoolExecutor for sync tools + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit all tool calls with context propagation + future_to_index = { + # Preserve contextvars (tracing/session context) across worker threads. + executor.submit(copy_context_to_callable(_execute_single_tool), tool_call): i + for i, tool_call in enumerate(tool_calls) + } + + # Collect results and restore original order + results = [None] * len(tool_calls) + for future in concurrent.futures.as_completed(future_to_index): + index = future_to_index[future] + results[index] = future.result() + + return results + + +def create_tool_call_executor(parallel: bool = False, max_workers: int = 5) -> ToolCallExecutor: + """ + Factory function to create appropriate tool call executor. + + Args: + parallel: If True, return ParallelToolCallExecutor; else SequentialToolCallExecutor + max_workers: Maximum concurrent workers for parallel executor + + Returns: + ToolCallExecutor implementation + """ + if parallel: + return ParallelToolCallExecutor(max_workers=max_workers) + else: + return SequentialToolCallExecutor() diff --git a/src/praisonai-agents/tests/test_parallel_tools.py b/src/praisonai-agents/tests/test_parallel_tools.py new file mode 100644 index 000000000..1ab698c3f --- /dev/null +++ b/src/praisonai-agents/tests/test_parallel_tools.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +""" +Real agentic test for parallel tool execution (Gap 2). + +This test verifies that Agent(execution=ExecutionConfig(parallel_tool_calls=True)) executes +batched LLM tool calls concurrently with improved latency. + +Per AGENTS.md requirements: Agent MUST call agent.start() with a real prompt +and call the LLM end-to-end, not just object construction. +""" + +import time +import logging +import pytest +from typing import List +from praisonaiagents import Agent, tool +from praisonaiagents.config.feature_configs import ExecutionConfig +from praisonaiagents.tools.call_executor import create_tool_call_executor, ToolCall + +# Set up logging to see execution details +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Mock slow tools for latency testing +@tool +def fetch_user_data(user_id: str) -> str: + """Fetch user data (simulated slow I/O).""" + time.sleep(0.5) # Simulate 500ms network delay + return f"User {user_id}: John Doe, email: john@example.com" + +@tool +def fetch_analytics_data(metric: str) -> str: + """Fetch analytics data (simulated slow I/O).""" + time.sleep(0.5) # Simulate 500ms network delay + return f"Analytics for {metric}: 42,000 views, 3.2% conversion" + +@tool +def fetch_config_data(config_key: str) -> str: + """Fetch configuration data (simulated slow I/O).""" + time.sleep(0.5) # Simulate 500ms network delay + return f"Config {config_key}: enabled=true, timeout=30s" + +def test_executor_protocols(): + """Test the ToolCallExecutor protocols directly.""" + print("=== Testing ToolCallExecutor Protocols ===") + + def mock_execute_tool(name: str, args: dict, tool_call_id: str = None) -> str: + """Mock tool execution function.""" + start = time.time() + if name == "fetch_user_data": + time.sleep(0.3) + result = f"User {args.get('user_id', 'unknown')}: Mock Data" + elif name == "fetch_analytics_data": + time.sleep(0.3) + result = f"Analytics {args.get('metric', 'unknown')}: Mock Data" + elif name == "fetch_config_data": + time.sleep(0.3) + result = f"Config {args.get('config_key', 'unknown')}: Mock Data" + else: + result = "Unknown tool" + + duration = time.time() - start + print(f" Tool {name} executed in {duration:.2f}s -> {result}") + return result + + # Create test tool calls + tool_calls = [ + ToolCall("fetch_user_data", {"user_id": "123"}, "call_1", False), + ToolCall("fetch_analytics_data", {"metric": "views"}, "call_2", False), + ToolCall("fetch_config_data", {"config_key": "timeout"}, "call_3", False), + ] + + # Test sequential execution + print("\n--- Sequential Execution ---") + sequential_start = time.time() + seq_executor = create_tool_call_executor(parallel=False) + seq_results = seq_executor.execute_batch(tool_calls, mock_execute_tool) + sequential_time = time.time() - sequential_start + print(f"Sequential execution took: {sequential_time:.2f}s") + print(f"Results: {len(seq_results)} tools executed") + + # Test parallel execution + print("\n--- Parallel Execution ---") + parallel_start = time.time() + par_executor = create_tool_call_executor(parallel=True, max_workers=3) + par_results = par_executor.execute_batch(tool_calls, mock_execute_tool) + parallel_time = time.time() - parallel_start + print(f"Parallel execution took: {parallel_time:.2f}s") + print(f"Results: {len(par_results)} tools executed") + + # Verify results are identical and in correct order + assert len(seq_results) == len(par_results), "Result counts should match" + for i, (seq_result, par_result) in enumerate(zip(seq_results, par_results)): + assert seq_result.function_name == par_result.function_name, f"Function names should match at index {i}" + assert seq_result.arguments == par_result.arguments, f"Arguments should match at index {i}" + assert seq_result.tool_call_id == par_result.tool_call_id, f"Tool call IDs should match at index {i}" + print(f" Result {i+1}: {seq_result.function_name} -> {seq_result.result}") + + # Verify latency improvement + speedup = sequential_time / parallel_time if parallel_time > 0 else 1 + print(f"\nSpeedup: {speedup:.2f}x") + print(f"Expected ~3x speedup for 3 parallel tools with 0.3s each") + + # Should be at least 2x faster for 3 parallel tools + assert speedup >= 1.5, f"Expected speedup >= 1.5x, got {speedup:.2f}x" + print("✅ ToolCallExecutor protocol test passed!\n") + +@pytest.mark.live +def test_agent_parallel_tools(): + """Real agentic test with LLM end-to-end.""" + print("=== Real Agentic Test: Parallel Tool Execution ===") + + # Skip if no OpenAI API key + import os + if not os.getenv('OPENAI_API_KEY') and not os.getenv('PRAISONAI_LIVE_TESTS'): + pytest.skip("OpenAI API key not available for live test") + + # Create agents with different settings + sequential_agent = Agent( + name="sequential_agent", + instructions="You are a data fetcher. Use the provided tools to fetch user, analytics, and config data.", + tools=[fetch_user_data, fetch_analytics_data, fetch_config_data], + execution=ExecutionConfig(parallel_tool_calls=False), # Sequential (current behavior) + llm="gpt-4o-mini" + ) + + parallel_agent = Agent( + name="parallel_agent", + instructions="You are a data fetcher. Use the provided tools to fetch user, analytics, and config data.", + tools=[fetch_user_data, fetch_analytics_data, fetch_config_data], + execution=ExecutionConfig(parallel_tool_calls=True), # Parallel (new feature) + llm="gpt-4o-mini" + ) + + # Prompt that should trigger multiple tool calls + prompt = """Please fetch the following data concurrently: +1. User data for user ID 'user123' +2. Analytics data for metric 'page_views' +3. Config data for key 'max_connections' + +Return a summary of all the fetched data.""" + + print(f"\nPrompt: {prompt}") + + # Test sequential agent (baseline) + print("\n--- Sequential Agent ---") + sequential_start = time.time() + sequential_result = sequential_agent.start(prompt) + sequential_time = time.time() - sequential_start + print(f"Sequential agent completed in: {sequential_time:.2f}s") + print(f"Result length: {len(sequential_result)} chars") + print(f"Result preview: {sequential_result[:200]}...") + + # Test parallel agent + print("\n--- Parallel Agent ---") + parallel_start = time.time() + parallel_result = parallel_agent.start(prompt) + parallel_time = time.time() - parallel_start + print(f"Parallel agent completed in: {parallel_time:.2f}s") + print(f"Result length: {len(parallel_result)} chars") + print(f"Result preview: {parallel_result[:200]}...") + + speedup = sequential_time / parallel_time if parallel_time > 0 else float("inf") + print(f"\n=== Performance Comparison ===") + print(f"Sequential time: {sequential_time:.2f}s") + print(f"Parallel time: {parallel_time:.2f}s") + print(f"Speedup: {speedup:.2f}x") + + # Assertions for test validation + assert isinstance(sequential_result, str) and sequential_result.strip(), ( + "Sequential agent should return a non-empty string result." + ) + assert isinstance(parallel_result, str) and parallel_result.strip(), ( + "Parallel agent should return a non-empty string result." + ) + + # Both results should contain evidence of tool execution + assert 'user123' in sequential_result.lower() or 'john doe' in sequential_result.lower(), ( + "Sequential result should contain user data" + ) + assert 'user123' in parallel_result.lower() or 'john doe' in parallel_result.lower(), ( + "Parallel result should contain user data" + ) + + print("✅ Real agentic test completed!\n") + +if __name__ == "__main__": + """Run tests directly.""" + print("Testing Gap 2: Parallel Tool Execution") + print("=====================================") + + # Test 1: Direct executor protocol testing + test_executor_protocols() + + # Test 2: Real agentic test (per AGENTS.md requirement) + try: + test_agent_parallel_tools() + except Exception as e: + print(f"Live test skipped or failed: {e}") + + print("Tests completed! 🎉") + print("\nGap 2 implementation allows agents to execute batched LLM tool calls in parallel,") + print("reducing latency for I/O-bound workflows while maintaining backward compatibility.")