-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: implement real-time streaming for Agent.start() method #1028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bfd639e
6663b9b
4f3276b
662e155
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -206,7 +206,7 @@ def __init__( | |
| knowledge_config: Optional[Dict[str, Any]] = None, | ||
| use_system_prompt: Optional[bool] = True, | ||
| markdown: bool = True, | ||
| stream: bool = True, | ||
| stream: bool = False, | ||
| self_reflect: bool = False, | ||
| max_reflect: int = 3, | ||
| min_reflect: int = 1, | ||
|
|
@@ -281,8 +281,8 @@ def __init__( | |
| conversations to establish agent behavior and context. Defaults to True. | ||
| markdown (bool, optional): Enable markdown formatting in agent responses for better | ||
| readability and structure. Defaults to True. | ||
| stream (bool, optional): Enable streaming responses from the language model. Set to False | ||
| for LLM providers that don't support streaming. Defaults to True. | ||
| stream (bool, optional): Enable streaming responses from the language model for real-time | ||
| output when using Agent.start() method. Defaults to False for backward compatibility. | ||
| self_reflect (bool, optional): Enable self-reflection capabilities where the agent | ||
| evaluates and improves its own responses. Defaults to False. | ||
| max_reflect (int, optional): Maximum number of self-reflection iterations to prevent | ||
|
|
@@ -1953,34 +1953,114 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]: | |
| # Reset the final display flag for each new conversation | ||
| self._final_display_shown = False | ||
|
|
||
| # Temporarily disable verbose mode to prevent console output during streaming | ||
| # Temporarily disable verbose mode to prevent console output conflicts during streaming | ||
| original_verbose = self.verbose | ||
| self.verbose = False | ||
|
|
||
| # Use the existing chat logic but capture and yield chunks | ||
| # This approach reuses all existing logic without duplication | ||
| response = self.chat(prompt, **kwargs) | ||
|
|
||
| # Restore original verbose mode | ||
| self.verbose = original_verbose | ||
|
|
||
| if response: | ||
| # Simulate streaming by yielding the response in word chunks | ||
| # This provides a consistent streaming experience regardless of LLM type | ||
| words = str(response).split() | ||
| chunk_size = max(1, len(words) // 20) # Split into ~20 chunks for smooth streaming | ||
| # For custom LLM path, use the new get_response_stream generator | ||
| if self._using_custom_llm: | ||
| # Handle knowledge search | ||
| actual_prompt = prompt | ||
| if self.knowledge: | ||
| search_results = self.knowledge.search(prompt, agent_id=self.agent_id) | ||
| if search_results: | ||
| if isinstance(search_results, dict) and 'results' in search_results: | ||
| knowledge_content = "\n".join([result['memory'] for result in search_results['results']]) | ||
| else: | ||
| knowledge_content = "\n".join(search_results) | ||
| actual_prompt = f"{prompt}\n\nKnowledge: {knowledge_content}" | ||
|
|
||
| # Handle tools properly | ||
| tools = kwargs.get('tools', self.tools) | ||
| if tools is None or (isinstance(tools, list) and len(tools) == 0): | ||
| tool_param = self.tools | ||
| else: | ||
| tool_param = tools | ||
|
|
||
| for i in range(0, len(words), chunk_size): | ||
| chunk_words = words[i:i + chunk_size] | ||
| chunk = ' '.join(chunk_words) | ||
| # Convert MCP tools if needed | ||
| if tool_param is not None: | ||
| from ..mcp.mcp import MCP | ||
| if isinstance(tool_param, MCP) and hasattr(tool_param, 'to_openai_tool'): | ||
| openai_tool = tool_param.to_openai_tool() | ||
| if openai_tool: | ||
| if isinstance(openai_tool, list): | ||
| tool_param = openai_tool | ||
| else: | ||
| tool_param = [openai_tool] | ||
|
|
||
|
Comment on lines
+1962
to
+1990
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This block of code for handling knowledge search, tools, and MCP tool conversion is very similar to the logic in the To improve maintainability, I recommend refactoring this duplicated logic into a shared private helper method. This method could take the prompt and tools as input and return the processed |
||
| # Store chat history length for potential rollback | ||
| chat_history_length = len(self.chat_history) | ||
|
|
||
| # Normalize prompt content for chat history | ||
| normalized_content = actual_prompt | ||
| if isinstance(actual_prompt, list): | ||
| normalized_content = next((item["text"] for item in actual_prompt if item.get("type") == "text"), "") | ||
|
|
||
| # Prevent duplicate messages in chat history | ||
| if not (self.chat_history and | ||
| self.chat_history[-1].get("role") == "user" and | ||
| self.chat_history[-1].get("content") == normalized_content): | ||
| self.chat_history.append({"role": "user", "content": normalized_content}) | ||
|
|
||
| try: | ||
| # Use the new streaming generator from LLM class | ||
| response_content = "" | ||
| for chunk in self.llm_instance.get_response_stream( | ||
| prompt=actual_prompt, | ||
| system_prompt=self._build_system_prompt(tool_param), | ||
| chat_history=self.chat_history, | ||
| temperature=kwargs.get('temperature', 0.2), | ||
| tools=tool_param, | ||
| output_json=kwargs.get('output_json'), | ||
| output_pydantic=kwargs.get('output_pydantic'), | ||
| verbose=False, # Keep verbose false for streaming | ||
| markdown=self.markdown, | ||
| agent_name=self.name, | ||
| agent_role=self.role, | ||
| agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tool_param or [])], | ||
| task_name=kwargs.get('task_name'), | ||
| task_description=kwargs.get('task_description'), | ||
| task_id=kwargs.get('task_id'), | ||
| execute_tool_fn=self.execute_tool | ||
| ): | ||
| response_content += chunk | ||
| yield chunk | ||
|
|
||
| # Add space after chunk unless it's the last one | ||
| if i + chunk_size < len(words): | ||
| chunk += ' ' | ||
| # Add complete response to chat history | ||
| if response_content: | ||
| self.chat_history.append({"role": "assistant", "content": response_content}) | ||
|
|
||
| except Exception as e: | ||
| # Rollback chat history on error | ||
| self.chat_history = self.chat_history[:chat_history_length] | ||
| logging.error(f"Custom LLM streaming error: {e}") | ||
| raise | ||
|
|
||
| else: | ||
| # For OpenAI-style models, fall back to the chat method for now | ||
| # TODO: Implement OpenAI streaming in future iterations | ||
| response = self.chat(prompt, **kwargs) | ||
|
|
||
| if response: | ||
| # Simulate streaming by yielding the response in word chunks | ||
| words = str(response).split() | ||
| chunk_size = max(1, len(words) // 20) | ||
|
|
||
| yield chunk | ||
| for i in range(0, len(words), chunk_size): | ||
| chunk_words = words[i:i + chunk_size] | ||
| chunk = ' '.join(chunk_words) | ||
|
|
||
| if i + chunk_size < len(words): | ||
| chunk += ' ' | ||
|
|
||
| yield chunk | ||
|
Comment on lines
+2039
to
+2056
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider improving the OpenAI fallback streaming simulation. The fallback approach for OpenAI-style models has some potential issues:
Consider implementing true OpenAI streaming using their streaming API or documenting this limitation clearly. - # For OpenAI-style models, fall back to the chat method for now
- # TODO: Implement OpenAI streaming in future iterations
- response = self.chat(prompt, **kwargs)
-
- if response:
- # Simulate streaming by yielding the response in word chunks
- words = str(response).split()
- chunk_size = max(1, len(words) // 20)
-
- for i in range(0, len(words), chunk_size):
- chunk_words = words[i:i + chunk_size]
- chunk = ' '.join(chunk_words)
-
- if i + chunk_size < len(words):
- chunk += ' '
-
- yield chunk
+ # For OpenAI-style models, use _chat_completion with streaming enabled
+ messages, original_prompt = self._build_messages(prompt, kwargs.get('temperature', 0.2),
+ kwargs.get('output_json'), kwargs.get('output_pydantic'), kwargs.get('tools'))
+
+ # Store chat history length for potential rollback
+ chat_history_length = len(self.chat_history)
+
+ # Add user message to chat history
+ normalized_content = original_prompt if isinstance(original_prompt, str) else str(original_prompt)
+ if not (self.chat_history and self.chat_history[-1].get("role") == "user" and
+ self.chat_history[-1].get("content") == normalized_content):
+ self.chat_history.append({"role": "user", "content": normalized_content})
+
+ try:
+ # Use streaming chat completion for real streaming
+ response = self._chat_completion(messages, temperature=kwargs.get('temperature', 0.2),
+ tools=kwargs.get('tools'), stream=True,
+ reasoning_steps=kwargs.get('reasoning_steps', False))
+ if response and response.choices:
+ response_text = response.choices[0].message.content.strip()
+ # Yield the complete response (streaming handled internally by _chat_completion)
+ yield response_text
+ self.chat_history.append({"role": "assistant", "content": response_text})
+ except Exception as e:
+ # Rollback chat history on error
+ self.chat_history = self.chat_history[:chat_history_length]
+ logging.error(f"OpenAI streaming error: {e}")
+ raise
🤖 Prompt for AI Agents |
||
|
|
||
| # Restore original verbose mode | ||
| self.verbose = original_verbose | ||
|
|
||
| except Exception as e: | ||
| # Restore verbose mode on any error | ||
| self.verbose = original_verbose | ||
| # Graceful fallback to non-streaming if streaming fails | ||
| logging.warning(f"Streaming failed, falling back to regular response: {e}") | ||
| response = self.chat(prompt, **kwargs) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1557,6 +1557,198 @@ def get_response( | |
| total_time = time.time() - start_time | ||
| logging.debug(f"get_response completed in {total_time:.2f} seconds") | ||
|
|
||
| def get_response_stream( | ||
| self, | ||
| prompt: Union[str, List[Dict]], | ||
| system_prompt: Optional[str] = None, | ||
| chat_history: Optional[List[Dict]] = None, | ||
| temperature: float = 0.2, | ||
| tools: Optional[List[Any]] = None, | ||
| output_json: Optional[BaseModel] = None, | ||
| output_pydantic: Optional[BaseModel] = None, | ||
| verbose: bool = False, # Default to non-verbose for streaming | ||
| markdown: bool = True, | ||
| agent_name: Optional[str] = None, | ||
| agent_role: Optional[str] = None, | ||
| agent_tools: Optional[List[str]] = None, | ||
| task_name: Optional[str] = None, | ||
| task_description: Optional[str] = None, | ||
| task_id: Optional[str] = None, | ||
| execute_tool_fn: Optional[Callable] = None, | ||
| **kwargs | ||
| ): | ||
| """Generator that yields real-time response chunks from the LLM. | ||
|
|
||
| This method provides true streaming by yielding content chunks as they | ||
| are received from the underlying LLM, enabling real-time display of | ||
| responses without waiting for the complete response. | ||
|
|
||
| Args: | ||
| prompt: The prompt to send to the LLM | ||
| system_prompt: Optional system prompt | ||
| chat_history: Optional chat history | ||
| temperature: Sampling temperature | ||
| tools: Optional list of tools for function calling | ||
| output_json: Optional JSON schema for structured output | ||
| output_pydantic: Optional Pydantic model for structured output | ||
| verbose: Whether to enable verbose logging (default False for streaming) | ||
| markdown: Whether to enable markdown processing | ||
| agent_name: Optional agent name for logging | ||
| agent_role: Optional agent role for logging | ||
| agent_tools: Optional list of agent tools for logging | ||
| task_name: Optional task name for logging | ||
| task_description: Optional task description for logging | ||
| task_id: Optional task ID for logging | ||
| execute_tool_fn: Optional function for executing tools | ||
| **kwargs: Additional parameters | ||
|
|
||
| Yields: | ||
| str: Individual content chunks as they are received from the LLM | ||
|
|
||
| Raises: | ||
| Exception: If streaming fails or LLM call encounters an error | ||
| """ | ||
| try: | ||
| import litellm | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| # Build messages using existing logic | ||
| messages, original_prompt = self._build_messages( | ||
| prompt=prompt, | ||
| system_prompt=system_prompt, | ||
| chat_history=chat_history, | ||
| output_json=output_json, | ||
| output_pydantic=output_pydantic | ||
| ) | ||
|
|
||
| # Format tools for litellm | ||
| formatted_tools = self._format_tools_for_litellm(tools) | ||
|
|
||
| # Determine if we should use streaming based on tool support | ||
| use_streaming = True | ||
| if formatted_tools and not self._supports_streaming_tools(): | ||
| # Provider doesn't support streaming with tools, fall back to non-streaming | ||
| use_streaming = False | ||
|
|
||
| if use_streaming: | ||
| # Real-time streaming approach with tool call support | ||
| try: | ||
| tool_calls = [] | ||
| response_text = "" | ||
|
|
||
| for chunk in litellm.completion( | ||
| **self._build_completion_params( | ||
| messages=messages, | ||
| tools=formatted_tools, | ||
| temperature=temperature, | ||
| stream=True, | ||
| output_json=output_json, | ||
| output_pydantic=output_pydantic, | ||
| **kwargs | ||
| ) | ||
| ): | ||
| if chunk and chunk.choices and chunk.choices[0].delta: | ||
| delta = chunk.choices[0].delta | ||
|
|
||
| # Process both content and tool calls using existing helper | ||
| response_text, tool_calls = self._process_stream_delta( | ||
| delta, response_text, tool_calls, formatted_tools | ||
| ) | ||
|
|
||
| # Yield content chunks in real-time as they arrive | ||
| if delta.content: | ||
| yield delta.content | ||
|
|
||
| # After streaming completes, handle tool calls if present | ||
| if tool_calls and execute_tool_fn: | ||
| # Add assistant message with tool calls to conversation | ||
| if self._is_ollama_provider(): | ||
| messages.append({ | ||
| "role": "assistant", | ||
| "content": response_text | ||
| }) | ||
| else: | ||
| serializable_tool_calls = self._serialize_tool_calls(tool_calls) | ||
| messages.append({ | ||
| "role": "assistant", | ||
| "content": response_text, | ||
| "tool_calls": serializable_tool_calls | ||
| }) | ||
|
|
||
| # Execute tool calls and add results to conversation | ||
| for tool_call in tool_calls: | ||
| is_ollama = self._is_ollama_provider() | ||
| function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama) | ||
|
|
||
| try: | ||
| # Execute the tool | ||
| tool_result = execute_tool_fn(function_name, arguments) | ||
|
|
||
| # Add tool result to messages | ||
| tool_message = self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama) | ||
| messages.append(tool_message) | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Tool execution error for {function_name}: {e}") | ||
| # Add error message to conversation | ||
| error_message = self._create_tool_message( | ||
| function_name, f"Error executing tool: {e}", tool_call_id, is_ollama | ||
| ) | ||
| messages.append(error_message) | ||
|
|
||
| # Continue conversation after tool execution - get follow-up response | ||
| try: | ||
| follow_up_response = litellm.completion( | ||
| **self._build_completion_params( | ||
| messages=messages, | ||
| tools=formatted_tools, | ||
| temperature=temperature, | ||
| stream=False, | ||
| **kwargs | ||
| ) | ||
| ) | ||
|
|
||
| if follow_up_response and follow_up_response.choices: | ||
| follow_up_content = follow_up_response.choices[0].message.content | ||
| if follow_up_content: | ||
| # Yield the follow-up response after tool execution | ||
| yield follow_up_content | ||
| except Exception as e: | ||
| logging.error(f"Follow-up response failed: {e}") | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Streaming failed: {e}") | ||
| # Fall back to non-streaming if streaming fails | ||
| use_streaming = False | ||
|
|
||
| if not use_streaming: | ||
| # Fall back to non-streaming and yield the complete response | ||
| try: | ||
| response = litellm.completion( | ||
| **self._build_completion_params( | ||
| messages=messages, | ||
| tools=formatted_tools, | ||
| temperature=temperature, | ||
| stream=False, | ||
| output_json=output_json, | ||
| output_pydantic=output_pydantic, | ||
| **kwargs | ||
| ) | ||
| ) | ||
|
|
||
| if response and response.choices: | ||
| content = response.choices[0].message.content | ||
| if content: | ||
| # Yield the complete response as a single chunk | ||
| yield content | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Non-streaming fallback failed: {e}") | ||
| raise | ||
|
|
||
| except Exception as e: | ||
| logging.error(f"Error in get_response_stream: {e}") | ||
| raise | ||
|
|
||
| def _is_gemini_model(self) -> bool: | ||
| """Check if the model is a Gemini model.""" | ||
| if not self.model: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to PEP 8, imports should be at the top of the file. This local import makes the code less readable and can hide dependency issues.
Please move this import to the top of the file with the other imports.