Skip to content

Commit 7d9e141

Browse files
committed
Revert "feat: Stream mode task spliting (#767)"
This reverts commit 16ac0d8, reversing changes made to 4ce7926.
1 parent 16ac0d8 commit 7d9e141

8 files changed

Lines changed: 336 additions & 714 deletions

File tree

backend/app/service/chat_service.py

Lines changed: 45 additions & 156 deletions
Large diffs are not rendered by default.

backend/app/service/task.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ class Action(str, Enum):
2020
update_task = "update_task" # user -> backend
2121
task_state = "task_state" # backend -> user
2222
new_task_state = "new_task_state" # backend -> user
23-
decompose_progress = "decompose_progress" # backend -> user (streaming decomposition)
24-
decompose_text = "decompose_text" # backend -> user (raw streaming text)
2523
start = "start" # user -> backend
2624
create_agent = "create_agent" # backend -> user
2725
activate_agent = "activate_agent" # backend -> user
@@ -66,17 +64,6 @@ class ActionTaskStateData(BaseModel):
6664
action: Literal[Action.task_state] = Action.task_state
6765
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
6866

69-
70-
class ActionDecomposeProgressData(BaseModel):
71-
action: Literal[Action.decompose_progress] = Action.decompose_progress
72-
data: dict
73-
74-
75-
class ActionDecomposeTextData(BaseModel):
76-
action: Literal[Action.decompose_text] = Action.decompose_text
77-
data: dict
78-
79-
8067
class ActionNewTaskStateData(BaseModel):
8168
action: Literal[Action.new_task_state] = Action.new_task_state
8269
data: dict[Literal["task_id", "content", "state", "result", "failure_count"], str | int]
@@ -240,8 +227,6 @@ class ActionSkipTaskData(BaseModel):
240227
| ActionAddTaskData
241228
| ActionRemoveTaskData
242229
| ActionSkipTaskData
243-
| ActionDecomposeTextData
244-
| ActionDecomposeProgressData
245230
)
246231

247232

backend/app/utils/agent.py

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ def __init__(
108108
prune_tool_calls_from_memory: bool = False,
109109
enable_snapshot_clean: bool = False,
110110
step_timeout: float | None = 900,
111-
**kwargs: Any,
112111
) -> None:
113112
super().__init__(
114113
system_message=system_message,
@@ -131,7 +130,6 @@ def __init__(
131130
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
132131
enable_snapshot_clean=enable_snapshot_clean,
133132
step_timeout=step_timeout,
134-
**kwargs,
135133
)
136134
self.api_task_id = api_task_id
137135
self.agent_name = agent_name
@@ -535,21 +533,6 @@ def agent_model(
535533
)
536534
)
537535

538-
# Build model config, defaulting to streaming for planner
539-
extra_params = options.extra_params or {}
540-
model_config: dict[str, Any] = {}
541-
if options.is_cloud():
542-
model_config["user"] = str(options.project_id)
543-
model_config.update(
544-
{
545-
k: v
546-
for k, v in extra_params.items()
547-
if k not in ["model_platform", "model_type", "api_key", "url"]
548-
}
549-
)
550-
if agent_name == Agents.task_agent:
551-
model_config["stream"] = True
552-
553536
return ListenChatAgent(
554537
options.project_id,
555538
agent_name,
@@ -559,15 +542,23 @@ def agent_model(
559542
model_type=options.model_type,
560543
api_key=options.api_key,
561544
url=options.api_url,
562-
model_config_dict=model_config or None,
545+
model_config_dict={
546+
"user": str(options.project_id),
547+
}
548+
if options.is_cloud()
549+
else None,
550+
**{
551+
k: v
552+
for k, v in (options.extra_params or {}).items()
553+
if k not in ["model_platform", "model_type", "api_key", "url"]
554+
},
563555
),
564556
# output_language=options.language,
565557
tools=tools,
566558
agent_id=agent_id,
567559
prune_tool_calls_from_memory=prune_tool_calls_from_memory,
568560
toolkits_to_register_agent=toolkits_to_register_agent,
569561
enable_snapshot_clean=enable_snapshot_clean,
570-
stream_accumulate=False,
571562
)
572563

573564

backend/app/utils/workforce.py

