Skip to content

Commit e5c9c23

Browse files
refactor(plan-and-task): migrate to workflow DSL with sentinel gate pattern
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
1 parent 3c45bd4 commit e5c9c23

5 files changed

Lines changed: 171 additions & 63 deletions

File tree

examples/e2e/plan_and_task/controller.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
)
1515
from examples.e2e.plan_and_task.state_models import RuntimeState, ReviewVerdict
1616
from examples.e2e.plan_and_task.state_machine import (
17-
VALID_TRANSITIONS,
1817
WorkflowStateMachine,
18+
_COMPILED_WORKFLOW,
1919
)
2020

2121
logger = get_logger(__name__)
@@ -539,7 +539,7 @@ def _append_task_event(
539539
)
540540

541541
def _allowed_transitions(self, state: RuntimeState) -> set[str]:
542-
return VALID_TRANSITIONS.get(state.phase, set())
542+
return {t.target_state_id for t in _COMPILED_WORKFLOW.transitions_by_state.get(state.phase, ())}
543543

544544
def _utcnow_isoformat(self) -> str:
545545
return datetime.datetime.now(datetime.UTC).isoformat()

examples/e2e/plan_and_task/main.py

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
from ecs_agent.components import (
1616
ConversationComponent,
1717
LLMComponent,
18-
RenderedSystemPromptComponent,
1918
SubagentRegistryComponent,
2019
SubagentSessionTableComponent,
2120
ToolRegistryComponent,
2221
UserPromptConfigComponent,
22+
WorkflowRuntimeComponent,
2323
)
2424
from ecs_agent.components.definitions import ScriptHandler
2525
from ecs_agent.core import Runner, World
@@ -32,14 +32,15 @@
3232
from ecs_agent.providers import Model
3333
from ecs_agent.providers.config import ApiFormat
3434
from ecs_agent.providers.protocol import LLMModel
35+
from ecs_agent.systems import WorkflowStateSystem
3536
from ecs_agent.systems.error_handling import ErrorHandlingSystem
3637
from ecs_agent.tools import BuiltinToolsSkill
3738
from ecs_agent.skills.manager import SkillManager
3839
from ecs_agent.skills.discovery import discover_skills
3940
from ecs_agent.systems.memory import MemorySystem
4041
from ecs_agent.systems.reasoning import ReasoningSystem
4142
from ecs_agent.systems.subagent import SubagentSystem
42-
from ecs_agent.systems.system_prompt_render_system import SystemPromptRenderSystem, render_prompt_template
43+
from ecs_agent.systems.system_prompt_render_system import SystemPromptRenderSystem
4344
from ecs_agent.systems.tool_execution import ToolExecutionSystem
4445
from ecs_agent.systems.user_prompt_normalization_system import (
4546
UserPromptNormalizationSystem,
@@ -53,6 +54,7 @@
5354
)
5455

5556
from ecs_agent.accounting import AccountingSubscriber
57+
from ecs_agent.workflows import install_workflow
5658
from examples.e2e.plan_and_task.billing import BillingSubscriber
5759
from examples.e2e.plan_and_task.scratchbook_adapter import (
5860
PlanTaskScratchbookAdapter as ArtifactAdapter,
@@ -61,10 +63,8 @@
6163
from examples.e2e.plan_and_task.controller import PlanController, ResumeAction
6264
from examples.e2e.plan_and_task.prompts import (
6365
ADVISOR_SYSTEM_PROMPT,
64-
PLAN_MAIN_AGENT_SYSTEM_PROMPT,
6566
PLAN_QA_REVIEW_SYSTEM_PROMPT,
6667
QA_SYSTEM_PROMPT,
67-
TASK_MAIN_AGENT_SYSTEM_PROMPT,
6868
WRITE_PLAN_SYSTEM_PROMPT,
6969
build_write_plan_prompt,
7070
)
@@ -73,6 +73,7 @@
7373
derive_workflow_id_from_llm,
7474
)
7575
from examples.e2e.plan_and_task.state_models import RuntimeState
76+
from examples.e2e.plan_and_task.workflow_spec import PLAN_TASK_WORKFLOW_SPEC
7677

7778
logger = get_logger(__name__)
7879

