Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 103 additions & 23 deletions src/praisonai-agents/praisonaiagents/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def __init__(
knowledge_config: Optional[Dict[str, Any]] = None,
use_system_prompt: Optional[bool] = True,
markdown: bool = True,
stream: bool = True,
stream: bool = False,
self_reflect: bool = False,
max_reflect: int = 3,
min_reflect: int = 1,
Expand Down Expand Up @@ -281,8 +281,8 @@ def __init__(
conversations to establish agent behavior and context. Defaults to True.
markdown (bool, optional): Enable markdown formatting in agent responses for better
readability and structure. Defaults to True.
stream (bool, optional): Enable streaming responses from the language model. Set to False
for LLM providers that don't support streaming. Defaults to True.
stream (bool, optional): Enable streaming responses from the language model for real-time
output when using Agent.start() method. Defaults to False for backward compatibility.
self_reflect (bool, optional): Enable self-reflection capabilities where the agent
evaluates and improves its own responses. Defaults to False.
max_reflect (int, optional): Maximum number of self-reflection iterations to prevent
Expand Down Expand Up @@ -1953,34 +1953,114 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
# Reset the final display flag for each new conversation
self._final_display_shown = False

# Temporarily disable verbose mode to prevent console output during streaming
# Temporarily disable verbose mode to prevent console output conflicts during streaming
original_verbose = self.verbose
self.verbose = False

# Use the existing chat logic but capture and yield chunks
# This approach reuses all existing logic without duplication
response = self.chat(prompt, **kwargs)

# Restore original verbose mode
self.verbose = original_verbose

if response:
# Simulate streaming by yielding the response in word chunks
# This provides a consistent streaming experience regardless of LLM type
words = str(response).split()
chunk_size = max(1, len(words) // 20) # Split into ~20 chunks for smooth streaming
# For custom LLM path, use the new get_response_stream generator
if self._using_custom_llm:
# Handle knowledge search
actual_prompt = prompt
if self.knowledge:
search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
if search_results:
if isinstance(search_results, dict) and 'results' in search_results:
knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
else:
knowledge_content = "\n".join(search_results)
actual_prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"

# Handle tools properly
tools = kwargs.get('tools', self.tools)
if tools is None or (isinstance(tools, list) and len(tools) == 0):
tool_param = self.tools
else:
tool_param = tools

for i in range(0, len(words), chunk_size):
chunk_words = words[i:i + chunk_size]
chunk = ' '.join(chunk_words)
# Convert MCP tools if needed
if tool_param is not None:
from ..mcp.mcp import MCP
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to PEP 8, imports should be at the top of the file. This local import makes the code less readable and can hide dependency issues.

Please move this import to the top of the file with the other imports.

from ..mcp.mcp import MCP

if isinstance(tool_param, MCP) and hasattr(tool_param, 'to_openai_tool'):
openai_tool = tool_param.to_openai_tool()
if openai_tool:
if isinstance(openai_tool, list):
tool_param = openai_tool
else:
tool_param = [openai_tool]

Comment on lines +1962 to +1990
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of code for handling knowledge search, tools, and MCP tool conversion is very similar to the logic in the chat method (lines 1212-1249). This duplication can make the code harder to maintain, as any future changes will need to be made in both places.

To improve maintainability, I recommend refactoring this duplicated logic into a shared private helper method. This method could take the prompt and tools as input and return the processed actual_prompt and tool_param.

# Store chat history length for potential rollback
chat_history_length = len(self.chat_history)

# Normalize prompt content for chat history
normalized_content = actual_prompt
if isinstance(actual_prompt, list):
normalized_content = next((item["text"] for item in actual_prompt if item.get("type") == "text"), "")

# Prevent duplicate messages in chat history
if not (self.chat_history and
self.chat_history[-1].get("role") == "user" and
self.chat_history[-1].get("content") == normalized_content):
self.chat_history.append({"role": "user", "content": normalized_content})

try:
# Use the new streaming generator from LLM class
response_content = ""
for chunk in self.llm_instance.get_response_stream(
prompt=actual_prompt,
system_prompt=self._build_system_prompt(tool_param),
chat_history=self.chat_history,
temperature=kwargs.get('temperature', 0.2),
tools=tool_param,
output_json=kwargs.get('output_json'),
output_pydantic=kwargs.get('output_pydantic'),
verbose=False, # Keep verbose false for streaming
markdown=self.markdown,
agent_name=self.name,
agent_role=self.role,
agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tool_param or [])],
task_name=kwargs.get('task_name'),
task_description=kwargs.get('task_description'),
task_id=kwargs.get('task_id'),
execute_tool_fn=self.execute_tool
):
response_content += chunk
yield chunk

# Add space after chunk unless it's the last one
if i + chunk_size < len(words):
chunk += ' '
# Add complete response to chat history
if response_content:
self.chat_history.append({"role": "assistant", "content": response_content})

except Exception as e:
# Rollback chat history on error
self.chat_history = self.chat_history[:chat_history_length]
logging.error(f"Custom LLM streaming error: {e}")
raise

else:
# For OpenAI-style models, fall back to the chat method for now
# TODO: Implement OpenAI streaming in future iterations
response = self.chat(prompt, **kwargs)

if response:
# Simulate streaming by yielding the response in word chunks
words = str(response).split()
chunk_size = max(1, len(words) // 20)

yield chunk
for i in range(0, len(words), chunk_size):
chunk_words = words[i:i + chunk_size]
chunk = ' '.join(chunk_words)

if i + chunk_size < len(words):
chunk += ' '

yield chunk
Comment on lines +2039 to +2056
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider improving the OpenAI fallback streaming simulation.

The fallback approach for OpenAI-style models has some potential issues:

  1. Recursion risk: Calling self.chat() could cause infinite recursion if the chat method internally uses streaming
  2. Unnatural chunking: Splitting into ~20-word chunks may not provide a natural streaming experience
  3. Missing streaming benefits: This doesn't provide true real-time streaming, just delayed chunk delivery

Consider implementing true OpenAI streaming using their streaming API or documenting this limitation clearly.

-                # For OpenAI-style models, fall back to the chat method for now
-                # TODO: Implement OpenAI streaming in future iterations
-                response = self.chat(prompt, **kwargs)
-                
-                if response:
-                    # Simulate streaming by yielding the response in word chunks
-                    words = str(response).split()
-                    chunk_size = max(1, len(words) // 20)
-                    
-                    for i in range(0, len(words), chunk_size):
-                        chunk_words = words[i:i + chunk_size]
-                        chunk = ' '.join(chunk_words)
-                        
-                        if i + chunk_size < len(words):
-                            chunk += ' '
-                        
-                        yield chunk
+                # For OpenAI-style models, use _chat_completion with streaming enabled
+                messages, original_prompt = self._build_messages(prompt, kwargs.get('temperature', 0.2), 
+                                                               kwargs.get('output_json'), kwargs.get('output_pydantic'), kwargs.get('tools'))
+                
+                # Store chat history length for potential rollback
+                chat_history_length = len(self.chat_history)
+                
+                # Add user message to chat history
+                normalized_content = original_prompt if isinstance(original_prompt, str) else str(original_prompt)
+                if not (self.chat_history and self.chat_history[-1].get("role") == "user" and 
+                        self.chat_history[-1].get("content") == normalized_content):
+                    self.chat_history.append({"role": "user", "content": normalized_content})
+                
+                try:
+                    # Use streaming chat completion for real streaming
+                    response = self._chat_completion(messages, temperature=kwargs.get('temperature', 0.2), 
+                                                   tools=kwargs.get('tools'), stream=True, 
+                                                   reasoning_steps=kwargs.get('reasoning_steps', False))
+                    if response and response.choices:
+                        response_text = response.choices[0].message.content.strip()
+                        # Yield the complete response (streaming handled internally by _chat_completion)
+                        yield response_text
+                        self.chat_history.append({"role": "assistant", "content": response_text})
+                except Exception as e:
+                    # Rollback chat history on error
+                    self.chat_history = self.chat_history[:chat_history_length]
+                    logging.error(f"OpenAI streaming error: {e}")
+                    raise

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/agent/agent.py around lines 2039 to
2056, the fallback streaming simulation for OpenAI-style models uses
self.chat(), which risks infinite recursion if chat uses streaming internally,
and splits responses into fixed word chunks that do not mimic natural streaming.
To fix this, avoid calling self.chat() directly in the fallback to prevent
recursion, and either implement true streaming using OpenAI's streaming API or
clearly document that this fallback is a simplified simulation with limitations
on real-time streaming and chunk naturalness.


# Restore original verbose mode
self.verbose = original_verbose

except Exception as e:
# Restore verbose mode on any error
self.verbose = original_verbose
# Graceful fallback to non-streaming if streaming fails
logging.warning(f"Streaming failed, falling back to regular response: {e}")
response = self.chat(prompt, **kwargs)
Expand Down
192 changes: 192 additions & 0 deletions src/praisonai-agents/praisonaiagents/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1557,6 +1557,198 @@ def get_response(
total_time = time.time() - start_time
logging.debug(f"get_response completed in {total_time:.2f} seconds")

def get_response_stream(
    self,
    prompt: Union[str, List[Dict]],
    system_prompt: Optional[str] = None,
    chat_history: Optional[List[Dict]] = None,
    temperature: float = 0.2,
    tools: Optional[List[Any]] = None,
    output_json: Optional[BaseModel] = None,
    output_pydantic: Optional[BaseModel] = None,
    verbose: bool = False,  # Default to non-verbose for streaming
    markdown: bool = True,
    agent_name: Optional[str] = None,
    agent_role: Optional[str] = None,
    agent_tools: Optional[List[str]] = None,
    task_name: Optional[str] = None,
    task_description: Optional[str] = None,
    task_id: Optional[str] = None,
    execute_tool_fn: Optional[Callable] = None,
    **kwargs
):
    """Generator that yields real-time response chunks from the LLM.

    Provides true streaming by yielding content chunks as they are received
    from the underlying LLM, so callers can display output without waiting
    for the complete response. When the provider cannot stream while tools
    are attached, the method transparently falls back to a single
    non-streaming completion and yields it as one chunk.

    Args:
        prompt: The prompt to send to the LLM (string or message-part list).
        system_prompt: Optional system prompt.
        chat_history: Optional prior conversation messages.
        temperature: Sampling temperature.
        tools: Optional list of tools for function calling.
        output_json: Optional JSON schema for structured output.
        output_pydantic: Optional Pydantic model for structured output.
        verbose: Whether to enable verbose logging (default False for streaming).
        markdown: Whether to enable markdown processing.
        agent_name: Optional agent name for logging.
        agent_role: Optional agent role for logging.
        agent_tools: Optional list of agent tool names for logging.
        task_name: Optional task name for logging.
        task_description: Optional task description for logging.
        task_id: Optional task ID for logging.
        execute_tool_fn: Optional callable used to execute requested tools.
        **kwargs: Additional parameters forwarded to the completion call.

    Yields:
        str: Content chunks as they arrive; after tool execution, the
        follow-up answer is yielded as a single chunk.

    Raises:
        Exception: If streaming fails after partial output was already
        yielded (falling back would duplicate content), or if the
        non-streaming fallback itself fails.
    """
    try:
        # Local import keeps litellm an optional dependency at module import time.
        import litellm

        # Build the message list (system prompt + history + user prompt)
        # using the same helper as the non-streaming path.
        messages, original_prompt = self._build_messages(
            prompt=prompt,
            system_prompt=system_prompt,
            chat_history=chat_history,
            output_json=output_json,
            output_pydantic=output_pydantic
        )

        # Convert tool definitions into litellm's expected schema.
        formatted_tools = self._format_tools_for_litellm(tools)

        # Some providers cannot stream while tools are attached; detect that
        # up front and use the non-streaming path instead.
        use_streaming = True
        if formatted_tools and not self._supports_streaming_tools():
            use_streaming = False

        # Tracks whether any partial output has reached the caller. Used to
        # decide between falling back and re-raising when streaming fails:
        # replaying the full response after partial delivery would duplicate
        # content the caller already displayed.
        yielded_any = False

        if use_streaming:
            # Real-time streaming with incremental tool-call assembly.
            try:
                tool_calls = []
                response_text = ""

                for chunk in litellm.completion(
                    **self._build_completion_params(
                        messages=messages,
                        tools=formatted_tools,
                        temperature=temperature,
                        stream=True,
                        output_json=output_json,
                        output_pydantic=output_pydantic,
                        **kwargs
                    )
                ):
                    if chunk and chunk.choices and chunk.choices[0].delta:
                        delta = chunk.choices[0].delta

                        # Accumulate text and any incremental tool-call
                        # fragments via the shared delta-processing helper.
                        response_text, tool_calls = self._process_stream_delta(
                            delta, response_text, tool_calls, formatted_tools
                        )

                        # Yield content chunks in real time as they arrive.
                        if delta.content:
                            yielded_any = True
                            yield delta.content

                # After the stream completes, execute any requested tools and
                # obtain a follow-up answer that incorporates their results.
                if tool_calls and execute_tool_fn:
                    # Record the assistant turn. Ollama rejects the
                    # "tool_calls" field, so it gets a plain message.
                    if self._is_ollama_provider():
                        messages.append({
                            "role": "assistant",
                            "content": response_text
                        })
                    else:
                        messages.append({
                            "role": "assistant",
                            "content": response_text,
                            "tool_calls": self._serialize_tool_calls(tool_calls)
                        })

                    for tool_call in tool_calls:
                        is_ollama = self._is_ollama_provider()
                        function_name, arguments, tool_call_id = self._extract_tool_call_info(tool_call, is_ollama)

                        try:
                            tool_result = execute_tool_fn(function_name, arguments)
                            messages.append(
                                self._create_tool_message(function_name, tool_result, tool_call_id, is_ollama)
                            )
                        except Exception as e:
                            # Surface the failure to the model as a tool
                            # result instead of aborting the whole stream.
                            logging.error(f"Tool execution error for {function_name}: {e}")
                            messages.append(self._create_tool_message(
                                function_name, f"Error executing tool: {e}", tool_call_id, is_ollama
                            ))

                    # Ask the model to continue now that tool results are in
                    # the conversation; the answer is yielded as one chunk.
                    try:
                        follow_up_response = litellm.completion(
                            **self._build_completion_params(
                                messages=messages,
                                tools=formatted_tools,
                                temperature=temperature,
                                stream=False,
                                **kwargs
                            )
                        )

                        if follow_up_response and follow_up_response.choices:
                            follow_up_content = follow_up_response.choices[0].message.content
                            if follow_up_content:
                                yielded_any = True
                                yield follow_up_content
                    except Exception as e:
                        logging.error(f"Follow-up response failed: {e}")

            except Exception as e:
                logging.error(f"Streaming failed: {e}")
                # If chunks were already delivered, re-raise rather than
                # falling back: the non-streaming fallback would replay the
                # complete response and duplicate output the caller has
                # already consumed.
                if yielded_any:
                    raise
                use_streaming = False

        if not use_streaming:
            # Non-streaming path: perform one completion and yield the whole
            # response as a single chunk.
            try:
                response = litellm.completion(
                    **self._build_completion_params(
                        messages=messages,
                        tools=formatted_tools,
                        temperature=temperature,
                        stream=False,
                        output_json=output_json,
                        output_pydantic=output_pydantic,
                        **kwargs
                    )
                )

                if response and response.choices:
                    content = response.choices[0].message.content
                    if content:
                        yield content

            except Exception as e:
                logging.error(f"Non-streaming fallback failed: {e}")
                raise

    except Exception as e:
        logging.error(f"Error in get_response_stream: {e}")
        raise

def _is_gemini_model(self) -> bool:
"""Check if the model is a Gemini model."""
if not self.model:
Expand Down
Loading