Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 156 additions & 45 deletions backend/app/service/chat_service.py

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions backend/app/service/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class Action(str, Enum):
update_task = "update_task" # user -> backend
task_state = "task_state" # backend -> user
new_task_state = "new_task_state" # backend -> user
decompose_progress = "decompose_progress" # backend -> user (streaming decomposition)
decompose_text = "decompose_text" # backend -> user (raw streaming text)
start = "start" # user -> backend
create_agent = "create_agent" # backend -> user
activate_agent = "activate_agent" # backend -> user
Expand Down Expand Up @@ -64,6 +66,17 @@ class ActionTaskStateData(BaseModel):
action: Literal[Action.task_state] = Action.task_state
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]


class ActionDecomposeProgressData(BaseModel):
action: Literal[Action.decompose_progress] = Action.decompose_progress
data: dict


class ActionDecomposeTextData(BaseModel):
action: Literal[Action.decompose_text] = Action.decompose_text
data: dict


class ActionNewTaskStateData(BaseModel):
action: Literal[Action.new_task_state] = Action.new_task_state
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
Expand Down Expand Up @@ -227,6 +240,8 @@ class ActionSkipTaskData(BaseModel):
| ActionAddTaskData
| ActionRemoveTaskData
| ActionSkipTaskData
| ActionDecomposeTextData
| ActionDecomposeProgressData
)


Expand Down
29 changes: 19 additions & 10 deletions backend/app/utils/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(
prune_tool_calls_from_memory: bool = False,
enable_snapshot_clean: bool = False,
step_timeout: float | None = 900,
**kwargs: Any,
) -> None:
super().__init__(
system_message=system_message,
Expand All @@ -130,6 +131,7 @@ def __init__(
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
enable_snapshot_clean=enable_snapshot_clean,
step_timeout=step_timeout,
**kwargs,
)
self.api_task_id = api_task_id
self.agent_name = agent_name
Expand Down Expand Up @@ -533,6 +535,21 @@ def agent_model(
)
)

# Build model config, defaulting to streaming for planner
extra_params = options.extra_params or {}
model_config: dict[str, Any] = {}
if options.is_cloud():
model_config["user"] = str(options.project_id)
model_config.update(
{
k: v
for k, v in extra_params.items()
if k not in ["model_platform", "model_type", "api_key", "url"]
}
)
if agent_name == Agents.task_agent:
model_config["stream"] = True

return ListenChatAgent(
options.project_id,
agent_name,
Expand All @@ -542,23 +559,15 @@ def agent_model(
model_type=options.model_type,
api_key=options.api_key,
url=options.api_url,
model_config_dict={
"user": str(options.project_id),
}
if options.is_cloud()
else None,
**{
k: v
for k, v in (options.extra_params or {}).items()
if k not in ["model_platform", "model_type", "api_key", "url"]
},
model_config_dict=model_config or None,
),
# output_language=options.language,
tools=tools,
agent_id=agent_id,
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
toolkits_to_register_agent=toolkits_to_register_agent,
enable_snapshot_clean=enable_snapshot_clean,
stream_accumulate=False,
)


Expand Down
77 changes: 72 additions & 5 deletions backend/app/utils/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from camel.societies.workforce.utils import TaskAssignResult
from camel.societies.workforce.workforce_metrics import WorkforceMetrics
from camel.societies.workforce.events import WorkerCreatedEvent
from camel.societies.workforce.prompts import TASK_DECOMPOSE_PROMPT
from camel.tasks.task import Task, TaskState, validate_task_content
from app.component import code
from app.exception.exception import UserException
Expand Down Expand Up @@ -65,14 +66,22 @@ def __init__(
)
logger.info(f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}")