@@ -129,7 +130,7 @@ def build_plan_task_world(
129130
world.add_component(
130131
agent_id,
131132
SystemPromptConfigSpec(
132-
template_source=PromptTemplateSource(inline=PLAN_MAIN_AGENT_SYSTEM_PROMPT)
133+
template_source=PromptTemplateSource(inline="${_workflow_state_prompt}")
133134
),
134135
)
135136
world.add_component(agent_id, ToolRegistryComponent(tools={}, handlers={}))
@@ -201,39 +202,19 @@ def build_plan_task_world(
201202
runtime_state: list[RuntimeState | None] = [None]
202203
_base_dir = base_dir or _WORKFLOW_BASE_DIR
203204

204-
def _swap_to_task_prompt(w: World, eid: EntityId, trigger_text: str) -> None:
205-
spec = w.get_component(eid, SystemPromptConfigSpec)
206-
if spec is None:
207-
return
208-
if (spec.template_source.inline or "") == TASK_MAIN_AGENT_SYSTEM_PROMPT:
209-
return
210-
new_spec = SystemPromptConfigSpec(
211-
template_source=PromptTemplateSource(inline=TASK_MAIN_AGENT_SYSTEM_PROMPT)
212-
)
213-
w.add_component(eid, new_spec)
214-
rendered_text, snapshot = render_prompt_template(
215-
template=TASK_MAIN_AGENT_SYSTEM_PROMPT, world=w, entity=eid
216-
)
217-
w.add_component(
218-
eid,
219-
RenderedSystemPromptComponent(
220-
text=rendered_text,
221-
placeholder_snapshot=snapshot,
222-
),
223-
)
224-
llm = w.get_component(eid, LLMComponent)
225-
if llm is not None:
226-
llm.system_prompt = rendered_text
205+
def _sync_workflow_state(w: World, eid: EntityId, phase: str) -> None:
206+
runtime = w.get_component(eid, WorkflowRuntimeComponent)
207+
if runtime is not None:
208+
runtime.current_state_id = phase
209+
210+
def _activate_task_phase(
211+
w: World, eid: EntityId, phase: str, trigger_text: str
212+
) -> None:
213+
_sync_workflow_state(w, eid, phase)
227214
conv = w.get_component(eid, ConversationComponent)
228215
if conv is not None:
229216
conv.messages.clear()
230217
conv.messages.append(Message(role="user", content=trigger_text))
231-
logger.info(
232-
"plan_task_system_prompt_switched",
233-
entity_id=int(eid),
234-
from_prompt="PLAN_MAIN_AGENT",
235-
to_prompt="TASK_MAIN_AGENT",
236-
)
237218

238219

239220
def _load_workflow(w: World, eid: EntityId, workflow_id: str) -> RuntimeState:
@@ -243,6 +224,7 @@ def _load_workflow(w: World, eid: EntityId, workflow_id: str) -> RuntimeState:
243224
adapter_ref[0] = new_adapter
244225
runtime_state[0] = state
245226
w.add_component(eid, build_scratchbook_prompt_config(workflow_id))
227+
_sync_workflow_state(w, eid, state.phase)
246228
return state
247229

248230
async def _on_delegation_completed(event: DelegationCompletedEvent) -> None:
@@ -275,11 +257,13 @@ async def _on_delegation_completed(event: DelegationCompletedEvent) -> None:
275257
runtime_state[0] = controller.handle_advisor_review(
276258
current, adapter, verdict_str, notes=event.result[:500]
277259
)
260+
_sync_workflow_state(world, agent_id, _require_state(runtime_state[0]).phase)
278261
elif event.subagent_name == "qa":
279262
new_state = controller.handle_qa_review(
280263
current, adapter, verdict_str, notes=event.result[:500]
281264
)
282265
runtime_state[0] = new_state
266+
_sync_workflow_state(world, agent_id, new_state.phase)
283267
if new_state.phase == "WRITE_PLAN":
284268
conv = world.get_component(agent_id, ConversationComponent)
285269
if conv is not None:
@@ -298,10 +282,12 @@ async def _on_delegation_completed(event: DelegationCompletedEvent) -> None:
298282
runtime_state[0] = controller.handle_plan_qa_review(
299283
current, adapter, verdict_str, notes=event.result[:500]
300284
)
285+
_sync_workflow_state(world, agent_id, _require_state(runtime_state[0]).phase)
301286
elif event.subagent_name == "plan_writer":
302287
runtime_state[0] = controller.handle_write_plan_completed(
303288
current, adapter
304289
)
290+
_sync_workflow_state(world, agent_id, _require_state(runtime_state[0]).phase)
305291
except ValueError as exc:
306292
logger.error(
307293
"plan_task_verdict_recording_failed",
@@ -335,6 +321,7 @@ async def _handle_plan_start(
335321
_entity_id, build_scratchbook_prompt_config(derived_id)
336322
)
337323
runtime_state[0] = controller.handle_plan_start(adapter_ref[0], description)
324+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
338325
status = controller.get_plan_status(_require_state(runtime_state[0]))
339326
logger.info(
340327
"plan_task_command_plan_start",
@@ -365,6 +352,7 @@ async def _handle_plan_finalize(
365352
runtime_state[0] = controller.handle_plan_finalize(
366353
_require_state(runtime_state[0]), _require_adapter(adapter_ref[0])
367354
)
355+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
368356
logger.info(
369357
"plan_task_command_plan_finalize",
370358
workflow_id=_require_state(runtime_state[0]).workflow_id,
@@ -380,12 +368,15 @@ async def _handle_task_start(
380368
try:
381369
from examples.e2e.plan_and_task.task_exec import TaskExec
382370

383-
# Guard against re-triggering: after _swap_to_task_prompt the /task:start
384-
# message stays as the last role="user" entry (tool results use role="tool"),
385-
# so the trigger would fire on every subsequent tick. Skip re-initialization
386-
# when we are already running in TASK mode.
387-
spec = _world.get_component(_entity_id, SystemPromptConfigSpec)
388-
if spec is not None and (spec.template_source.inline or "") == TASK_MAIN_AGENT_SYSTEM_PROMPT:
371+
# Guard against re-triggering: the /task:start message stays as the last
372+
# role="user" entry (tool results use role="tool"), so the trigger would
373+
# fire on every subsequent tick. Skip re-initialization once task execution
374+
# is already active.
375+
workflow_runtime = _world.get_component(_entity_id, WorkflowRuntimeComponent)
376+
if (
377+
workflow_runtime is not None
378+
and workflow_runtime.current_state_id == "TASK_RUNNING"
379+
):
389380
return None
390381

391382
if runtime_state[0] is None:
@@ -409,7 +400,12 @@ async def _handle_task_start(
409400
runtime_state[0] = task_exec.initialize_task_queue(
410401
current, _require_adapter(adapter_ref[0])
411402
)
412-
_swap_to_task_prompt(_world, _entity_id, _user_text)
403+
_activate_task_phase(
404+
_world,
405+
_entity_id,
406+
_require_state(runtime_state[0]).phase,
407+
_user_text,
408+
)
413409
s = _require_state(runtime_state[0])
414410
logger.info(
415411
"plan_task_command_task_start",
@@ -458,10 +454,17 @@ async def _handle_task_resume(
458454
_world: World, _entity_id: EntityId, _user_text: str
459455
) -> str | None:
460456
try:
457+
if runtime_state[0] is not None and runtime_state[0].phase == "TASK_RUNNING":
458+
return None
461459
runtime_state[0] = controller.handle_task_resume(
462460
_require_state(runtime_state[0]), _require_adapter(adapter_ref[0])
463461
)
464-
_swap_to_task_prompt(_world, _entity_id, _user_text)
462+
_activate_task_phase(
463+
_world,
464+
_entity_id,
465+
_require_state(runtime_state[0]).phase,
466+
_user_text,
467+
)
465468
logger.info(
466469
"plan_task_command_task_resume",
467470
workflow_id=_require_state(runtime_state[0]).workflow_id,
@@ -484,6 +487,7 @@ async def _handle_task_replan(
484487
_require_adapter(adapter_ref[0]),
485488
reason,
486489
)
490+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
487491
s = _require_state(runtime_state[0])
488492
logger.info(
489493
"plan_task_command_task_replan",
@@ -504,6 +508,7 @@ async def _handle_task_abort(
504508
_require_adapter(adapter_ref[0]),
505509
reason="user abort",
506510
)
511+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
507512
s = _require_state(runtime_state[0])
508513
logger.info(
509514
"plan_task_command_task_abort",
@@ -562,6 +567,7 @@ async def _handle_plan_write(
562567
runtime_state[0] = controller.handle_write_plan(
563568
_require_state(runtime_state[0]), adapter
564569
)
570+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
565571
s = _require_state(runtime_state[0])
566572
logger.info("plan_task_command_plan_write", workflow_id=s.workflow_id)
567573
draft_path = str(
@@ -587,6 +593,7 @@ async def _handle_plan_qa_review(
587593
verdict,
588594
notes=notes,
589595
)
596+
_sync_workflow_state(_world, _entity_id, _require_state(runtime_state[0]).phase)
590597
s = _require_state(runtime_state[0])
591598
logger.info(
592599
"plan_task_command_plan_qa_review",
@@ -684,6 +691,9 @@ async def _handle_plan_qa_review(
684691
UserPromptConfigComponent(triggers=triggers, script_handlers=script_handlers),
685692
)
686693

694+
install_workflow(world, agent_id, PLAN_TASK_WORKFLOW_SPEC, agent_key="main")
695+
696+
world.register_system(WorkflowStateSystem(priority=-25), priority=-25)
687697
world.register_system(SystemPromptRenderSystem(priority=-20), priority=-20)
688698
world.register_system(UserPromptNormalizationSystem(priority=-10), priority=-10)
689699
subagent_system = SubagentSystem(priority=-1)

examples/e2e/plan_and_task/state_machine.py

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,17 @@
55
import datetime
66

77
from ecs_agent.logging import get_logger
8+
from ecs_agent.workflows.compiler import compile_workflow
89

910
from examples.e2e.plan_and_task.scratchbook_adapter import (
1011
PlanTaskScratchbookAdapter as ArtifactAdapter,
1112
)
1213
from examples.e2e.plan_and_task.state_models import RuntimeState
14+
from examples.e2e.plan_and_task.workflow_spec import PLAN_TASK_WORKFLOW_SPEC
1315

1416
logger = get_logger(__name__)
1517

16-
VALID_TRANSITIONS: dict[str, set[str]] = {
17-
"IDLE": {"DRAFT_INTERVIEW"},
18-
"DRAFT_INTERVIEW": {"DRAFT_ADVISOR_REVIEW", "DRAFT_QA_REVIEW"},
19-
"DRAFT_ADVISOR_REVIEW": {"DRAFT_QA_REVIEW", "DRAFT_INTERVIEW"},
20-
"DRAFT_QA_REVIEW": {"WRITE_PLAN", "DRAFT_INTERVIEW"},
21-
"WRITE_PLAN": {"PLAN_QA_REVIEW"},
22-
"PLAN_QA_REVIEW": {"PLAN_FINALIZED", "WRITE_PLAN"},
23-
"PLAN_FINALIZED": {"TASK_READY"},
24-
"TASK_READY": {"TASK_RUNNING"},
25-
"TASK_RUNNING": {"TASK_COMPLETED", "TASK_BLOCKED", "TASK_REPLAN", "TASK_ABORTED"},
26-
"TASK_BLOCKED": {"TASK_RUNNING", "TASK_REPLAN", "TASK_ABORTED"},
27-
"TASK_REPLAN": {"DRAFT_INTERVIEW", "DRAFT_ADVISOR_REVIEW", "TASK_RUNNING"},
28-
"TASK_COMPLETED": set(),
29-
"TASK_ABORTED": set(),
30-
}
18+
_COMPILED_WORKFLOW = compile_workflow(PLAN_TASK_WORKFLOW_SPEC)
3119

3220
_TERMINAL_PHASES: frozenset[str] = frozenset({"TASK_COMPLETED", "TASK_ABORTED"})
3321

@@ -48,7 +36,8 @@ def transition(self, state: RuntimeState, to_phase: str) -> RuntimeState:
4836
Raises:
4937
ValueError: If the transition is invalid.
5038
"""
51-
allowed = VALID_TRANSITIONS.get(state.phase, set())
39+
transitions = _COMPILED_WORKFLOW.transitions_by_state.get(state.phase, ())
40+
allowed = {transition.target_state_id for transition in transitions}
5241
if to_phase not in allowed:
5342
raise ValueError(f"Invalid transition: {state.phase}{to_phase}")
5443
state.phase = to_phase
@@ -113,7 +102,7 @@ def _force_phase(self, state: RuntimeState, phase: str) -> None:
113102
114103
This is an administrative-only bypass for exceptional recovery scenarios, such as
115104
marking in-flight tasks as blocked after a restart. Normal phase transitions must
116-
use the transition() method, which validates against VALID_TRANSITIONS.
105+
use the transition() method, which validates against the compiled workflow transition graph.
117106
118107
Args:
119108
state: Current runtime state to modify.

examples/e2e/plan_and_task/task_exec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
TaskRecord,
2828
)
2929
from examples.e2e.plan_and_task.state_machine import (
30-
VALID_TRANSITIONS,
3130
WorkflowStateMachine,
31+
_COMPILED_WORKFLOW,
3232
)
3333

3434
logger = get_logger(__name__)
@@ -383,7 +383,7 @@ def _transition_to_running(self, state: RuntimeState) -> RuntimeState:
383383
return state
384384

385385
def _allowed_transitions(self, state: RuntimeState) -> set[str]:
386-
return VALID_TRANSITIONS.get(state.phase, set())
386+
return {t.target_state_id for t in _COMPILED_WORKFLOW.transitions_by_state.get(state.phase, ())}
387387

388388
def _utcnow_isoformat(self) -> str:
389389
return datetime.datetime.now(datetime.UTC).isoformat()

0 commit comments

Comments
 (0)