Skip to content

Commit 8ca24e9

Browse files
Merge pull request #991 from MervinPraison/claude/issue-981-20250718-1032
fix: implement real-time streaming for Agent.start() method
2 parents a7cc623 + da81622 commit 8ca24e9

5 files changed

Lines changed: 525 additions & 1 deletion

File tree

src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 182 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1937,7 +1937,188 @@ def run(self):
19371937

19381938
def start(self, prompt: str, **kwargs):
    """Start the agent with a prompt. This is a convenience method that wraps chat()."""
    # Stream chunk-by-chunk only when the agent was constructed with
    # streaming enabled AND the caller has not explicitly opted out
    # (the 'stream' kwarg defaults to True when omitted).
    caller_wants_stream = kwargs.get('stream', True)
    if self.stream and caller_wants_stream:
        return self._start_stream(prompt, **kwargs)
    # Non-streaming path: defer entirely to the regular chat() flow.
    return self.chat(prompt, **kwargs)
1946+
def _start_stream(self, prompt: str, **kwargs):
1947+
"""Generator method that yields streaming chunks from the agent."""
1948+
# Reset the final display flag for each new conversation
1949+
self._final_display_shown = False
1950+
1951+
# Search for existing knowledge if any knowledge is provided
1952+
if self.knowledge:
1953+
search_results = self.knowledge.search(prompt, agent_id=self.agent_id)
1954+
if search_results:
1955+
# Check if search_results is a list of dictionaries or strings
1956+
if isinstance(search_results, dict) and 'results' in search_results:
1957+
# Extract memory content from the results
1958+
knowledge_content = "\n".join([result['memory'] for result in search_results['results']])
1959+
else:
1960+
# If search_results is a list of strings, join them directly
1961+
knowledge_content = "\n".join(search_results)
1962+
1963+
# Append found knowledge to the prompt
1964+
prompt = f"{prompt}\n\nKnowledge: {knowledge_content}"
1965+
1966+
# Get streaming response using the internal streaming method
1967+
for chunk in self._chat_stream(prompt, **kwargs):
1968+
yield chunk
1969+
1970+
def _chat_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
1971+
"""Internal streaming method that yields chunks from the LLM response."""
1972+
1973+
# Use the same logic as chat() but yield chunks instead of returning final response
1974+
if self._using_custom_llm:
1975+
# For custom LLM, yield chunks from the LLM instance
1976+
for chunk in self._custom_llm_stream(prompt, temperature, tools, output_json, output_pydantic, reasoning_steps, **kwargs):
1977+
yield chunk
1978+
else:
1979+
# For standard OpenAI client, yield chunks from the streaming response
1980+
for chunk in self._openai_stream(prompt, temperature, tools, output_json, output_pydantic, reasoning_steps, **kwargs):
1981+
yield chunk
1982+
1983+
def _custom_llm_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
    """Handle streaming for custom LLM instances.

    Yields response chunks (strings). On success the accumulated response is
    appended to ``self.chat_history``; on any exception the history is rolled
    back to its pre-call length and a single ``"Error: ..."`` chunk is yielded
    instead of raising.
    """
    # Remember the history length so the except-branch can roll back any
    # user/assistant entries added during this call.
    chat_history_length = len(self.chat_history)

    try:
        # Special handling for MCP tools when using provider/model format:
        # an absent or empty tools argument falls back to the agent's own tools.
        if tools is None or (isinstance(tools, list) and len(tools) == 0):
            tool_param = self.tools
        else:
            tool_param = tools

        # Convert MCP tool objects to OpenAI format if needed.
        # to_openai_tool() may return either a single tool dict or a list;
        # normalize both shapes to a list.
        if tool_param is not None:
            from ..mcp.mcp import MCP
            if isinstance(tool_param, MCP) and hasattr(tool_param, 'to_openai_tool'):
                openai_tool = tool_param.to_openai_tool()
                if openai_tool:
                    if isinstance(openai_tool, list):
                        tool_param = openai_tool
                    else:
                        tool_param = [openai_tool]

        # Normalize prompt content for consistent chat history storage:
        # for multimodal (list-form) prompts, store only the first text part.
        normalized_content = prompt
        if isinstance(prompt, list):
            normalized_content = next((item["text"] for item in prompt if item.get("type") == "text"), "")

        # Prevent duplicate messages: skip the append when the last history
        # entry is already this exact user message.
        if not (self.chat_history and
                self.chat_history[-1].get("role") == "user" and
                self.chat_history[-1].get("content") == normalized_content):
            self.chat_history.append({"role": "user", "content": normalized_content})

        # Get streaming response from the LLM instance.
        if hasattr(self.llm_instance, 'get_response_stream'):
            # Preferred path: the instance exposes a chunk generator.
            stream_response = self.llm_instance.get_response_stream(
                prompt=prompt,
                system_prompt=self._build_system_prompt(tools),
                chat_history=self.chat_history,
                temperature=temperature,
                tools=tool_param,
                output_json=output_json,
                output_pydantic=output_pydantic,
                verbose=self.verbose,
                markdown=self.markdown,
                console=self.console,
                agent_name=self.name,
                agent_role=self.role,
                # NOTE(review): tool *names* are derived from the raw `tools`
                # argument, not the normalized `tool_param` above, so an empty
                # list here yields no names even though tool_param fell back to
                # self.tools — confirm this asymmetry is intended.
                agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tools if tools is not None else self.tools)],
                reasoning_steps=reasoning_steps,
                execute_tool_fn=self.execute_tool
            )

            # Re-yield each chunk while accumulating the full text for history.
            accumulated_response = ""
            for chunk in stream_response:
                accumulated_response += chunk
                yield chunk

            # Add the final assembled response to chat history.
            self.chat_history.append({"role": "assistant", "content": accumulated_response})

        else:
            # Fallback: the instance only supports a blocking get_response();
            # the whole reply is produced first and yielded as one chunk.
            response_text = self.llm_instance.get_response(
                prompt=prompt,
                system_prompt=self._build_system_prompt(tools),
                chat_history=self.chat_history,
                temperature=temperature,
                tools=tool_param,
                output_json=output_json,
                output_pydantic=output_pydantic,
                verbose=self.verbose,
                markdown=self.markdown,
                console=self.console,
                agent_name=self.name,
                agent_role=self.role,
                agent_tools=[t.__name__ if hasattr(t, '__name__') else str(t) for t in (tools if tools is not None else self.tools)],
                reasoning_steps=reasoning_steps,
                execute_tool_fn=self.execute_tool,
                # presumably stream=True lets the provider stream internally
                # while still returning the complete text — TODO confirm
                stream=True
            )

            self.chat_history.append({"role": "assistant", "content": response_text})
            # Yield the complete response as a single chunk.
            yield response_text

    except Exception as e:
        # Roll back any history entries added during this failed call, then
        # surface the error as a chunk (callers see it in the stream, not
        # as a raised exception).
        self.chat_history = self.chat_history[:chat_history_length]
        yield f"Error: {str(e)}"
