From b3b7ed6fe82595ef95d60eb0cc2e740fc3c11dd2 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Thu, 22 Jan 2026 03:21:31 +0100 Subject: [PATCH 01/23] . --- .env.development | 8 ++-- backend/app/service/chat_service.py | 2 + backend/app/service/task.py | 7 ++++ backend/app/utils/agent.py | 32 ++++++++++++++- src/components/WorkFlow/node.tsx | 10 ++++- src/store/chatStore.ts | 61 +++++++++++++++++++++++++++++ 6 files changed, 113 insertions(+), 7 deletions(-) diff --git a/.env.development b/.env.development index 9d26a2f4f..9d5806aad 100644 --- a/.env.development +++ b/.env.development @@ -1,8 +1,6 @@ -VITE_BASE_URL=/api - -VITE_PROXY_URL=https://dev.eigent.ai - -VITE_USE_LOCAL_PROXY=false +VITE_BASE_URL=http://localhost:8000 +VITE_PROXY_URL=http://localhost:8000 +VITE_USE_LOCAL_PROXY=true # VITE_PROXY_URL=http://localhost:3001 # VITE_USE_LOCAL_PROXY=true diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 04accf8bf..404ab646f 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -891,6 +891,8 @@ def on_stream_text(chunk): logger.warning(f"Cannot resume: workforce is None for project {options.project_id}") elif item.action == Action.decompose_text: yield sse_json("decompose_text", item.data) + elif item.action == Action.streaming_agent_output: + yield sse_json("streaming_agent_output", item.data) elif item.action == Action.decompose_progress: yield sse_json("to_sub_tasks", item.data) elif item.action == Action.new_agent: diff --git a/backend/app/service/task.py b/backend/app/service/task.py index d4958dd9c..33040cc35 100644 --- a/backend/app/service/task.py +++ b/backend/app/service/task.py @@ -22,6 +22,7 @@ class Action(str, Enum): new_task_state = "new_task_state" # backend -> user decompose_progress = "decompose_progress" # backend -> user (streaming decomposition) decompose_text = "decompose_text" # backend -> user (raw streaming text) + streaming_agent_output = "streaming_agent_output" # backend -> user (streaming agent output during task execution) start = "start" # user -> backend create_agent = "create_agent" # backend -> user activate_agent = "activate_agent" # backend -> user @@ -78,6 +79,11 @@ class ActionDecomposeTextData(BaseModel): data: dict +class ActionStreamingAgentOutputData(BaseModel): + action: Literal[Action.streaming_agent_output] = Action.streaming_agent_output + data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", "is_final"], str | bool] + + class ActionNewTaskStateData(BaseModel): action: Literal[Action.new_task_state] = Action.new_task_state data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int] @@ -249,6 +255,7 @@ class ActionSkipTaskData(BaseModel): | ActionSkipTaskData | ActionDecomposeTextData | ActionDecomposeProgressData + | ActionStreamingAgentOutputData ) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 91ef926ea..22ea04d27 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -125,6 +125,7 @@ def _schedule_async_task(coro): ActionCreateAgentData, ActionDeactivateAgentData, ActionDeactivateToolkitData, + ActionStreamingAgentOutputData, Agents, get_task_lock, ) @@ -269,7 +270,22 @@ def _stream_with_deactivate(): last_response = chunk # Accumulate content from each chunk (delta mode) if chunk.msg and chunk.msg.content: - accumulated_content += chunk.msg.content + delta_content = chunk.msg.content + accumulated_content += delta_content + # Stream output chunk to frontend (non-blocking) + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": delta_content, + "is_final": False, + }, + ) + ) + ) yield chunk finally: total_tokens = 0 @@ -281,6 +297,20 @@ def _stream_with_deactivate(): ) if usage_info: total_tokens = usage_info.get("total_tokens", 0) + # Send final streaming output marker + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": "", + "is_final": True, + }, + ) + ) + ) asyncio.create_task( task_lock.put_queue( ActionDeactivateAgentData( diff --git a/src/components/WorkFlow/node.tsx b/src/components/WorkFlow/node.tsx index 30b261ab4..b3fb8cad5 100644 --- a/src/components/WorkFlow/node.tsx +++ b/src/components/WorkFlow/node.tsx @@ -713,7 +713,15 @@ export function Node({ id, data }: NodeProps) {
{task.content}
{task?.status === "running" && ( -
+
+ {/* Streaming agent output */} + {chatStore.tasks[chatStore.activeTaskId as string]?.streamingAgentOutput?.[task.id] && ( +
+
+ {chatStore.tasks[chatStore.activeTaskId as string].streamingAgentOutput[task.id]} +
+
+ )} {/* active toolkit */} {task.toolkits && task.toolkits.length > 0 && diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 6cc93c98d..f44e26391 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -44,6 +44,8 @@ interface Task { isContextExceeded?: boolean; // Streaming decompose text - stored separately to avoid frequent re-renders streamingDecomposeText: string; + // Streaming agent output - real-time agent output during task execution + streamingAgentOutput: { [processTaskId: string]: string }; } export interface ChatStore { @@ -107,6 +109,8 @@ export interface ChatStore { setNextTaskId: (taskId: string | null) => void; setStreamingDecomposeText: (taskId: string, text: string) => void; clearStreamingDecomposeText: (taskId: string) => void; + updateStreamingAgentOutput: (taskId: string, processTaskId: string, content: string) => void; + clearStreamingAgentOutput: (taskId: string, processTaskId: string) => void; } export type VanillaChatStore = { @@ -207,6 +211,7 @@ const chatStore = (initial?: Partial) => createStore()( isTakeControl: false, isTaskEdit: false, streamingDecomposeText: '', + streamingAgentOutput: {}, }, } })) @@ -794,6 +799,8 @@ const chatStore = (initial?: Partial) => createStore()( setIsContextExceeded, setStreamingDecomposeText, clearStreamingDecomposeText, + updateStreamingAgentOutput, + clearStreamingAgentOutput, setIsTaskEdit } = getCurrentChatStore() currentTaskId = getCurrentTaskId(); @@ -840,6 +847,24 @@ const chatStore = (initial?: Partial) => createStore()( return; } + // Handle streaming agent output during task execution + if (agentMessages.step === "streaming_agent_output") { + const data = agentMessages.data as { process_task_id?: string; content?: string; is_final?: boolean }; + const { process_task_id, content, is_final } = data; + const currentId = getCurrentTaskId(); + + if (!process_task_id) return; + + if (is_final) { + // Clear streaming output when final marker received + clearStreamingAgentOutput(currentId, process_task_id); + } else if (content) { + // Append streaming content + updateStreamingAgentOutput(currentId, process_task_id, content); + } + return; + } + if (agentMessages.step === "to_sub_tasks") { // Clear streaming decompose text when task splitting is done clearStreamingDecomposeText(currentTaskId); @@ -2606,6 +2631,42 @@ const chatStore = (initial?: Partial) => createStore()( }, }; }); + }, + updateStreamingAgentOutput: (taskId, processTaskId, content) => { + set((state) => { + if (!state.tasks[taskId]) return state; + const currentOutput = state.tasks[taskId].streamingAgentOutput[processTaskId] || ''; + return { + ...state, + tasks: { + ...state.tasks, + [taskId]: { + ...state.tasks[taskId], + streamingAgentOutput: { + ...state.tasks[taskId].streamingAgentOutput, + [processTaskId]: currentOutput + content, + }, + }, + }, + }; + }); + }, + clearStreamingAgentOutput: (taskId, processTaskId) => { + set((state) => { + if (!state.tasks[taskId]) return state; + const newStreamingAgentOutput = { ...state.tasks[taskId].streamingAgentOutput }; + delete newStreamingAgentOutput[processTaskId]; + return { + ...state, + tasks: { + ...state.tasks, + [taskId]: { + ...state.tasks[taskId], + streamingAgentOutput: newStreamingAgentOutput, + }, + }, + }; + }); } }) ); From e1ee5c1a58be7db6eb4743f80a638e3cd982f40f Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Thu, 22 Jan 2026 13:50:55 +0100 Subject: [PATCH 02/23] fix: disable streaming for worker agents to prevent camel library error The camel library throws 'AsyncChatCompletionStreamManager object has no attribute choices' when streaming is enabled for agents with tools attached. Changes: - Disable streaming for all astep calls in ListenChatAgent to avoid the error - Preserve model_config_dict when cloning agents (for future streaming support) - Add streaming infrastructure (ActionStreamingAgentOutputData) for when camel library support is available - Handle AsyncStreamingChatAgentResponse in single_agent_worker.py Note: Streaming text output for worker agents is blocked by upstream camel library limitation. Task decomposition streaming still works via decompose_text. --- backend/app/utils/agent.py | 116 +++++++++++++++++++---- backend/app/utils/single_agent_worker.py | 29 +++++- 2 files changed, 123 insertions(+), 22 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 22ea04d27..3915db8b0 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -386,9 +386,80 @@ async def astep( ) try: + # NOTE: Streaming with tool calls causes AsyncChatCompletionStreamManager error in camel library + # The library doesn't properly handle streaming responses when tools are attached. + # Disable streaming for ALL astep calls to avoid this issue. + # Streaming output is still achieved via the decompose_text event during task decomposition. + self.model_backend.model_config_dict["stream"] = False + res = await super().astep(input_message, response_format) if isinstance(res, AsyncStreamingChatAgentResponse): - res = await res._get_final_response() + # Wrap the async streaming response to send chunks to frontend + async def _async_stream_with_deactivate(): + accumulated_content = "" + last_chunk = None + chunk_count = 0 + try: + async for chunk in res: + chunk_count += 1 + last_chunk = chunk + if chunk.msg and chunk.msg.content: + delta_content = chunk.msg.content + accumulated_content += delta_content + # Stream output chunk to frontend (non-blocking) + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": delta_content, + "is_final": False, + }, + ) + ) + ) + yield chunk + finally: + total_tokens = 0 + if last_chunk: + usage_info = ( + last_chunk.info.get("usage") + or last_chunk.info.get("token_usage") + or {} + ) + if usage_info: + total_tokens = usage_info.get("total_tokens", 0) + # Send final streaming output marker + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": "", + "is_final": True, + }, + ) + ) + ) + asyncio.create_task( + task_lock.put_queue( + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": accumulated_content, + "tokens": total_tokens, + }, + ) + ) + ) + + return AsyncStreamingChatAgentResponse(_async_stream_with_deactivate()) except ModelProcessingError as e: res = None error_info = e @@ -412,28 +483,28 @@ async def astep( message = f"Error processing message: {e!s}" total_tokens = 0 - if res is not None: + # For non-streaming responses, handle deactivation here + if res is not None and not isinstance(res, AsyncStreamingChatAgentResponse): message = res.msg.content if res.msg else "" - total_tokens = res.info["usage"]["total_tokens"] + usage_info = res.info.get("usage") or res.info.get("token_usage") or {} + total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0 traceroot_logger.info( f"Agent {self.agent_name} completed step, tokens used: {total_tokens}" ) - assert message is not None - - asyncio.create_task( - task_lock.put_queue( - ActionDeactivateAgentData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, + asyncio.create_task( + task_lock.put_queue( + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, + ) ) ) - ) if error_info is not None: raise error_info @@ -735,6 +806,10 @@ def clone(self, with_memory: bool = False) -> ChatAgent: new_agent.process_task_id = self.process_task_id + # Preserve model_config_dict (including stream setting) from original agent + if hasattr(self, 'model_backend') and hasattr(self.model_backend, 'model_config_dict'): + new_agent.model_backend.model_config_dict.update(self.model_backend.model_config_dict) + # Copy memory if requested if with_memory: # Get all records from the current memory @@ -809,7 +884,14 @@ def agent_model( else: model_config[k] = v - if agent_name == Agents.task_agent: + # Enable streaming for task decomposition and worker agents + if agent_name in { + Agents.task_agent, + Agents.developer_agent, + Agents.browser_agent, + Agents.document_agent, + Agents.multi_modal_agent, + }: model_config["stream"] = True if agent_name == Agents.browser_agent: try: diff --git a/backend/app/utils/single_agent_worker.py b/backend/app/utils/single_agent_worker.py index 5f3fae8f7..2f6c66431 100644 --- a/backend/app/utils/single_agent_worker.py +++ b/backend/app/utils/single_agent_worker.py @@ -106,10 +106,17 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState if isinstance(response, AsyncStreamingChatAgentResponse): # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" + last_chunk = None + chunk_count = 0 async for chunk in response: + chunk_count += 1 + last_chunk = chunk if chunk.msg and chunk.msg.content: accumulated_content += chunk.msg.content + logger.info(f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}") response_content = accumulated_content + # Store usage info from last chunk for later use + response._last_chunk_info = last_chunk.info if last_chunk else {} else: # Regular ChatAgentResponse response_content = response.msg.content if response.msg else "" @@ -124,20 +131,32 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState ) else: # Use native structured output if supported - response = await worker_agent.astep(prompt, response_format=TaskResult) + # NOTE: Temporarily disable streaming for structured output because + # the camel library doesn't properly handle AsyncChatCompletionStreamManager + # from OpenAI's structured output streaming API + original_stream = worker_agent.model_backend.model_config_dict.get("stream", False) + worker_agent.model_backend.model_config_dict["stream"] = False + try: + response = await worker_agent.astep(prompt, response_format=TaskResult) + finally: + worker_agent.model_backend.model_config_dict["stream"] = original_stream - # Handle streaming response for native output + # Handle streaming response for native output (shouldn't happen now but keep for safety) if isinstance(response, AsyncStreamingChatAgentResponse): task_result = None # With stream_accumulate=False, we need to accumulate delta content accumulated_content = "" + last_chunk = None async for chunk in response: + last_chunk = chunk if chunk.msg: if chunk.msg.content: accumulated_content += chunk.msg.content if chunk.msg.parsed: task_result = chunk.msg.parsed response_content = accumulated_content + # Store usage info from last chunk for later use + response._last_chunk_info = last_chunk.info if last_chunk else {} # If no parsed result found in streaming, create fallback if task_result is None: task_result = TaskResult( @@ -151,9 +170,9 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState # Get token usage from the response if isinstance(response, AsyncStreamingChatAgentResponse): - # For streaming responses, get the final response info - final_response = await response - usage_info = final_response.info.get("usage") or final_response.info.get("token_usage") + # For streaming responses, get info from last chunk captured during iteration + chunk_info = getattr(response, '_last_chunk_info', {}) + usage_info = chunk_info.get("usage") or chunk_info.get("token_usage") else: usage_info = response.info.get("usage") or response.info.get("token_usage") total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0 From abae37818553ddc13c8832aa34b9ed69a659372a Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Thu, 22 Jan 2026 14:34:42 +0100 Subject: [PATCH 03/23] feat: add streaming tool activity messages during task execution Shows real-time '[Tool] Shell Exec...', '[Tool] Write File...' etc. messages in the UI while tools are executing, giving users feedback on agent activity. Changes: - Add ActionStreamingAgentOutputData events in @listen_toolkit decorator - Add streaming events in _execute_tool and _aexecute_tool methods - Disable model streaming for worker agents (camel library limitation) - Replace emoji with [Tool] text for better compatibility Closes #87 --- backend/app/utils/agent.py | 28 ++++++++++++++++++++++ backend/app/utils/listen/toolkit_listen.py | 28 ++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 3915db8b0..662f6fbd0 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -557,6 +557,21 @@ def _execute_tool(self, tool_call_request: ToolCallRequest) -> ToolCallingRecord ) ) ) + # Stream tool activity to frontend as "thinking" text + tool_display = func_name.replace("_", " ").title() + asyncio.create_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": f"[Tool] {tool_display}...\n", + "is_final": False, + }, + ) + ) + ) # Set process_task context for all tool executions with set_process_task(self.process_task_id): raw_result = tool(**args) @@ -683,6 +698,19 @@ async def _aexecute_tool( }, ) ) + # Stream tool activity to frontend as "thinking" text + tool_display = func_name.replace("_", " ").title() + await task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": f"[Tool] {tool_display}...\n", + "is_final": False, + }, + ) + ) try: # Set process_task context for all tool executions with set_process_task(self.process_task_id): diff --git a/backend/app/utils/listen/toolkit_listen.py b/backend/app/utils/listen/toolkit_listen.py index beb7f9d7f..2c371c4fd 100644 --- a/backend/app/utils/listen/toolkit_listen.py +++ b/backend/app/utils/listen/toolkit_listen.py @@ -10,6 +10,7 @@ from app.service.task import ( ActionActivateToolkitData, ActionDeactivateToolkitData, + ActionStreamingAgentOutputData, get_task_lock, ) from app.utils.toolkit.abstract_toolkit import AbstractToolkit @@ -137,6 +138,19 @@ async def async_wrapper(*args, **kwargs): }, ) await task_lock.put_queue(activate_data) + # Stream tool activity to frontend as "thinking" text + tool_display = method_name.replace("_", " ").title() + await task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": toolkit.agent_name, + "process_task_id": process_task_id, + "agent_id": getattr(toolkit, 'agent_id', ''), + "content": f"[Tool] {tool_display}...\n", + "is_final": False, + }, + ) + ) error = None res = None try: @@ -242,6 +256,20 @@ def sync_wrapper(*args, **kwargs): }, ) _safe_put_queue(task_lock, activate_data) + # Stream tool activity to frontend as "thinking" text + tool_display = method_name.replace("_", " ").title() + _safe_put_queue( + task_lock, + ActionStreamingAgentOutputData( + data={ + "agent_name": toolkit.agent_name, + "process_task_id": process_task_id, + "agent_id": getattr(toolkit, 'agent_id', ''), + "content": f"[Tool] {tool_display}...\n", + "is_final": False, + }, + ) + ) error = None res = None From 309883d7ca32f99ab000da64116e0e357c8f5381 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Fri, 23 Jan 2026 10:05:08 +0100 Subject: [PATCH 04/23] fix: correct comment about streaming error cause The issue is streaming + response_format (structured output), not streaming + tools --- backend/app/utils/agent.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 662f6fbd0..3abe97b08 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -386,10 +386,10 @@ async def astep( ) try: - # NOTE: Streaming with tool calls causes AsyncChatCompletionStreamManager error in camel library - # The library doesn't properly handle streaming responses when tools are attached. - # Disable streaming for ALL astep calls to avoid this issue. - # Streaming output is still achieved via the decompose_text event during task decomposition. + # NOTE: Streaming with response_format (structured output) causes AsyncChatCompletionStreamManager error + # in camel library. OpenAI returns AsyncChatCompletionStreamManager when combining streaming + structured + # output, but _handle_batch_response tries to access .choices which doesn't exist on that object. + # Disable streaming for astep calls that use response_format to avoid this issue. self.model_backend.model_config_dict["stream"] = False res = await super().astep(input_message, response_format) From f8bad622eaf5e0243e0a01dc8b38e3c5217df3dd Mon Sep 17 00:00:00 2001 From: a7m-1st Date: Sat, 24 Jan 2026 01:24:27 +0300 Subject: [PATCH 05/23] choe: revert env --- .env.development | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.env.development b/.env.development index 9d5806aad..9d26a2f4f 100644 --- a/.env.development +++ b/.env.development @@ -1,6 +1,8 @@ -VITE_BASE_URL=http://localhost:8000 -VITE_PROXY_URL=http://localhost:8000 -VITE_USE_LOCAL_PROXY=true +VITE_BASE_URL=/api + +VITE_PROXY_URL=https://dev.eigent.ai + +VITE_USE_LOCAL_PROXY=false # VITE_PROXY_URL=http://localhost:3001 # VITE_USE_LOCAL_PROXY=true From 52df15b1e3bb2b3f245b967d150a9e42b5c357b0 Mon Sep 17 00:00:00 2001 From: a7m-1st Date: Sat, 24 Jan 2026 01:40:39 +0300 Subject: [PATCH 06/23] enhance: use _schedule_async_task to avoid async race conditions in generating functions --- backend/app/utils/agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 3abe97b08..36c62b687 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -544,7 +544,7 @@ def _execute_tool(self, tool_call_request: ToolCallRequest) -> ToolCallingRecord # Only send activate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionActivateToolkitData( data={ @@ -559,7 +559,7 @@ def _execute_tool(self, tool_call_request: ToolCallRequest) -> ToolCallingRecord ) # Stream tool activity to frontend as "thinking" text tool_display = func_name.replace("_", " ").title() - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionStreamingAgentOutputData( data={ @@ -602,7 +602,7 @@ def _execute_tool(self, tool_call_request: ToolCallRequest) -> ToolCallingRecord # Only send deactivate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionDeactivateToolkitData( data={ From 9edb54878fe7c203ae02a6a4343682cc87f8972f Mon Sep 17 00:00:00 2001 From: a7m-1st Date: Sat, 24 Jan 2026 02:23:01 +0300 Subject: [PATCH 07/23] enhance: use _schedule_async_task to avoid async race conditions in generating functions --- backend/app/utils/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 36c62b687..ac7143ddc 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -273,7 +273,7 @@ def _stream_with_deactivate(): delta_content = chunk.msg.content accumulated_content += delta_content # Stream output chunk to frontend (non-blocking) - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionStreamingAgentOutputData( data={ @@ -407,7 +407,7 @@ async def _async_stream_with_deactivate(): delta_content = chunk.msg.content accumulated_content += delta_content # Stream output chunk to frontend (non-blocking) - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionStreamingAgentOutputData( data={ From 8a44f386a66aab6151fd82cc67ca6cbf5d5bc893 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Sat, 24 Jan 2026 00:37:17 +0100 Subject: [PATCH 08/23] fix: address PR review comments from a7m-1st - Add is_final field to AgentMessage type in chatbox.d.ts - Simplify chatStore.ts streaming handler (remove type cast, use currentTaskId) - Use deep copy for model_config_dict to ensure isolation between cloned agents --- backend/app/utils/agent.py | 4 +++- src/store/chatStore.ts | 8 +++----- src/types/chatbox.d.ts | 2 ++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index ac7143ddc..68c6b24de 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -1,5 +1,6 @@ import asyncio import contextvars +import copy import json import os import platform @@ -835,8 +836,9 @@ def clone(self, with_memory: bool = False) -> ChatAgent: new_agent.process_task_id = self.process_task_id # Preserve model_config_dict (including stream setting) from original agent + # Use deep copy to ensure isolation - nested dicts won't affect other agents if hasattr(self, 'model_backend') and hasattr(self.model_backend, 'model_config_dict'): - new_agent.model_backend.model_config_dict.update(self.model_backend.model_config_dict) + new_agent.model_backend.model_config_dict = copy.deepcopy(self.model_backend.model_config_dict) # Copy memory if requested if with_memory: diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 048720737..5c8468f60 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -849,18 +849,16 @@ const chatStore = (initial?: Partial) => createStore()( // Handle streaming agent output during task execution if (agentMessages.step === "streaming_agent_output") { - const data = agentMessages.data as { process_task_id?: string; content?: string; is_final?: boolean }; - const { process_task_id, content, is_final } = data; - const currentId = getCurrentTaskId(); + const { process_task_id, content, is_final } = agentMessages.data; if (!process_task_id) return; if (is_final) { // Clear streaming output when final marker received - clearStreamingAgentOutput(currentId, process_task_id); + clearStreamingAgentOutput(currentTaskId, process_task_id); } else if (content) { // Append streaming content - updateStreamingAgentOutput(currentId, process_task_id, content); + updateStreamingAgentOutput(currentTaskId, process_task_id, content); } return; } diff --git a/src/types/chatbox.d.ts b/src/types/chatbox.d.ts index 94beef44f..e61b59ca9 100644 --- a/src/types/chatbox.d.ts +++ b/src/types/chatbox.d.ts @@ -127,6 +127,8 @@ declare global { current_length?: number; max_length?: number; text?: string; + // Streaming agent output + is_final?: boolean; }; status?: 'running' | 'filled' | 'completed'; } From df77971e8b2d9e78f3ef452688b8bef4673706e2 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Sat, 24 Jan 2026 00:54:45 +0100 Subject: [PATCH 09/23] refactor: remove tool streaming events, keep infra for model output streaming Per a7m-1st's review: activate_toolkit already provides sufficient tool activity info. Streaming infrastructure preserved in ListenChatAgent for future model output streaming once CAMEL PR #3743 is merged. --- backend/app/utils/listen/toolkit_listen.py | 28 ---------------------- 1 file changed, 28 deletions(-) diff --git a/backend/app/utils/listen/toolkit_listen.py b/backend/app/utils/listen/toolkit_listen.py index 2c371c4fd..beb7f9d7f 100644 --- a/backend/app/utils/listen/toolkit_listen.py +++ b/backend/app/utils/listen/toolkit_listen.py @@ -10,7 +10,6 @@ from app.service.task import ( ActionActivateToolkitData, ActionDeactivateToolkitData, - ActionStreamingAgentOutputData, get_task_lock, ) from app.utils.toolkit.abstract_toolkit import AbstractToolkit @@ -138,19 +137,6 @@ async def async_wrapper(*args, **kwargs): }, ) await task_lock.put_queue(activate_data) - # Stream tool activity to frontend as "thinking" text - tool_display = method_name.replace("_", " ").title() - await task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": toolkit.agent_name, - "process_task_id": process_task_id, - "agent_id": getattr(toolkit, 'agent_id', ''), - "content": f"[Tool] {tool_display}...\n", - "is_final": False, - }, - ) - ) error = None res = None try: @@ -256,20 +242,6 @@ def sync_wrapper(*args, **kwargs): }, ) _safe_put_queue(task_lock, activate_data) - # Stream tool activity to frontend as "thinking" text - tool_display = method_name.replace("_", " ").title() - _safe_put_queue( - task_lock, - ActionStreamingAgentOutputData( - data={ - "agent_name": toolkit.agent_name, - "process_task_id": process_task_id, - "agent_id": getattr(toolkit, 'agent_id', ''), - "content": f"[Tool] {tool_display}...\n", - "is_final": False, - }, - ) - ) error = None res = None From ceddf47f2815d5b22130b1378fa3045d932ab489 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 26 Jan 2026 13:47:23 +0100 Subject: [PATCH 10/23] style: change leading-tight to leading-normal for better readability Per Douglasymlai's review - improves text readability for users --- src/components/WorkFlow/node.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/components/WorkFlow/node.tsx b/src/components/WorkFlow/node.tsx index b44113cd1..eed3f2950 100644 --- a/src/components/WorkFlow/node.tsx +++ b/src/components/WorkFlow/node.tsx @@ -460,7 +460,7 @@ export function Node({ id, data }: NodeProps) {
{/* {JSON.stringify(data.agent)} */} {agentToolkits[ @@ -551,7 +551,7 @@ export function Node({ id, data }: NodeProps) {
{data.agent?.tasks && data.agent?.tasks.length > 0 && (
- {/*
Subtasks
*/} + {/*
Subtasks
*/}
{/* Streaming agent output */} {chatStore.tasks[chatStore.activeTaskId as string]?.streamingAgentOutput?.[task.id] && ( -
+
{chatStore.tasks[chatStore.activeTaskId as string].streamingAgentOutput[task.id]}
From 81b5f6eaa1184ab9515815fd0392aea02215778c Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 26 Jan 2026 23:06:56 +0100 Subject: [PATCH 11/23] refactor: remove AsyncChatCompletionStreamManager workaround The fix was merged upstream in camel-ai/camel#3743. Once camel-ai is updated to include this fix, streaming with structured output will work correctly without disabling streaming. --- backend/app/utils/agent.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index 5eada0ee2..bfc8a0147 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -382,12 +382,6 @@ async def astep( ) try: - # NOTE: Streaming with response_format (structured output) causes AsyncChatCompletionStreamManager error - # in camel library. OpenAI returns AsyncChatCompletionStreamManager when combining streaming + structured - # output, but _handle_batch_response tries to access .choices which doesn't exist on that object. - # Disable streaming for astep calls that use response_format to avoid this issue. - self.model_backend.model_config_dict["stream"] = False - res = await super().astep(input_message, response_format) if isinstance(res, AsyncStreamingChatAgentResponse): # Wrap the async streaming response to send chunks to frontend From f4574acf74ff7f2708219cde6ae8bcc925826768 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 26 Jan 2026 23:08:38 +0100 Subject: [PATCH 12/23] refactor: remove AsyncChatCompletionStreamManager workaround from single_agent_worker The fix was merged upstream in camel-ai/camel#3743. --- backend/app/utils/single_agent_worker.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/backend/app/utils/single_agent_worker.py b/backend/app/utils/single_agent_worker.py index 96d1664ce..72ea1e23e 100644 --- a/backend/app/utils/single_agent_worker.py +++ b/backend/app/utils/single_agent_worker.py @@ -145,15 +145,7 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState ) else: # Use native structured output if supported - # NOTE: Temporarily disable streaming for structured output because - # the camel library doesn't properly handle AsyncChatCompletionStreamManager - # from OpenAI's structured output streaming API - original_stream = worker_agent.model_backend.model_config_dict.get("stream", False) - worker_agent.model_backend.model_config_dict["stream"] = False - try: - response = await worker_agent.astep(prompt, response_format=TaskResult) - finally: - worker_agent.model_backend.model_config_dict["stream"] = original_stream + response = await worker_agent.astep(prompt, response_format=TaskResult) # Handle streaming response for native output (shouldn't happen now but keep for safety) if isinstance(response, AsyncStreamingChatAgentResponse): From 6759353dd72a064f52c15dee5cf3c0bea6e1e011 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 26 Jan 2026 23:17:12 +0100 Subject: [PATCH 13/23] chore: update camel-ai to 0.2.85 Includes fix for AsyncChatCompletionStreamManager from PR #3743 --- backend/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index ed7ab050d..f1e2579a3 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -6,7 +6,7 @@ readme = "README.md" requires-python = ">=3.10,<3.11" dependencies = [ "pip>=23.0", - "camel-ai[eigent]==0.2.85a0", + "camel-ai[eigent]==0.2.85", "fastapi>=0.115.12", "fastapi-babel>=1.0.0", "uvicorn[standard]>=0.34.2", From 32316d22d6f8d32a82b1927d5c20a0f57126f6ef Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Mon, 26 Jan 2026 23:51:19 +0100 Subject: [PATCH 14/23] refactor: update clone preservation comment Remove specific mention of stream setting since the workaround for AsyncChatCompletionStreamManager is no longer needed. --- backend/app/utils/agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/utils/agent.py b/backend/app/utils/agent.py index bfc8a0147..96eea6da3 100644 --- a/backend/app/utils/agent.py +++ b/backend/app/utils/agent.py @@ -794,9 +794,9 @@ def clone(self, with_memory: bool = False) -> ChatAgent: new_agent.process_task_id = self.process_task_id - # Preserve model_config_dict (including stream setting) from original agent - # Use deep copy to ensure isolation - nested dicts won't affect other agents - if hasattr(self, 'model_backend') and hasattr(self.model_backend, 'model_config_dict'): + # Preserve model_config_dict from original agent for consistent behavior + # Deep copy ensures isolation - config changes won't affect other agents + if hasattr(self, "model_backend") and hasattr(self.model_backend, "model_config_dict"): new_agent.model_backend.model_config_dict = copy.deepcopy(self.model_backend.model_config_dict) # Copy memory if requested From 5c6566906375ca6896fc0b5a606697322101389c Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Sun, 1 Feb 2026 20:22:43 +0100 Subject: [PATCH 15/23] fix: PEP8 line too long in task.py --- backend/app/service/task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/app/service/task.py b/backend/app/service/task.py index f1d1ec5bb..c533ea5dd 100644 --- a/backend/app/service/task.py +++ b/backend/app/service/task.py @@ -45,7 +45,8 @@ class Action(str, Enum): # backend -> user (streaming decomposition) decompose_progress = "decompose_progress" decompose_text = "decompose_text" # backend -> user (raw streaming text) - streaming_agent_output = "streaming_agent_output" # backend -> user (streaming agent output during task execution) + # backend -> user (streaming agent output during task execution) + streaming_agent_output = "streaming_agent_output" start = "start" # user -> backend create_agent = "create_agent" # backend -> user activate_agent = "activate_agent" # backend -> user From 927244f36b413813706c8f5759cbc7625bcf7a69 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Sun, 1 Feb 2026 20:27:16 +0100 Subject: [PATCH 16/23] style: yapf format task.py --- backend/app/service/task.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/app/service/task.py b/backend/app/service/task.py index c533ea5dd..29ce99606 100644 --- a/backend/app/service/task.py +++ b/backend/app/service/task.py @@ -105,8 +105,10 @@ class ActionDecomposeTextData(BaseModel): class ActionStreamingAgentOutputData(BaseModel): - action: Literal[Action.streaming_agent_output] = Action.streaming_agent_output - data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", "is_final"], str | bool] + action: Literal[Action.streaming_agent_output + ] = Action.streaming_agent_output + data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", + "is_final"], str | bool] class ActionNewTaskStateData(BaseModel): From 4dfef33fcc34ab5507d9fe0d369ccdf5d7429029 Mon Sep 17 00:00:00 2001 From: a7m-1st Date: Tue, 3 Feb 2026 04:25:03 +0300 Subject: [PATCH 17/23] chore: isolate set_main_event_loop & _schedule_async_task --- backend/app/agent/__init__.py | 3 +- backend/app/agent/agent_model.py | 59 +--- backend/app/agent/listen_chat_agent.py | 319 ++++++++------------ backend/app/service/chat_service.py | 3 +- backend/app/utils/event_loop_utils.py | 69 +++++ backend/tests/app/agent/test_agent_model.py | 1 - 6 files changed, 195 insertions(+), 259 deletions(-) create mode 100644 backend/app/utils/event_loop_utils.py diff --git a/backend/app/agent/__init__.py b/backend/app/agent/__init__.py index eda5abe29..264103bba 100644 --- a/backend/app/agent/__init__.py +++ b/backend/app/agent/__init__.py @@ -12,7 +12,7 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from app.agent.agent_model import agent_model, set_main_event_loop +from app.agent.agent_model import agent_model from app.agent.factory import ( browser_agent, developer_agent, @@ -31,7 +31,6 @@ "agent_model", "get_mcp_tools", "get_toolkits", - "set_main_event_loop", "browser_agent", "developer_agent", "document_agent", diff --git a/backend/app/agent/agent_model.py b/backend/app/agent/agent_model.py index 08c1f3d1c..8da05b9f6 100644 --- a/backend/app/agent/agent_model.py +++ b/backend/app/agent/agent_model.py @@ -12,13 +12,11 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -import asyncio -import contextvars import logging import uuid -from threading import Lock from typing import Any, Callable +from app.utils.event_loop_utils import _schedule_async_task from app.agent.listen_chat_agent import ListenChatAgent, logger from app.model.chat import AgentModelConfig, Chat from app.service.task import ActionCreateAgentData, Agents, get_task_lock @@ -27,61 +25,6 @@ from camel.toolkits import FunctionTool, RegisteredAgentToolkit from camel.types import ModelPlatformType -# Thread-safe reference to main event loop using contextvars -# This ensures each request has its own event loop reference, -# avoiding race conditions -_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop - | None] = contextvars.ContextVar( - "_main_event_loop", - default=None) - -# Global fallback for main event loop reference -# Used when contextvars don't propagate to worker threads -# (e.g., asyncio.to_thread) -_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None -_GLOBAL_MAIN_LOOP_LOCK = Lock() - - -def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): - """Set the main event loop reference for thread-safe task scheduling. - - This should be called from the main async context before spawning threads - that need to schedule async tasks. Uses both contextvars (for request - isolation) and a global fallback (for thread pool workers where - contextvars may not propagate). - """ - global _GLOBAL_MAIN_LOOP - _main_event_loop_var.set(loop) - with _GLOBAL_MAIN_LOOP_LOCK: - _GLOBAL_MAIN_LOOP = loop - - -def _schedule_async_task(coro): - """Schedule an async coroutine as a task, thread-safe. - - This function handles scheduling from both the main event loop thread - and from worker threads (e.g., when using asyncio.to_thread). - """ - try: - # Try to get the running loop (works in main event loop thread) - loop = asyncio.get_running_loop() - loop.create_task(coro) - except RuntimeError: - # No running loop in this thread (we're in a worker thread) - # First try contextvars, then fallback to global reference - main_loop = _main_event_loop_var.get() - if main_loop is None: - with _GLOBAL_MAIN_LOOP_LOCK: - main_loop = _GLOBAL_MAIN_LOOP - if main_loop is not None and main_loop.is_running(): - asyncio.run_coroutine_threadsafe(coro, main_loop) - else: - # This should not happen in normal operation - log error and skip - logging.error("No event loop available for async task " - "scheduling, task skipped. Ensure " - "set_main_event_loop() is called " - "before parallel agent creation.") - def agent_model( agent_name: str, diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index 9ae393c32..03fa8fa8d 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -13,12 +13,12 @@ # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import asyncio -import contextvars import json import logging -from threading import Event, Lock +from threading import Event from typing import Any, Callable, Dict, List, Tuple +from app.utils.event_loop_utils import _schedule_async_task from app.service.task import (Action, ActionActivateAgentData, ActionActivateToolkitData, ActionBudgetNotEnough, ActionDeactivateAgentData, @@ -26,57 +26,6 @@ ActionStreamingAgentOutputData, get_task_lock, set_process_task) -# Thread-safe reference to main event loop using contextvars -# This ensures each request has its own event loop reference, avoiding race conditions -_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = contextvars.ContextVar( - "_main_event_loop", default=None -) - -# Global fallback for main event loop reference -# Used when contextvars don't propagate to worker threads (e.g., asyncio.to_thread) -_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None -_GLOBAL_MAIN_LOOP_LOCK = Lock() - - -def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): - """Set the main event loop reference for thread-safe task scheduling. - - This should be called from the main async context before spawning threads - that need to schedule async tasks. Uses both contextvars (for request isolation) - and a global fallback (for thread pool workers where contextvars may not propagate). - """ - global _GLOBAL_MAIN_LOOP - _main_event_loop_var.set(loop) - with _GLOBAL_MAIN_LOOP_LOCK: - _GLOBAL_MAIN_LOOP = loop - - -def _schedule_async_task(coro): - """Schedule an async coroutine as a task, thread-safe. - - This function handles scheduling from both the main event loop thread - and from worker threads (e.g., when using asyncio.to_thread). - """ - try: - # Try to get the running loop (works in main event loop thread) - loop = asyncio.get_running_loop() - loop.create_task(coro) - except RuntimeError: - # No running loop in this thread (we're in a worker thread) - # First try contextvars, then fallback to global reference - main_loop = _main_event_loop_var.get() - if main_loop is None: - with _GLOBAL_MAIN_LOOP_LOCK: - main_loop = _GLOBAL_MAIN_LOOP - if main_loop is not None and main_loop.is_running(): - asyncio.run_coroutine_threadsafe(coro, main_loop) - else: - # This should not happen in normal operation - log error and skip - logging.error( - "No event loop available for async task scheduling, task skipped. " - "Ensure set_main_event_loop() is called before parallel agent creation." - ) - from camel.agents import ChatAgent from camel.agents._types import ToolCallRequest from camel.agents.chat_agent import (AsyncStreamingChatAgentResponse, @@ -163,6 +112,121 @@ def __init__( process_task_id: str = "" + def _send_streaming_chunk(self, content: str, is_final: bool = False) -> None: + """Send a streaming output chunk to the frontend. + + Args: + content: The content chunk to send + is_final: Whether this is the final chunk + """ + task_lock = get_task_lock(self.api_task_id) + _schedule_async_task( + task_lock.put_queue( + ActionStreamingAgentOutputData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "content": content, + "is_final": is_final, + }, + ) + ) + ) + + def _send_agent_deactivate(self, message: str, tokens: int) -> None: + """Send agent deactivation event to the frontend. + + Args: + message: The accumulated message content + tokens: The total token count used + """ + task_lock = get_task_lock(self.api_task_id) + asyncio.create_task( + task_lock.put_queue( + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": tokens, + }, + ) + ) + ) + + @staticmethod + def _extract_tokens(response) -> int: + """Extract total token count from a response chunk. + + Args: + response: The response chunk (ChatAgentResponse or similar) + + Returns: + Total token count or 0 if not available + """ + if response is None: + return 0 + usage_info = response.info.get("usage") or response.info.get("token_usage") or {} + return usage_info.get("total_tokens", 0) + + def _stream_chunks(self, response_gen): + """Generator that wraps a streaming response and sends chunks to frontend. + + Args: + response_gen: The original streaming response generator + + Yields: + Each chunk from the original generator + + Returns: + Tuple of (accumulated_content, total_tokens) via StopIteration value + """ + accumulated_content = "" + last_chunk = None + + try: + for chunk in response_gen: + last_chunk = chunk + if chunk.msg and chunk.msg.content: + accumulated_content += chunk.msg.content + # Stream output chunk to frontend (non-blocking) + self._send_streaming_chunk(chunk.msg.content, is_final=False) + yield chunk + finally: + total_tokens = self._extract_tokens(last_chunk) + # Send final streaming output marker and deactivate agent + self._send_streaming_chunk("", is_final=True) + self._send_agent_deactivate(accumulated_content, total_tokens) + + async def _astream_chunks(self, response_gen): + """Async generator that wraps a streaming response and sends chunks to frontend. + + Args: + response_gen: The original async streaming response generator + + Yields: + Each chunk from the original generator + """ + accumulated_content = "" + last_chunk = None + + try: + async for chunk in response_gen: + last_chunk = chunk + if chunk.msg and chunk.msg.content: + delta_content = chunk.msg.content + accumulated_content += delta_content + # Stream output chunk to frontend (non-blocking) + self._send_streaming_chunk(delta_content, is_final=False) + yield chunk + finally: + total_tokens = self._extract_tokens(last_chunk) + # Send final streaming output marker and deactivate agent + self._send_streaming_chunk("", is_final=True) + self._send_agent_deactivate(accumulated_content, total_tokens) + def step( self, input_message: BaseMessage | str, @@ -214,73 +278,8 @@ def step( if res is not None: if isinstance(res, StreamingChatAgentResponse): - - def _stream_with_deactivate(): - last_response: ChatAgentResponse | None = None - # With stream_accumulate=False, - # we need to accumulate delta content - accumulated_content = "" - try: - for chunk in res: - last_response = chunk - # Accumulate content from each chunk (delta mode) - if chunk.msg and chunk.msg.content: - delta_content = chunk.msg.content - accumulated_content += delta_content - # Stream output chunk to frontend (non-blocking) - _schedule_async_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": delta_content, - "is_final": False, - }, - ) - ) - ) - yield chunk - finally: - total_tokens = 0 - if last_response: - usage_info = last_response.info.get( - "usage") or last_response.info.get( - "token_usage") or {} - if usage_info: - total_tokens = usage_info.get( - "total_tokens", 0) - # Send final streaming output marker - asyncio.create_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": "", - "is_final": True, - }, - ) - ) - ) - asyncio.create_task( - task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, - "message": - accumulated_content, - "tokens": - total_tokens, - }, ))) - - return StreamingChatAgentResponse(_stream_with_deactivate()) + # Use reusable stream wrapper to send chunks to frontend + return StreamingChatAgentResponse(self._stream_chunks(res)) message = res.msg.content if res.msg else "" usage_info = res.info.get("usage") or res.info.get( @@ -339,70 +338,8 @@ async def astep( try: res = await super().astep(input_message, response_format) if isinstance(res, AsyncStreamingChatAgentResponse): - # Wrap the async streaming response to send chunks to frontend - async def _async_stream_with_deactivate(): - accumulated_content = "" - last_chunk = None - try: - async for chunk in res: - last_chunk = chunk - if chunk.msg and chunk.msg.content: - delta_content = chunk.msg.content - accumulated_content += delta_content - # Stream output chunk to frontend (non-blocking) - _schedule_async_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": delta_content, - "is_final": False, - }, - ) - ) - ) - yield chunk - finally: - total_tokens = 0 - if last_chunk: - usage_info = ( - last_chunk.info.get("usage") - or last_chunk.info.get("token_usage") - or {} - ) - if usage_info: - total_tokens = usage_info.get("total_tokens", 0) - # Send final streaming output marker - asyncio.create_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": "", - "is_final": True, - }, - ) - ) - ) - asyncio.create_task( - task_lock.put_queue( - ActionDeactivateAgentData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": accumulated_content, - "tokens": total_tokens, - }, - ) - ) - ) - - return AsyncStreamingChatAgentResponse(_async_stream_with_deactivate()) + # Use reusable async stream wrapper to send chunks to frontend + return AsyncStreamingChatAgentResponse(self._astream_chunks(res)) except ModelProcessingError as e: res = None error_info = e @@ -497,19 +434,7 @@ def _execute_tool(self, }, ))) # Stream tool activity to frontend as "thinking" text tool_display = func_name.replace("_", " ").title() - _schedule_async_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": f"[Tool] {tool_display}...\n", - "is_final": False, - }, - ) - ) - ) + self._send_streaming_chunk(f"[Tool] {tool_display}...\n", is_final=False) # Set process_task context for all tool executions with set_process_task(self.process_task_id): raw_result = tool(**args) diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index ea2a7f025..8a74741ae 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -28,7 +28,7 @@ from inflection import titleize from pydash import chain -from app.agent.agent_model import agent_model, set_main_event_loop +from app.agent.agent_model import agent_model from app.agent.factory import ( browser_agent, developer_agent, @@ -60,6 +60,7 @@ from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit from app.utils.workforce import Workforce +from app.utils.event_loop_utils import set_main_event_loop logger = logging.getLogger("chat_service") diff --git a/backend/app/utils/event_loop_utils.py b/backend/app/utils/event_loop_utils.py new file mode 100644 index 000000000..7b293edc9 --- /dev/null +++ b/backend/app/utils/event_loop_utils.py @@ -0,0 +1,69 @@ +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + +import asyncio +import contextvars +import logging +from threading import Lock + +# Thread-safe reference to main event loop using contextvars +# This ensures each request has its own event loop reference, avoiding race conditions +_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = contextvars.ContextVar( + "_main_event_loop", default=None +) + +# Global fallback for main event loop reference +# Used when contextvars don't propagate to worker threads (e.g., asyncio.to_thread) +_GLOBAL_MAIN_LOOP: asyncio.AbstractEventLoop | None = None +_GLOBAL_MAIN_LOOP_LOCK = Lock() + + +def set_main_event_loop(loop: asyncio.AbstractEventLoop | None): + """Set the main event loop reference for thread-safe task scheduling. + + This should be called from the main async context before spawning threads + that need to schedule async tasks. Uses both contextvars (for request isolation) + and a global fallback (for thread pool workers where contextvars may not propagate). + """ + global _GLOBAL_MAIN_LOOP + _main_event_loop_var.set(loop) + with _GLOBAL_MAIN_LOOP_LOCK: + _GLOBAL_MAIN_LOOP = loop + + +def _schedule_async_task(coro): + """Schedule an async coroutine as a task, thread-safe. + + This function handles scheduling from both the main event loop thread + and from worker threads (e.g., when using asyncio.to_thread). + """ + try: + # Try to get the running loop (works in main event loop thread) + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + # No running loop in this thread (we're in a worker thread) + # First try contextvars, then fallback to global reference + main_loop = _main_event_loop_var.get() + if main_loop is None: + with _GLOBAL_MAIN_LOOP_LOCK: + main_loop = _GLOBAL_MAIN_LOOP + if main_loop is not None and main_loop.is_running(): + asyncio.run_coroutine_threadsafe(coro, main_loop) + else: + # This should not happen in normal operation - log error and skip + logging.error( + "No event loop available for async task scheduling, task skipped. " + "Ensure set_main_event_loop() is called before parallel agent creation." + ) diff --git a/backend/tests/app/agent/test_agent_model.py b/backend/tests/app/agent/test_agent_model.py index 5b357c88f..606d2f2c0 100644 --- a/backend/tests/app/agent/test_agent_model.py +++ b/backend/tests/app/agent/test_agent_model.py @@ -86,7 +86,6 @@ async def test_full_agent_workflow(self, sample_chat_data): # Create agent agent_model_mod = sys.modules['app.agent.agent_model'] with patch.object(agent_model_mod, 'ModelFactory') as mock_model_factory, \ - patch.object(agent_model_mod, '_schedule_async_task'), \ patch.object(agent_model_mod, 'ListenChatAgent') as mock_listen_agent, \ patch.object(agent_model_mod, 'get_task_lock', return_value=mock_task_lock): mock_model = MagicMock() From 9949d359f4cac1d451dd3cec766455fe32c3c57c Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Tue, 3 Feb 2026 02:55:01 +0100 Subject: [PATCH 18/23] refactor: remove streaming_agent_output feature, keep event_loop_utils and isolated functions - Remove streaming_agent_output SSE step and ActionStreamingAgentOutputData - Remove _send_streaming_chunk method and [Tool] display calls - Revert frontend files to upstream/main (no streaming UI changes) - Keep refactored utilities: event_loop_utils, _schedule_async_task, _send_agent_deactivate, _extract_tokens, _stream_chunks --- backend/app/agent/listen_chat_agent.py | 36 +------------ backend/app/service/chat_service.py | 2 - backend/app/service/task.py | 10 ---- src/components/WorkFlow/node.tsx | 15 +----- src/store/chatStore.ts | 70 -------------------------- src/types/chatbox.d.ts | 2 - src/types/constants.ts | 1 - 7 files changed, 2 insertions(+), 134 deletions(-) diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index 03fa8fa8d..85c754d4d 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -22,8 +22,7 @@ from app.service.task import (Action, ActionActivateAgentData, ActionActivateToolkitData, ActionBudgetNotEnough, ActionDeactivateAgentData, - ActionDeactivateToolkitData, - ActionStreamingAgentOutputData, get_task_lock, + ActionDeactivateToolkitData, get_task_lock, set_process_task) from camel.agents import ChatAgent @@ -112,28 +111,6 @@ def __init__( process_task_id: str = "" - def _send_streaming_chunk(self, content: str, is_final: bool = False) -> None: - """Send a streaming output chunk to the frontend. - - Args: - content: The content chunk to send - is_final: Whether this is the final chunk - """ - task_lock = get_task_lock(self.api_task_id) - _schedule_async_task( - task_lock.put_queue( - ActionStreamingAgentOutputData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "content": content, - "is_final": is_final, - }, - ) - ) - ) - def _send_agent_deactivate(self, message: str, tokens: int) -> None: """Send agent deactivation event to the frontend. @@ -191,13 +168,9 @@ def _stream_chunks(self, response_gen): last_chunk = chunk if chunk.msg and chunk.msg.content: accumulated_content += chunk.msg.content - # Stream output chunk to frontend (non-blocking) - self._send_streaming_chunk(chunk.msg.content, is_final=False) yield chunk finally: total_tokens = self._extract_tokens(last_chunk) - # Send final streaming output marker and deactivate agent - self._send_streaming_chunk("", is_final=True) self._send_agent_deactivate(accumulated_content, total_tokens) async def _astream_chunks(self, response_gen): @@ -218,13 +191,9 @@ async def _astream_chunks(self, response_gen): if chunk.msg and chunk.msg.content: delta_content = chunk.msg.content accumulated_content += delta_content - # Stream output chunk to frontend (non-blocking) - self._send_streaming_chunk(delta_content, is_final=False) yield chunk finally: total_tokens = self._extract_tokens(last_chunk) - # Send final streaming output marker and deactivate agent - self._send_streaming_chunk("", is_final=True) self._send_agent_deactivate(accumulated_content, total_tokens) def step( @@ -432,9 +401,6 @@ def _execute_tool(self, "message": json.dumps(args, ensure_ascii=False), }, ))) - # Stream tool activity to frontend as "thinking" text - tool_display = func_name.replace("_", " ").title() - self._send_streaming_chunk(f"[Tool] {tool_display}...\n", is_final=False) # Set process_task context for all tool executions with set_process_task(self.process_task_id): raw_result = tool(**args) diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 8a74741ae..4c7667bd9 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -1573,8 +1573,6 @@ def on_stream_text(chunk): ) elif item.action == Action.decompose_text: yield sse_json("decompose_text", item.data) - elif item.action == Action.streaming_agent_output: - yield sse_json("streaming_agent_output", item.data) elif item.action == Action.decompose_progress: yield sse_json("to_sub_tasks", item.data) elif item.action == Action.new_agent: diff --git a/backend/app/service/task.py b/backend/app/service/task.py index 29ce99606..4b5513afe 100644 --- a/backend/app/service/task.py +++ b/backend/app/service/task.py @@ -45,8 +45,6 @@ class Action(str, Enum): # backend -> user (streaming decomposition) decompose_progress = "decompose_progress" decompose_text = "decompose_text" # backend -> user (raw streaming text) - # backend -> user (streaming agent output during task execution) - streaming_agent_output = "streaming_agent_output" start = "start" # user -> backend create_agent = "create_agent" # backend -> user activate_agent = "activate_agent" # backend -> user @@ -104,13 +102,6 @@ class ActionDecomposeTextData(BaseModel): data: dict -class ActionStreamingAgentOutputData(BaseModel): - action: Literal[Action.streaming_agent_output - ] = Action.streaming_agent_output - data: dict[Literal["agent_name", "process_task_id", "agent_id", "content", - "is_final"], str | bool] - - class ActionNewTaskStateData(BaseModel): action: Literal[Action.new_task_state] = Action.new_task_state data: dict[Literal["task_id", "content", "state", "result", @@ -289,7 +280,6 @@ class ActionSkipTaskData(BaseModel): | ActionSkipTaskData | ActionDecomposeTextData | ActionDecomposeProgressData - | ActionStreamingAgentOutputData ) diff --git a/src/components/WorkFlow/node.tsx b/src/components/WorkFlow/node.tsx index 67f4a4e51..e62d57957 100644 --- a/src/components/WorkFlow/node.tsx +++ b/src/components/WorkFlow/node.tsx @@ -739,20 +739,7 @@ export function Node({ id, data }: NodeProps) {
{task.content}
{task?.status === TaskStatus.RUNNING && ( -
- {/* Streaming agent output */} - {chatStore.tasks[chatStore.activeTaskId as string] - ?.streamingAgentOutput?.[task.id] && ( -
-
- { - chatStore.tasks[ - chatStore.activeTaskId as string - ].streamingAgentOutput[task.id] - } -
-
- )} +
{/* active toolkit */} {task.toolkits && task.toolkits.length > 0 && diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index f3787f988..bcd84458c 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -75,8 +75,6 @@ interface Task { isContextExceeded?: boolean; // Streaming decompose text - stored separately to avoid frequent re-renders streamingDecomposeText: string; - // Streaming agent output - real-time agent output during task execution - streamingAgentOutput: { [processTaskId: string]: string }; } export interface ChatStore { @@ -170,12 +168,6 @@ export interface ChatStore { setNextTaskId: (taskId: string | null) => void; setStreamingDecomposeText: (taskId: string, text: string) => void; clearStreamingDecomposeText: (taskId: string) => void; - updateStreamingAgentOutput: ( - taskId: string, - processTaskId: string, - content: string - ) => void; - clearStreamingAgentOutput: (taskId: string, processTaskId: string) => void; } export type VanillaChatStore = { @@ -279,7 +271,6 @@ const chatStore = (initial?: Partial) => isTakeControl: false, isTaskEdit: false, streamingDecomposeText: '', - streamingAgentOutput: {}, }, }, })); @@ -1012,28 +1003,6 @@ const chatStore = (initial?: Partial) => return; } - // Streaming agent output - real-time output during task execution - if (agentMessages.step === AgentStep.STREAMING_AGENT_OUTPUT) { - const { process_task_id, content, is_final } = agentMessages.data; - if (!process_task_id) return; - - const { updateStreamingAgentOutput, clearStreamingAgentOutput } = - getCurrentChatStore(); - - if (is_final) { - // Clear streaming output when final marker received - clearStreamingAgentOutput(currentTaskId, process_task_id); - } else if (content) { - // Append content to streaming output - updateStreamingAgentOutput( - currentTaskId, - process_task_id, - content - ); - } - return; - } - if (agentMessages.step === AgentStep.TO_SUB_TASKS) { // Clear streaming decompose text when task splitting is done clearStreamingDecomposeText(currentTaskId); @@ -3179,45 +3148,6 @@ const chatStore = (initial?: Partial) => }; }); }, - updateStreamingAgentOutput: (taskId, processTaskId, content) => { - set((state) => { - if (!state.tasks[taskId]) return state; - const currentOutput = - state.tasks[taskId].streamingAgentOutput[processTaskId] || ''; - return { - ...state, - tasks: { - ...state.tasks, - [taskId]: { - ...state.tasks[taskId], - streamingAgentOutput: { - ...state.tasks[taskId].streamingAgentOutput, - [processTaskId]: currentOutput + content, - }, - }, - }, - }; - }); - }, - clearStreamingAgentOutput: (taskId, processTaskId) => { - set((state) => { - if (!state.tasks[taskId]) return state; - const newStreamingAgentOutput = { - ...state.tasks[taskId].streamingAgentOutput, - }; - delete newStreamingAgentOutput[processTaskId]; - return { - ...state, - tasks: { - ...state.tasks, - [taskId]: { - ...state.tasks[taskId], - streamingAgentOutput: newStreamingAgentOutput, - }, - }, - }; - }); - }, })); const filterMessage = (message: AgentMessage) => { diff --git a/src/types/chatbox.d.ts b/src/types/chatbox.d.ts index e1b9ebb0f..f7b468519 100644 --- a/src/types/chatbox.d.ts +++ b/src/types/chatbox.d.ts @@ -141,8 +141,6 @@ declare global { current_length?: number; max_length?: number; text?: string; - // Streaming agent output - is_final?: boolean; }; status?: AgentMessageStatusType; } diff --git a/src/types/constants.ts b/src/types/constants.ts index 9d7ec4b84..913bd3433 100644 --- a/src/types/constants.ts +++ b/src/types/constants.ts @@ -42,7 +42,6 @@ export const AgentStep = { NOTICE_CARD: 'notice_card', FAILED: 'failed', AGENT_SUMMARY_END: 'agent_summary_end', - STREAMING_AGENT_OUTPUT: 'streaming_agent_output', } as const; export type AgentStepType = (typeof AgentStep)[keyof typeof AgentStep]; From 66c3df620ac97d7bc78136aafa7c9067b00d3a1e Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Tue, 3 Feb 2026 03:07:18 +0100 Subject: [PATCH 19/23] fix: add mock for _schedule_async_task in agent model test --- backend/tests/app/agent/test_agent_model.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/tests/app/agent/test_agent_model.py b/backend/tests/app/agent/test_agent_model.py index 606d2f2c0..5b357c88f 100644 --- a/backend/tests/app/agent/test_agent_model.py +++ b/backend/tests/app/agent/test_agent_model.py @@ -86,6 +86,7 @@ async def test_full_agent_workflow(self, sample_chat_data): # Create agent agent_model_mod = sys.modules['app.agent.agent_model'] with patch.object(agent_model_mod, 'ModelFactory') as mock_model_factory, \ + patch.object(agent_model_mod, '_schedule_async_task'), \ patch.object(agent_model_mod, 'ListenChatAgent') as mock_listen_agent, \ patch.object(agent_model_mod, 'get_task_lock', return_value=mock_task_lock): mock_model = MagicMock() From c5ddcb4f0337ddf8b739183381e84d65a107cc47 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Tue, 3 Feb 2026 03:11:58 +0100 Subject: [PATCH 20/23] style: fix lint issues with ruff --- backend/app/agent/listen_chat_agent.py | 398 ++++++++++++++--------- backend/app/utils/event_loop_utils.py | 6 +- backend/app/utils/single_agent_worker.py | 140 +++++--- 3 files changed, 340 insertions(+), 204 deletions(-) diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index 85c754d4d..2c476b424 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -19,16 +19,23 @@ from typing import Any, Callable, Dict, List, Tuple from app.utils.event_loop_utils import _schedule_async_task -from app.service.task import (Action, ActionActivateAgentData, - ActionActivateToolkitData, ActionBudgetNotEnough, - ActionDeactivateAgentData, - ActionDeactivateToolkitData, get_task_lock, - set_process_task) +from app.service.task import ( + Action, + ActionActivateAgentData, + ActionActivateToolkitData, + ActionBudgetNotEnough, + ActionDeactivateAgentData, + ActionDeactivateToolkitData, + get_task_lock, + set_process_task, +) from camel.agents import ChatAgent from camel.agents._types import ToolCallRequest -from camel.agents.chat_agent import (AsyncStreamingChatAgentResponse, - StreamingChatAgentResponse) +from camel.agents.chat_agent import ( + AsyncStreamingChatAgentResponse, + StreamingChatAgentResponse, +) from camel.memories import AgentMemory from camel.messages import BaseMessage from camel.models import BaseModelBackend, ModelManager, ModelProcessingError @@ -44,32 +51,34 @@ class ListenChatAgent(ChatAgent): - def __init__( self, api_task_id: str, agent_name: str, system_message: BaseMessage | str | None = None, - model: (BaseModelBackend - | ModelManager - | Tuple[str, str] - | str - | ModelType - | Tuple[ModelPlatformType, ModelType] - | List[BaseModelBackend] - | List[str] - | List[ModelType] - | List[Tuple[str, str]] - | List[Tuple[ModelPlatformType, ModelType]] - | None) = None, + model: ( + BaseModelBackend + | ModelManager + | Tuple[str, str] + | str + | ModelType + | Tuple[ModelPlatformType, ModelType] + | List[BaseModelBackend] + | List[str] + | List[ModelType] + | List[Tuple[str, str]] + | List[Tuple[ModelPlatformType, ModelType]] + | None + ) = None, memory: AgentMemory | None = None, message_window_size: int | None = None, token_limit: int | None = None, output_language: str | None = None, tools: List[FunctionTool | Callable[..., Any]] | None = None, toolkits_to_register_agent: List[RegisteredAgentToolkit] | None = None, - external_tools: (List[FunctionTool | Callable[..., Any] - | Dict[str, Any]] | None) = None, + external_tools: ( + List[FunctionTool | Callable[..., Any] | Dict[str, Any]] | None + ) = None, response_terminators: List[ResponseTerminator] | None = None, scheduling_strategy: str = "round_robin", max_iteration: int | None = None, @@ -113,7 +122,7 @@ def __init__( def _send_agent_deactivate(self, message: str, tokens: int) -> None: """Send agent deactivation event to the frontend. - + Args: message: The accumulated message content tokens: The total token count used @@ -136,33 +145,37 @@ def _send_agent_deactivate(self, message: str, tokens: int) -> None: @staticmethod def _extract_tokens(response) -> int: """Extract total token count from a response chunk. - + Args: response: The response chunk (ChatAgentResponse or similar) - + Returns: Total token count or 0 if not available """ if response is None: return 0 - usage_info = response.info.get("usage") or response.info.get("token_usage") or {} + usage_info = ( + response.info.get("usage") + or response.info.get("token_usage") + or {} + ) return usage_info.get("total_tokens", 0) def _stream_chunks(self, response_gen): """Generator that wraps a streaming response and sends chunks to frontend. - + Args: response_gen: The original streaming response generator - + Yields: Each chunk from the original generator - + Returns: Tuple of (accumulated_content, total_tokens) via StopIteration value """ accumulated_content = "" last_chunk = None - + try: for chunk in response_gen: last_chunk = chunk @@ -172,19 +185,19 @@ def _stream_chunks(self, response_gen): finally: total_tokens = self._extract_tokens(last_chunk) self._send_agent_deactivate(accumulated_content, total_tokens) - + async def _astream_chunks(self, response_gen): """Async generator that wraps a streaming response and sends chunks to frontend. - + Args: response_gen: The original async streaming response generator - + Yields: Each chunk from the original generator """ accumulated_content = "" last_chunk = None - + try: async for chunk in response_gen: last_chunk = chunk @@ -204,23 +217,31 @@ def step( task_lock = get_task_lock(self.api_task_id) asyncio.create_task( task_lock.put_queue( - ActionActivateAgentData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, - "message": (input_message.content if isinstance( - input_message, BaseMessage) else input_message), - }, ))) + ActionActivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": ( + input_message.content + if isinstance(input_message, BaseMessage) + else input_message + ), + }, + ) + ) + ) error_info = None message = None res = None - msg = (input_message.content - if isinstance(input_message, BaseMessage) else input_message) + msg = ( + input_message.content + if isinstance(input_message, BaseMessage) + else input_message + ) logger.info( - f"Agent {self.agent_name} starting step with message: {msg}") + f"Agent {self.agent_name} starting step with message: {msg}" + ) try: res = super().step(input_message, response_format) except ModelProcessingError as e: @@ -230,18 +251,21 @@ def step( message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") asyncio.create_task( - task_lock.put_queue(ActionBudgetNotEnough())) + task_lock.put_queue(ActionBudgetNotEnough()) + ) else: message = str(e) logger.error( - f"Agent {self.agent_name} model processing error: {e}") + f"Agent {self.agent_name} model processing error: {e}" + ) total_tokens = 0 except Exception as e: res = None error_info = e logger.error( f"Agent {self.agent_name} unexpected error in step: {e}", - exc_info=True) + exc_info=True, + ) message = f"Error processing message: {e!s}" total_tokens = 0 @@ -251,24 +275,32 @@ def step( return StreamingChatAgentResponse(self._stream_chunks(res)) message = res.msg.content if res.msg else "" - usage_info = res.info.get("usage") or res.info.get( - "token_usage") or {} - total_tokens = usage_info.get("total_tokens", - 0) if usage_info else 0 - logger.info(f"Agent {self.agent_name} completed step, " - f"tokens used: {total_tokens}") + usage_info = ( + res.info.get("usage") or res.info.get("token_usage") or {} + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) + logger.info( + f"Agent {self.agent_name} completed step, " + f"tokens used: {total_tokens}" + ) assert message is not None asyncio.create_task( task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, ))) + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, + ) + ) + ) if error_info is not None: raise error_info @@ -285,30 +317,37 @@ async def astep( ActionActivateAgentData( action=Action.activate_agent, data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, - "message": (input_message.content if isinstance( - input_message, BaseMessage) else input_message), + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": ( + input_message.content + if isinstance(input_message, BaseMessage) + else input_message + ), }, - )) + ) + ) error_info = None message = None res = None - msg = (input_message.content - if isinstance(input_message, BaseMessage) else input_message) - logger.debug(f"Agent {self.agent_name} starting async step " - f"with message: {msg}") + msg = ( + input_message.content + if isinstance(input_message, BaseMessage) + else input_message + ) + logger.debug( + f"Agent {self.agent_name} starting async step with message: {msg}" + ) try: res = await super().astep(input_message, response_format) if isinstance(res, AsyncStreamingChatAgentResponse): # Use reusable async stream wrapper to send chunks to frontend - return AsyncStreamingChatAgentResponse(self._astream_chunks(res)) + return AsyncStreamingChatAgentResponse( + self._astream_chunks(res) + ) except ModelProcessingError as e: res = None error_info = e @@ -316,46 +355,62 @@ async def astep( message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") asyncio.create_task( - task_lock.put_queue(ActionBudgetNotEnough())) + task_lock.put_queue(ActionBudgetNotEnough()) + ) else: message = str(e) logger.error( - f"Agent {self.agent_name} model processing error: {e}") + f"Agent {self.agent_name} model processing error: {e}" + ) total_tokens = 0 except Exception as e: res = None error_info = e logger.error( f"Agent {self.agent_name} unexpected error in async step: {e}", - exc_info=True) + exc_info=True, + ) message = f"Error processing message: {e!s}" total_tokens = 0 # For non-streaming responses, handle deactivation here - if res is not None and not isinstance(res, AsyncStreamingChatAgentResponse): + if res is not None and not isinstance( + res, AsyncStreamingChatAgentResponse + ): message = res.msg.content if res.msg else "" - usage_info = res.info.get("usage") or res.info.get("token_usage") or {} - total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0 - logger.info(f"Agent {self.agent_name} completed step, " - f"tokens used: {total_tokens}") + usage_info = ( + res.info.get("usage") or res.info.get("token_usage") or {} + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) + logger.info( + f"Agent {self.agent_name} completed step, " + f"tokens used: {total_tokens}" + ) asyncio.create_task( task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, ))) + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, + ) + ) + ) if error_info is not None: raise error_info assert res is not None return res - def _execute_tool(self, - tool_call_request: ToolCallRequest) -> ToolCallingRecord: + def _execute_tool( + self, tool_call_request: ToolCallRequest + ) -> ToolCallingRecord: func_name = tool_call_request.tool_name tool: FunctionTool = self._internal_tools[func_name] # Route async functions to async execution @@ -378,29 +433,35 @@ def _execute_tool(self, try: task_lock = get_task_lock(self.api_task_id) - toolkit_name = getattr(tool, "_toolkit_name") if hasattr( - tool, "_toolkit_name") else "mcp_toolkit" - logger.debug(f"Agent {self.agent_name} executing tool: " - f"{func_name} from toolkit: {toolkit_name} " - f"with args: {json.dumps(args, ensure_ascii=False)}") + toolkit_name = ( + getattr(tool, "_toolkit_name") + if hasattr(tool, "_toolkit_name") + else "mcp_toolkit" + ) + logger.debug( + f"Agent {self.agent_name} executing tool: " + f"{func_name} from toolkit: {toolkit_name} " + f"with args: {json.dumps(args, ensure_ascii=False)}" + ) # Only send activate event if tool is # NOT wrapped by @listen_toolkit if not has_listen_decorator: _schedule_async_task( task_lock.put_queue( - ActionActivateToolkitData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "toolkit_name": - toolkit_name, - "method_name": - func_name, - "message": - json.dumps(args, ensure_ascii=False), - }, ))) + ActionActivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": json.dumps( + args, ensure_ascii=False + ), + }, + ) + ) + ) # Set process_task context for all tool executions with set_process_task(self.process_task_id): raw_result = tool(**args) @@ -409,7 +470,8 @@ def _execute_tool(self, self._secure_result_store[tool_call_id] = raw_result result = ( "[The tool has been executed successfully, but the output" - " from the tool is masked. You can move forward]") + " from the tool is masked. You can move forward]" + ) mask_flag = True else: result = raw_result @@ -421,9 +483,10 @@ def _execute_tool(self, result_str = repr(result) MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: - result_msg = (result_str[:MAX_RESULT_LENGTH] + - (f"... (truncated, total length: " - f"{len(result_str)} chars)")) + result_msg = result_str[:MAX_RESULT_LENGTH] + ( + f"... (truncated, total length: " + f"{len(result_str)} chars)" + ) else: result_msg = result_str @@ -431,20 +494,25 @@ def _execute_tool(self, if not has_listen_decorator: _schedule_async_task( task_lock.put_queue( - ActionDeactivateToolkitData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "toolkit_name": toolkit_name, - "method_name": func_name, - "message": result_msg, - }, ))) + ActionDeactivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": result_msg, + }, + ) + ) + ) except Exception as e: # Capture the error message to prevent framework crash error_msg = f"Error executing tool '{func_name}': {e!s}" result = f"Tool execution failed: {error_msg}" mask_flag = False - logger.error(f"Tool execution failed for {func_name}: {e}", - exc_info=True) + logger.error( + f"Tool execution failed for {func_name}: {e}", exc_info=True + ) return self._record_tool_calling( func_name, @@ -456,7 +524,8 @@ def _execute_tool(self, ) async def _aexecute_tool( - self, tool_call_request: ToolCallRequest) -> ToolCallingRecord: + self, tool_call_request: ToolCallRequest + ) -> ToolCallingRecord: func_name = tool_call_request.tool_name tool: FunctionTool = self._internal_tools[func_name] @@ -473,20 +542,26 @@ async def _aexecute_tool( toolkit_name = tool._toolkit_name # Method 2: For MCP tools, check if func has __self__ (the toolkit instance) - if not toolkit_name and hasattr(tool, "func") and hasattr( - tool.func, "__self__"): + if ( + not toolkit_name + and hasattr(tool, "func") + and hasattr(tool.func, "__self__") + ): toolkit_instance = tool.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( - toolkit_instance.toolkit_name): + toolkit_instance.toolkit_name + ): toolkit_name = toolkit_instance.toolkit_name() # Method 3: Check if tool.func is a bound method with toolkit if not toolkit_name and hasattr(tool, "func"): - if hasattr(tool.func, "func") and hasattr(tool.func.func, - "__self__"): + if hasattr(tool.func, "func") and hasattr( + tool.func.func, "__self__" + ): toolkit_instance = tool.func.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( - toolkit_instance.toolkit_name): + toolkit_instance.toolkit_name + ): toolkit_name = toolkit_instance.toolkit_name() # Default fallback @@ -504,18 +579,16 @@ async def _aexecute_tool( # Only send activate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: await task_lock.put_queue( - ActionActivateToolkitData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "toolkit_name": - toolkit_name, - "method_name": - func_name, - "message": - json.dumps(args, ensure_ascii=False), - }, )) + ActionActivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": json.dumps(args, ensure_ascii=False), + }, + ) + ) try: # Set process_task context for all tool executions with set_process_task(self.process_task_id): @@ -546,7 +619,8 @@ async def _aexecute_tool( result = await tool.async_call(**args) elif hasattr(tool, "func") and asyncio.iscoroutinefunction( - tool.func): + tool.func + ): # Case: tool wraps a direct async function result = await tool.func(**args) @@ -566,8 +640,10 @@ async def _aexecute_tool( # Capture the error message to prevent framework crash error_msg = f"Error executing async tool '{func_name}': {e!s}" result = {"error": error_msg} - logger.error(f"Async tool execution failed for {func_name}: {e}", - exc_info=True) + logger.error( + f"Async tool execution failed for {func_name}: {e}", + exc_info=True, + ) # Prepare result message with truncation if isinstance(result, str): @@ -576,20 +652,26 @@ async def _aexecute_tool( result_str = repr(result) MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: - result_msg = result_str[:MAX_RESULT_LENGTH] + f"... (truncated, total length: {len(result_str)} chars)" + result_msg = ( + result_str[:MAX_RESULT_LENGTH] + + f"... (truncated, total length: {len(result_str)} chars)" + ) else: result_msg = result_str # Only send deactivate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: await task_lock.put_queue( - ActionDeactivateToolkitData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "toolkit_name": toolkit_name, - "method_name": func_name, - "message": result_msg, - }, )) + ActionDeactivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": result_msg, + }, + ) + ) return self._record_tool_calling( func_name, args, @@ -612,8 +694,9 @@ def clone(self, with_memory: bool = False) -> ChatAgent: model=self.model_backend.models, # Pass the existing model_backend memory=None, # clone memory later message_window_size=getattr(self.memory, "window_size", None), - token_limit=getattr(self.memory.get_context_creator(), - "token_limit", None), + token_limit=getattr( + self.memory.get_context_creator(), "token_limit", None + ), output_language=self._output_language, tools=cloned_tools, toolkits_to_register_agent=toolkits_to_register, @@ -621,8 +704,7 @@ def clone(self, with_memory: bool = False) -> ChatAgent: schema for schema in self._external_tool_schemas.values() ], response_terminators=self.response_terminators, - scheduling_strategy=self.model_backend.scheduling_strategy. - __name__, + scheduling_strategy=self.model_backend.scheduling_strategy.__name__, max_iteration=self.max_iteration, stop_event=self.stop_event, tool_execution_timeout=self.tool_execution_timeout, diff --git a/backend/app/utils/event_loop_utils.py b/backend/app/utils/event_loop_utils.py index 7b293edc9..13207054e 100644 --- a/backend/app/utils/event_loop_utils.py +++ b/backend/app/utils/event_loop_utils.py @@ -19,9 +19,9 @@ # Thread-safe reference to main event loop using contextvars # This ensures each request has its own event loop reference, avoiding race conditions -_main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = contextvars.ContextVar( - "_main_event_loop", default=None -) +_main_event_loop_var: contextvars.ContextVar[ + asyncio.AbstractEventLoop | None +] = contextvars.ContextVar("_main_event_loop", default=None) # Global fallback for main event loop reference # Used when contextvars don't propagate to worker threads (e.g., asyncio.to_thread) diff --git a/backend/app/utils/single_agent_worker.py b/backend/app/utils/single_agent_worker.py index 0fc4370e5..2e82a4a42 100644 --- a/backend/app/utils/single_agent_worker.py +++ b/backend/app/utils/single_agent_worker.py @@ -14,7 +14,9 @@ import datetime from camel.agents.chat_agent import AsyncStreamingChatAgentResponse -from camel.societies.workforce.single_agent_worker import SingleAgentWorker as BaseSingleAgentWorker +from camel.societies.workforce.single_agent_worker import ( + SingleAgentWorker as BaseSingleAgentWorker, +) from camel.tasks.task import Task, TaskState, is_task_result_insufficient import logging @@ -40,13 +42,16 @@ def __init__( context_utility: ContextUtility | None = None, enable_workflow_memory: bool = False, ) -> None: - logger.info("Initializing SingleAgentWorker", extra={ - "description": description, - "worker_agent_name": worker.agent_name, - "use_agent_pool": use_agent_pool, - "pool_max_size": pool_max_size, - "enable_workflow_memory": enable_workflow_memory - }) + logger.info( + "Initializing SingleAgentWorker", + extra={ + "description": description, + "worker_agent_name": worker.agent_name, + "use_agent_pool": use_agent_pool, + "pool_max_size": pool_max_size, + "enable_workflow_memory": enable_workflow_memory, + }, + ) super().__init__( description=description, worker=worker, @@ -60,7 +65,9 @@ def __init__( ) self.worker = worker # change type hint - async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState: + async def _process_task( + self, task: Task, dependencies: list[Task] + ) -> TaskState: r"""Processes a task with its dependencies using an efficient agent management system. @@ -82,11 +89,14 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState worker_agent = await self._get_worker_agent() worker_agent.process_task_id = task.id # type: ignore rewrite line - logger.info("Starting task processing", extra={ - "task_id": task.id, - "worker_agent_id": worker_agent.agent_id, - "dependencies_count": len(dependencies) - }) + logger.info( + "Starting task processing", + extra={ + "task_id": task.id, + "worker_agent_id": worker_agent.agent_id, + "dependencies_count": len(dependencies), + }, + ) response_content = "" final_response = None @@ -127,25 +137,35 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState last_chunk = chunk if chunk.msg and chunk.msg.content: accumulated_content += chunk.msg.content - logger.info(f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}") + logger.info( + f"Streaming complete: {chunk_count} chunks, content_length={len(accumulated_content)}" + ) response_content = accumulated_content # Store usage info from last chunk for later use - response._last_chunk_info = last_chunk.info if last_chunk else {} + response._last_chunk_info = ( + last_chunk.info if last_chunk else {} + ) else: # Regular ChatAgentResponse - response_content = response.msg.content if response.msg else "" - - task_result = self.structured_handler.parse_structured_response( - response_text=response_content, - schema=TaskResult, - fallback_values={ - "content": "Task processing failed", - "failed": True, - }, + response_content = ( + response.msg.content if response.msg else "" + ) + + task_result = ( + self.structured_handler.parse_structured_response( + response_text=response_content, + schema=TaskResult, + fallback_values={ + "content": "Task processing failed", + "failed": True, + }, + ) ) else: # Use native structured output if supported - response = await worker_agent.astep(prompt, response_format=TaskResult) + response = await worker_agent.astep( + prompt, response_format=TaskResult + ) # Handle streaming response for native output (shouldn't happen now but keep for safety) if isinstance(response, AsyncStreamingChatAgentResponse): @@ -162,7 +182,9 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState task_result = chunk.msg.parsed response_content = accumulated_content # Store usage info from last chunk for later use - response._last_chunk_info = last_chunk.info if last_chunk else {} + response._last_chunk_info = ( + last_chunk.info if last_chunk else {} + ) # If no parsed result found in streaming, create fallback if task_result is None: task_result = TaskResult( @@ -172,16 +194,24 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState else: # Regular ChatAgentResponse task_result = response.msg.parsed - response_content = response.msg.content if response.msg else "" + response_content = ( + response.msg.content if response.msg else "" + ) # Get token usage from the response if isinstance(response, AsyncStreamingChatAgentResponse): # For streaming responses, get info from last chunk captured during iteration - chunk_info = getattr(response, '_last_chunk_info', {}) - usage_info = chunk_info.get("usage") or chunk_info.get("token_usage") + chunk_info = getattr(response, "_last_chunk_info", {}) + usage_info = chunk_info.get("usage") or chunk_info.get( + "token_usage" + ) else: - usage_info = response.info.get("usage") or response.info.get("token_usage") - total_tokens = usage_info.get("total_tokens", 0) if usage_info else 0 + usage_info = response.info.get("usage") or response.info.get( + "token_usage" + ) + total_tokens = ( + usage_info.get("total_tokens", 0) if usage_info else 0 + ) # collect conversation from working agent to # accumulator for workflow memory @@ -195,16 +225,24 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState work_records = worker_agent.memory.retrieve() # write these records to the accumulator's memory - memory_records = [record.memory_record for record in work_records] + memory_records = [ + record.memory_record for record in work_records + ] accumulator.memory.write_records(memory_records) - logger.debug(f"Transferred {len(memory_records)} memory records to accumulator") + logger.debug( + f"Transferred {len(memory_records)} memory records to accumulator" + ) except Exception as e: - logger.warning(f"Failed to transfer conversation to accumulator: {e}") + logger.warning( + f"Failed to transfer conversation to accumulator: {e}" + ) except Exception as e: - logger.error(f"Error processing task {task.id}: {type(e).__name__}: {e}") + logger.error( + f"Error processing task {task.id}: {type(e).__name__}: {e}" + ) # Store error information in task result task.result = f"{type(e).__name__}: {e!s}" return TaskState.FAILED @@ -218,10 +256,16 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState # Create worker attempt details with descriptive keys # Use final_response if available (streaming), otherwise use response - response_for_info = final_response if final_response is not None else response + response_for_info = ( + final_response if final_response is not None else response + ) worker_attempt_details = { - "agent_id": getattr(worker_agent, "agent_id", worker_agent.role_name), - "original_worker_id": getattr(self.worker, "agent_id", self.worker.role_name), + "agent_id": getattr( + worker_agent, "agent_id", worker_agent.role_name + ), + "original_worker_id": getattr( + self.worker, "agent_id", self.worker.role_name + ), "timestamp": str(datetime.datetime.now()), "description": f"Attempt by " f"{getattr(worker_agent, 'agent_id', worker_agent.role_name)} " @@ -229,7 +273,11 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState f"{getattr(self.worker, 'agent_id', self.worker.role_name)}) " f"to process task: {task.content}", "response_content": response_content[:50], - "tool_calls": str(response_for_info.info.get("tool_calls", []) if response_for_info and hasattr(response_for_info, 'info') else [])[:50], + "tool_calls": str( + response_for_info.info.get("tool_calls", []) + if response_for_info and hasattr(response_for_info, "info") + else [] + )[:50], "total_tokens": total_tokens, } @@ -248,8 +296,12 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState if not self.use_structured_output_handler: # Handle native structured output parsing if task_result is None: - logger.error("Error in worker step execution: Invalid task result") - print(f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}") + logger.error( + "Error in worker step execution: Invalid task result" + ) + print( + f"{Fore.RED}Error in worker step execution: Invalid task result{Fore.RESET}" + ) task_result = TaskResult( content="Failed to generate valid task result.", failed=True, @@ -271,6 +323,8 @@ async def _process_task(self, task: Task, dependencies: list[Task]) -> TaskState return TaskState.FAILED if is_task_result_insufficient(task): - logger.warning(f"Task {task.id}: Content validation failed - task marked as failed") + logger.warning( + f"Task {task.id}: Content validation failed - task marked as failed" + ) return TaskState.FAILED return TaskState.DONE From 257597f78664fef9dce0b8829e58eb93041bfdb3 Mon Sep 17 00:00:00 2001 From: mkdev11 Date: Tue, 3 Feb 2026 03:25:39 +0100 Subject: [PATCH 21/23] style: fix import order with isort --- backend/app/service/chat_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index 4c7667bd9..399ae8ca2 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -53,6 +53,7 @@ delete_task_lock, set_current_task_id, ) +from app.utils.event_loop_utils import set_main_event_loop from app.utils.file_utils import get_working_directory from app.utils.server.sync_step import sync_step from app.utils.telemetry.workforce_metrics import WorkforceMetricsCallback @@ -60,7 +61,6 @@ from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit from app.utils.workforce import Workforce -from app.utils.event_loop_utils import set_main_event_loop logger = logging.getLogger("chat_service") From f7ab0f83c88a30349760fd530e3bb615ad4a5fae Mon Sep 17 00:00:00 2001 From: Wendong-Fan Date: Tue, 3 Feb 2026 21:57:27 +0800 Subject: [PATCH 22/23] update based on review comment --- backend/app/agent/listen_chat_agent.py | 30 +++++++++++++++----------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index 2c476b424..a92d36912 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -128,7 +128,7 @@ def _send_agent_deactivate(self, message: str, tokens: int) -> None: tokens: The total token count used """ task_lock = get_task_lock(self.api_task_id) - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionDeactivateAgentData( data={ @@ -373,7 +373,7 @@ async def astep( message = f"Error processing message: {e!s}" total_tokens = 0 - # For non-streaming responses, handle deactivation here + # For non-streaming responses, extract message and tokens from response if res is not None and not isinstance( res, AsyncStreamingChatAgentResponse ): @@ -389,19 +389,23 @@ async def astep( f"tokens used: {total_tokens}" ) - asyncio.create_task( - task_lock.put_queue( - ActionDeactivateAgentData( - data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, - ) + # Send deactivation for all non-streaming cases (success or error) + # Streaming responses handle deactivation in _astream_chunks + assert message is not None + + asyncio.create_task( + task_lock.put_queue( + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, ) ) + ) if error_info is not None: raise error_info From 77f96d908cd52ce831219fa22f00a12d76cf6705 Mon Sep 17 00:00:00 2001 From: Wendong-Fan Date: Tue, 3 Feb 2026 22:15:05 +0800 Subject: [PATCH 23/23] using _schedule_async_task --- backend/app/agent/listen_chat_agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index f0fc1dbff..3dffb5950 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -215,7 +215,7 @@ def step( response_format: type[BaseModel] | None = None, ) -> ChatAgentResponse | StreamingChatAgentResponse: task_lock = get_task_lock(self.api_task_id) - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionActivateAgentData( data={ @@ -250,7 +250,7 @@ def step( if "Budget has been exceeded" in str(e): message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") - asyncio.create_task( + _schedule_async_task( task_lock.put_queue(ActionBudgetNotEnough()) ) else: @@ -288,7 +288,7 @@ def step( assert message is not None - asyncio.create_task( + _schedule_async_task( task_lock.put_queue( ActionDeactivateAgentData( data={