def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
def eigent_make_sub_tasks(
self,
task: Task,
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
):
"""
Split process_task method to eigent_make_sub_tasks and eigent_start method.

Args:
task: The main task to decompose
coordinator_context: Optional context ONLY for coordinator agent during decomposition.
This context will NOT be passed to subtasks or worker agents.
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
on_stream_text: Optional callback for raw streaming text chunks
"""
logger.info("=" * 80)
logger.info("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={
Expand Down Expand Up @@ -103,7 +112,15 @@ def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")

logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context))
subtasks = asyncio.run(
self.handle_decompose_append_task(
task,
reset=False,
coordinator_context=coordinator_context,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text
)
)
logger.info("=" * 80)
logger.info(f"✅ [DECOMPOSE] Task decomposition COMPLETED", extra={
"api_task_id": self.api_task_id,
Expand Down Expand Up @@ -142,8 +159,45 @@ async def eigent_start(self, subtasks: list[Task]):
self._state = WorkforceState.IDLE
logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE")

def _decompose_task(self, task: Task, stream_callback=None):
"""Decompose task with optional streaming text callback."""

decompose_prompt = str(
TASK_DECOMPOSE_PROMPT.format(
content=task.content,
child_nodes_info=self._get_child_nodes_info(),
additional_info=task.additional_info,
)
)
self.task_agent.reset()
result = task.decompose(
self.task_agent, decompose_prompt, stream_callback=stream_callback
)

if isinstance(result, Generator):
def streaming_with_dependencies():
all_subtasks = []
for new_tasks in result:
all_subtasks.extend(new_tasks)
if new_tasks:
self._update_dependencies_for_decomposition(
task, all_subtasks
)
yield new_tasks
return streaming_with_dependencies()
else:
subtasks = result
if subtasks:
self._update_dependencies_for_decomposition(task, subtasks)
return subtasks

async def handle_decompose_append_task(
self, task: Task, reset: bool = True, coordinator_context: str = ""
self,
task: Task,
reset: bool = True,
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
) -> List[Task]:
"""
Override to support coordinator_context parameter.
Expand All @@ -153,6 +207,8 @@ async def handle_decompose_append_task(
task: The task to be processed
reset: Should trigger workforce reset (Workforce must not be running)
coordinator_context: Optional context ONLY for coordinator during decomposition
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
on_stream_text: Optional callback for raw streaming text chunks

Returns:
List[Task]: The decomposed subtasks or the original task
Expand Down Expand Up @@ -186,18 +242,23 @@ async def handle_decompose_append_task(
task.content = task_with_context

logger.info(f"[DECOMPOSE] Calling _decompose_task with context")
subtasks_result = self._decompose_task(task)
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)

task.content = original_content
else:
logger.info(f"[DECOMPOSE] Calling _decompose_task without context")
subtasks_result = self._decompose_task(task)
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)

logger.info(f"[DECOMPOSE] _decompose_task returned, processing results")
if isinstance(subtasks_result, Generator):
subtasks = []
for new_tasks in subtasks_result:
subtasks.extend(new_tasks)
if on_stream_batch:
try:
on_stream_batch(new_tasks, False)
except Exception as e:
logger.warning(f"Streaming callback failed: {e}")
logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator")
else:
subtasks = subtasks_result
Expand All @@ -218,6 +279,12 @@ async def handle_decompose_append_task(
subtasks = [fallback_task]
logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}")

if on_stream_batch:
try:
on_stream_batch(subtasks, True)
except Exception as e:
logger.warning(f"Final streaming callback failed: {e}")

return subtasks

async def _find_assignee(self, tasks: List[Task]) -> TaskAssignResult:
Expand Down
49 changes: 45 additions & 4 deletions src/components/ChatBox/ProjectSection.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,39 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
onSkip,
isPauseResumeLoading
}, ref) => {
const chatState = chatStore.getState();
// Subscribe to store changes with throttling to prevent excessive re-renders
const [chatState, setChatState] = React.useState(() => chatStore.getState());

React.useEffect(() => {
let timeoutId: NodeJS.Timeout | null = null;
let latestState: any = null;

const unsubscribe = chatStore.subscribe((state) => {
latestState = state;

// Throttle updates to max once per 100ms
if (!timeoutId) {
timeoutId = setTimeout(() => {
if (latestState) {
setChatState(latestState);
}
timeoutId = null;
}, 100);
}
});

return () => {
unsubscribe();
if (timeoutId) {
clearTimeout(timeoutId);
// Apply final state on cleanup
if (latestState) {
setChatState(latestState);
}
}
};
}, [chatStore]);

const activeTaskId = chatState.activeTaskId;

if (!activeTaskId || !chatState.tasks[activeTaskId]) {
Expand All @@ -33,8 +65,17 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
const task = chatState.tasks[activeTaskId];
const messages = task.messages || [];

// Group messages by query cycles and show in chronological order (oldest first)
const queryGroups = groupMessagesByQuery(messages);
// Create a stable key based on messages content to prevent excessive re-renders
const lastMessage = messages[messages.length - 1];
const messagesKey = React.useMemo(() => {
// Only re-compute when message count or last message changes
return `${messages.length}-${lastMessage?.id || ''}-${lastMessage?.content?.length || 0}`;
}, [messages.length, lastMessage?.id, lastMessage?.content?.length]);

// Memoize grouping to prevent re-creating objects on every render
const queryGroups = React.useMemo(() => {
return groupMessagesByQuery(messages);
}, [messagesKey]);

return (
<motion.div
Expand Down Expand Up @@ -152,7 +193,7 @@ function groupMessagesByQuery(messages: any[]) {
otherMessages: []
};
}
} else {
} else {
// Other messages (assistant responses, errors, etc.)
if (currentGroup) {
currentGroup.otherMessages.push(message);
Expand Down
Loading