Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions backend/app/utils/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,52 @@ def step(

if res is not None:
if isinstance(res, StreamingChatAgentResponse):
# Check if stream_accumulate is enabled - if so, consume the stream
# and return a non-streaming response for compatibility with workforce
# task analysis which expects to access response.msg directly
if getattr(self, 'stream_accumulate', False) or getattr(self, '_stream_accumulate_explicit', False):
# Consume the streaming response to get the final result
last_response: ChatAgentResponse | None = None
accumulated_content = ""
for chunk in res:
last_response = chunk
if chunk.msg and chunk.msg.content:
# In accumulate mode, chunk.msg.content is already accumulated
accumulated_content = chunk.msg.content

total_tokens = 0
if last_response:
usage_info = (
last_response.info.get("usage")
or last_response.info.get("token_usage")
or {}
)
if usage_info:
total_tokens = usage_info.get("total_tokens", 0)

traceroot_logger.info(
f"Agent {self.agent_name} completed step (stream consumed), tokens used: {total_tokens}"
)

asyncio.create_task(
task_lock.put_queue(
ActionDeactivateAgentData(
data={
"agent_name": self.agent_name,
"process_task_id": self.process_task_id,
"agent_id": self.agent_id,
"message": accumulated_content,
"tokens": total_tokens,
},
)
)
)

if error_info is not None:
raise error_info
# Return the last response which has .msg accessible
return last_response if last_response else res

def _stream_with_deactivate():
last_response: ChatAgentResponse | None = None
# With stream_accumulate=False, we need to accumulate delta content
Expand Down Expand Up @@ -623,9 +669,6 @@ def agent_model(
if k not in ["model_platform", "model_type", "api_key", "url"]
}
)
if agent_name == Agents.task_agent:
model_config["stream"] = True

return ListenChatAgent(
options.project_id,
agent_name,
Expand Down
56 changes: 46 additions & 10 deletions backend/app/utils/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ def __init__(
enabled_strategies=["retry", "replan"],
),
)
self.task_agent.stream_accumulate = True
self.task_agent._stream_accumulate_explicit = True
logger.info(f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}")

def eigent_make_sub_tasks(
Expand Down Expand Up @@ -113,16 +111,54 @@ def eigent_make_sub_tasks(
task.state = TaskState.OPEN
logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")

model = getattr(self.task_agent, "model", None)
model_stream_attr = None
model_stream_prev = None
model_stream_had_key = False
model_stream_created = False
if model is not None:
for attr in ("model_config_dict", "model_config"):
if hasattr(model, attr):
config = getattr(model, attr)
if config is None:
config = {}
setattr(model, attr, config)
model_stream_created = True
if isinstance(config, dict):
model_stream_attr = attr
model_stream_had_key = "stream" in config
model_stream_prev = config.get("stream")
config["stream"] = True
break

stream_accumulate_prev = getattr(self.task_agent, "stream_accumulate", False)
stream_explicit_prev = getattr(self.task_agent, "_stream_accumulate_explicit", False)
self.task_agent.stream_accumulate = True
self.task_agent._stream_accumulate_explicit = True

logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
subtasks = asyncio.run(
self.handle_decompose_append_task(
task,
reset=False,
coordinator_context=coordinator_context,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text
try:
subtasks = asyncio.run(
self.handle_decompose_append_task(
task,
reset=False,
coordinator_context=coordinator_context,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text,
)
)
)
finally:
self.task_agent.stream_accumulate = stream_accumulate_prev
self.task_agent._stream_accumulate_explicit = stream_explicit_prev
if model is not None and model_stream_attr:
config = getattr(model, model_stream_attr, None)
if isinstance(config, dict):
if model_stream_had_key:
config["stream"] = model_stream_prev
else:
config.pop("stream", None)
if model_stream_created and set(config.keys()) <= {"stream"}:
setattr(model, model_stream_attr, None)
logger.info("=" * 80)
logger.info(f"✅ [DECOMPOSE] Task decomposition COMPLETED", extra={
"api_task_id": self.api_task_id,
Expand Down