2076+
def _openai_stream(self, prompt, temperature=0.2, tools=None, output_json=None, output_pydantic=None, reasoning_steps=False, **kwargs):
    """Handle streaming for the standard OpenAI client.

    Yields response chunks (strings). On success the accumulated response is
    appended to ``self.chat_history``; on any exception the history is rolled
    back and a single ``"Error: ..."`` chunk is yielded instead of raising.

    Raises nothing to the caller — all exceptions (including the missing-client
    ValueError below) are converted into an error chunk.
    """
    # Remember the history length so the except-branch can roll back any
    # entries added during this call.
    chat_history_length = len(self.chat_history)

    try:
        # Build the OpenAI messages list; also returns the original prompt
        # (possibly list-form for multimodal input) for history storage.
        messages, original_prompt = self._build_messages(prompt, temperature, output_json, output_pydantic)

        # Normalize original_prompt for consistent chat history storage:
        # for list-form prompts, keep only the first text part.
        normalized_content = original_prompt
        if isinstance(original_prompt, list):
            normalized_content = next((item["text"] for item in original_prompt if item.get("type") == "text"), "")

        # Prevent duplicate messages: skip the append when the last history
        # entry is already this exact user message.
        if not (self.chat_history and
                self.chat_history[-1].get("role") == "user" and
                self.chat_history[-1].get("content") == normalized_content):
            self.chat_history.append({"role": "user", "content": normalized_content})

        # The OpenAI client is optional at construction time; fail clearly
        # here rather than with an AttributeError below.
        if self._openai_client is None:
            raise ValueError("OpenAI client is not initialized. Please provide OPENAI_API_KEY or use a custom LLM provider.")

        # Stream the response, re-yielding each chunk while accumulating the
        # full text for chat history.
        accumulated_response = ""
        for chunk in self._openai_client.chat_completion_with_tools_stream(
            messages=messages,
            model=self.llm,
            temperature=temperature,
            tools=self._format_tools_for_completion(tools),
            execute_tool_fn=self.execute_tool,
            reasoning_steps=reasoning_steps,
            verbose=self.verbose,
            # Cap on tool-calling round-trips inside the client helper.
            max_iterations=10
        ):
            accumulated_response += chunk
            yield chunk

        # Add the final assembled response to chat history.
        self.chat_history.append({"role": "assistant", "content": accumulated_response})

    except Exception as e:
        # Roll back history and surface the error as a stream chunk.
        self.chat_history = self.chat_history[:chat_history_length]
        yield f"Error: {str(e)}"