Lines changed: 5 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from camel.societies.workforce.utils import TaskAssignResult
1313
from camel.societies.workforce.workforce_metrics import WorkforceMetrics
1414
from camel.societies.workforce.events import WorkerCreatedEvent
15-
from camel.societies.workforce.prompts import TASK_DECOMPOSE_PROMPT
1615
from camel.tasks.task import Task, TaskState, validate_task_content
1716
from app.component import code
1817
from app.exception.exception import UserException
@@ -66,22 +65,14 @@ def __init__(
6665
)
6766
logger.info(f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}")
6867

69-
def eigent_make_sub_tasks(
70-
self,
71-
task: Task,
72-
coordinator_context: str = "",
73-
on_stream_batch=None,
74-
on_stream_text=None,
75-
):
68+
def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
7669
"""
7770
Split process_task method to eigent_make_sub_tasks and eigent_start method.
7871
7972
Args:
8073
task: The main task to decompose
8174
coordinator_context: Optional context ONLY for coordinator agent during decomposition.
8275
This context will NOT be passed to subtasks or worker agents.
83-
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
84-
on_stream_text: Optional callback for raw streaming text chunks
8576
"""
8677
logger.info("=" * 80)
8778
logger.info("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED", extra={
@@ -112,15 +103,7 @@ def eigent_make_sub_tasks(
112103
logger.info(f"[DECOMPOSE] Workforce reset complete, state: {self._state.name}")
113104

114105
logger.info(f"[DECOMPOSE] Calling handle_decompose_append_task")
115-
subtasks = asyncio.run(
116-
self.handle_decompose_append_task(
117-
task,
118-
reset=False,
119-
coordinator_context=coordinator_context,
120-
on_stream_batch=on_stream_batch,
121-
on_stream_text=on_stream_text
122-
)
123-
)
106+
subtasks = asyncio.run(self.handle_decompose_append_task(task, reset=False, coordinator_context=coordinator_context))
124107
logger.info("=" * 80)
125108
logger.info(f"✅ [DECOMPOSE] Task decomposition COMPLETED", extra={
126109
"api_task_id": self.api_task_id,
@@ -159,45 +142,8 @@ async def eigent_start(self, subtasks: list[Task]):
159142
self._state = WorkforceState.IDLE
160143
logger.info(f"[WF-LIFECYCLE] Workforce state set to IDLE")
161144

162-
def _decompose_task(self, task: Task, stream_callback=None):
163-
"""Decompose task with optional streaming text callback."""
164-
165-
decompose_prompt = str(
166-
TASK_DECOMPOSE_PROMPT.format(
167-
content=task.content,
168-
child_nodes_info=self._get_child_nodes_info(),
169-
additional_info=task.additional_info,
170-
)
171-
)
172-
self.task_agent.reset()
173-
result = task.decompose(
174-
self.task_agent, decompose_prompt, stream_callback=stream_callback
175-
)
176-
177-
if isinstance(result, Generator):
178-
def streaming_with_dependencies():
179-
all_subtasks = []
180-
for new_tasks in result:
181-
all_subtasks.extend(new_tasks)
182-
if new_tasks:
183-
self._update_dependencies_for_decomposition(
184-
task, all_subtasks
185-
)
186-
yield new_tasks
187-
return streaming_with_dependencies()
188-
else:
189-
subtasks = result
190-
if subtasks:
191-
self._update_dependencies_for_decomposition(task, subtasks)
192-
return subtasks
193-
194145
async def handle_decompose_append_task(
195-
self,
196-
task: Task,
197-
reset: bool = True,
198-
coordinator_context: str = "",
199-
on_stream_batch=None,
200-
on_stream_text=None,
146+
self, task: Task, reset: bool = True, coordinator_context: str = ""
201147
) -> List[Task]:
202148
"""
203149
Override to support coordinator_context parameter.
@@ -207,8 +153,6 @@ async def handle_decompose_append_task(
207153
task: The task to be processed
208154
reset: Should trigger workforce reset (Workforce must not be running)
209155
coordinator_context: Optional context ONLY for coordinator during decomposition
210-
on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
211-
on_stream_text: Optional callback for raw streaming text chunks
212156
213157
Returns:
214158
List[Task]: The decomposed subtasks or the original task
@@ -242,23 +186,18 @@ async def handle_decompose_append_task(
242186
task.content = task_with_context
243187

244188
logger.info(f"[DECOMPOSE] Calling _decompose_task with context")
245-
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)
189+
subtasks_result = self._decompose_task(task)
246190

247191
task.content = original_content
248192
else:
249193
logger.info(f"[DECOMPOSE] Calling _decompose_task without context")
250-
subtasks_result = self._decompose_task(task, stream_callback=on_stream_text)
194+
subtasks_result = self._decompose_task(task)
251195

252196
logger.info(f"[DECOMPOSE] _decompose_task returned, processing results")
253197
if isinstance(subtasks_result, Generator):
254198
subtasks = []
255199
for new_tasks in subtasks_result:
256200
subtasks.extend(new_tasks)
257-
if on_stream_batch:
258-
try:
259-
on_stream_batch(new_tasks, False)
260-
except Exception as e:
261-
logger.warning(f"Streaming callback failed: {e}")
262201
logger.info(f"[DECOMPOSE] Collected {len(subtasks)} subtasks from generator")
263202
else:
264203
subtasks = subtasks_result
@@ -279,12 +218,6 @@ async def handle_decompose_append_task(
279218
subtasks = [fallback_task]
280219
logger.info(f"[DECOMPOSE] Created fallback task: {fallback_task.id}")
281220

282-
if on_stream_batch:
283-
try:
284-
on_stream_batch(subtasks, True)
285-
except Exception as e:
286-
logger.warning(f"Final streaming callback failed: {e}")
287-
288221
return subtasks
289222

290223
async def _find_assignee(self, tasks: List[Task]) -> TaskAssignResult:

src/components/ChatBox/ProjectSection.tsx

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,7 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
2323
onSkip,
2424
isPauseResumeLoading
2525
}, ref) => {
26-
// Subscribe to store changes with throttling to prevent excessive re-renders
27-
const [chatState, setChatState] = React.useState(() => chatStore.getState());
28-
29-
React.useEffect(() => {
30-
let timeoutId: NodeJS.Timeout | null = null;
31-
let latestState: any = null;
32-
33-
const unsubscribe = chatStore.subscribe((state) => {
34-
latestState = state;
35-
36-
// Throttle updates to max once per 100ms
37-
if (!timeoutId) {
38-
timeoutId = setTimeout(() => {
39-
if (latestState) {
40-
setChatState(latestState);
41-
}
42-
timeoutId = null;
43-
}, 100);
44-
}
45-
});
46-
47-
return () => {
48-
unsubscribe();
49-
if (timeoutId) {
50-
clearTimeout(timeoutId);
51-
// Apply final state on cleanup
52-
if (latestState) {
53-
setChatState(latestState);
54-
}
55-
}
56-
};
57-
}, [chatStore]);
58-
26+
const chatState = chatStore.getState();
5927
const activeTaskId = chatState.activeTaskId;
6028

6129
if (!activeTaskId || !chatState.tasks[activeTaskId]) {
@@ -65,17 +33,8 @@ export const ProjectSection = React.forwardRef<HTMLDivElement, ProjectSectionPro
6533
const task = chatState.tasks[activeTaskId];
6634
const messages = task.messages || [];
6735

68-
// Create a stable key based on messages content to prevent excessive re-renders
69-
const lastMessage = messages[messages.length - 1];
70-
const messagesKey = React.useMemo(() => {
71-
// Only re-compute when message count or last message changes
72-
return `${messages.length}-${lastMessage?.id || ''}-${lastMessage?.content?.length || 0}`;
73-
}, [messages.length, lastMessage?.id, lastMessage?.content?.length]);
74-
75-
// Memoize grouping to prevent re-creating objects on every render
76-
const queryGroups = React.useMemo(() => {
77-
return groupMessagesByQuery(messages);
78-
}, [messagesKey]);
36+
// Group messages by query cycles and show in chronological order (oldest first)
37+
const queryGroups = groupMessagesByQuery(messages);
7938

8039
return (
8140
<motion.div
@@ -193,7 +152,7 @@ function groupMessagesByQuery(messages: any[]) {
193152
otherMessages: []
194153
};
195154
}
196-
} else {
155+
} else {
197156
// Other messages (assistant responses, errors, etc.)
198157
if (currentGroup) {
199158
currentGroup.otherMessages.push(message);

0 commit comments

Comments
 (0)