19412122

19422123
def execute(self, task, context=None):
19432124
"""Execute a task synchronously - backward compatibility method"""

src/praisonai-agents/praisonaiagents/llm/openai_client.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,150 @@ async def achat_completion_with_tools(
10491049
break
10501050

10511051
return final_response
1052+
1053+
def chat_completion_with_tools_stream(
    self,
    messages: List[Dict[str, Any]],
    model: str = "gpt-4o",
    temperature: float = 0.7,
    tools: Optional[List[Any]] = None,
    execute_tool_fn: Optional[Callable] = None,
    reasoning_steps: bool = False,
    verbose: bool = True,
    max_iterations: int = 10,
    **kwargs
):
    """
    Create a streaming chat completion with tool support.

    This method yields chunks of the response as they are generated,
    enabling real-time streaming to the user. When the model requests tool
    calls, the tools are executed via ``execute_tool_fn``, their results are
    appended to ``messages`` (which is mutated in place), and a new streaming
    completion is requested — up to ``max_iterations`` round-trips.

    Args:
        messages: List of message dictionaries. NOTE: mutated in place when
            tool calls occur (assistant and tool messages are appended).
        model: Model to use
        temperature: Temperature for generation
        tools: List of tools (can be callables, dicts, or strings)
        execute_tool_fn: Function to execute tools
        reasoning_steps: Whether to show reasoning
        verbose: Whether to show verbose output; also controls the bracketed
            status chunks ("[Calling function: ...]", "[Function result: ...]")
        max_iterations: Maximum tool calling iterations
        **kwargs: Additional API parameters

    Yields:
        String chunks of the response as they are generated. Errors are
        yielded as "Error: ..." chunks rather than raised.
    """
    # Format tools for the OpenAI API (accepts callables, dicts, strings).
    formatted_tools = self.format_tools(tools)

    # Continue the tool execution loop until no more tool calls are needed
    # or the iteration cap is hit.
    iteration_count = 0

    while iteration_count < max_iterations:
        try:
            # Create a streaming response for the current messages state.
            response_stream = self._sync_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                # The API rejects an empty tools list; send None instead.
                tools=formatted_tools if formatted_tools else None,
                stream=True,
                **kwargs
            )

            full_response_text = ""
            reasoning_content = ""
            # All raw chunks are kept so tool calls can be reassembled
            # after the stream finishes.
            chunks = []

            # Stream the response chunk by chunk.
            for chunk in response_stream:
                chunks.append(chunk)
                if chunk.choices and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response_text += content
                    yield content

                # Handle reasoning content if enabled. hasattr-guarded because
                # only some models/providers populate delta.reasoning_content.
                if reasoning_steps and chunk.choices and hasattr(chunk.choices[0].delta, "reasoning_content"):
                    rc = chunk.choices[0].delta.reasoning_content
                    if rc:
                        reasoning_content += rc
                        yield f"[Reasoning: {rc}]"

            # Reassemble the complete response from the buffered chunks to
            # check for tool calls.
            final_response = process_stream_chunks(chunks)

            if not final_response:
                # Nothing usable came back; end the generator.
                return

            # Check for tool calls on the reconstructed message.
            tool_calls = getattr(final_response.choices[0].message, 'tool_calls', None)

            if tool_calls and execute_tool_fn:
                # Convert ToolCall dataclass objects to dicts so the messages
                # list stays JSON-serializable for the next API request.
                serializable_tool_calls = []
                for tc in tool_calls:
                    if isinstance(tc, ToolCall):
                        # Convert dataclass to dict
                        serializable_tool_calls.append({
                            "id": tc.id,
                            "type": tc.type,
                            "function": tc.function
                        })
                    else:
                        # Already an OpenAI object, keep as is
                        serializable_tool_calls.append(tc)

                messages.append({
                    "role": "assistant",
                    "content": final_response.choices[0].message.content,
                    "tool_calls": serializable_tool_calls
                })

                for tool_call in tool_calls:
                    # Handle both the ToolCall dataclass (dict-style access)
                    # and the OpenAI SDK object (attribute-style access).
                    try:
                        if isinstance(tool_call, ToolCall):
                            function_name = tool_call.function["name"]
                            arguments = json.loads(tool_call.function["arguments"])
                        else:
                            function_name = tool_call.function.name
                            arguments = json.loads(tool_call.function.arguments)
                    except json.JSONDecodeError as e:
                        # Malformed arguments: report (when verbose) and skip
                        # this tool call. function_name may be unbound if the
                        # name lookup itself failed, hence the locals() guard.
                        if verbose:
                            yield f"\n[Error parsing arguments for {function_name if 'function_name' in locals() else 'unknown function'}: {str(e)}]"
                        continue

                    if verbose:
                        yield f"\n[Calling function: {function_name}]"

                    # Execute the tool with error handling; failures become a
                    # result string so the model can react to them.
                    try:
                        tool_result = execute_tool_fn(function_name, arguments)
                        # NOTE(review): a falsy-but-valid result (0, "", [])
                        # is also reported as empty output — confirm intended.
                        results_str = json.dumps(tool_result) if tool_result else "Function returned an empty output"
                    except Exception as e:
                        results_str = f"Error executing function: {str(e)}"
                        if verbose:
                            yield f"\n[Function error: {str(e)}]"

                    if verbose:
                        yield f"\n[Function result: {results_str}]"

                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id if hasattr(tool_call, 'id') else tool_call['id'],
                        "content": results_str
                    })

                # Loop again so the model can consume the tool results and
                # possibly request further tool calls.
                iteration_count += 1
            else:
                # No tool calls (or no executor provided): we're done.
                break

        except Exception as e:
            # Surface API/streaming errors as a chunk and stop.
            yield f"Error: {str(e)}"
            break
10521196

10531197
def parse_structured_output(
10541198
self,

0 commit comments

Comments
 (0)