feat(decopilot): unified per-run projection workflow (consume step + hosted child harness) #4179

Open
tlgimenes wants to merge 23 commits into main from eta-telescopii

tlgimenes commented Jun 27, 2026

Summary

Replaces the standalone decopilot projector with a per-run consume step folded into the dispatch workflow, and unifies hosted and desktop runs so they take the same path. One workflow owns each run end-to-end; NATS is a pure transport; projection is reliable and the terminal status has a single writer.

New flow (identical for both topologies): parent dispatch workflow = dispatch (start a producer that streams to NATS) → consume.

  • Desktop publishes a work item to the daemon (as before).
  • Hosted enqueues a fire-and-forget DBOS child workflow (hostedHarnessWorkflow, HOSTED_HARNESS_QUEUE) that runs the agent loop; the loop no longer runs inline.
  • The consume step (consume-run-projection.ts) binds a per-run durable NATS consumer, projects incrementally (checkpoints) + terminally ({done}) by reusing the existing fold engine, and is the sole terminal-status writer, mapping finish-reason → completed | requires_action | failed.
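
The finish-reason → status mapping in the last bullet can be sketched as a pure function. This is a minimal illustration, not the code in consume-run-projection.ts: `RunOutcome`, `mapTerminalStatus`, and `exampleResolver` are hypothetical names, and the resolver is injected because the rules inside the real resolveThreadStatus (which also takes the final parts) are not shown in this PR.

```typescript
// Hypothetical shapes: the PR names the three statuses but not these types.
type TerminalStatus = "completed" | "requires_action" | "failed";

interface RunOutcome {
  failed: boolean;
  finishReason?: string;
}

// Stand-in for resolveThreadStatus; its real signature and rules may differ.
type StatusResolver = (finishReason: string) => TerminalStatus;

function mapTerminalStatus(
  outcome: RunOutcome | undefined,
  resolve: StatusResolver,
): TerminalStatus {
  // No outcome at all: fall back to the no-info default.
  if (outcome === undefined) return "completed";
  // An explicit failure always wins.
  if (outcome.failed) return "failed";
  // Desktop/relay runs end on a {done} marker with no finish chunk,
  // so an absent finishReason is treated as a clean completion.
  if (outcome.finishReason === undefined) return "completed";
  // Explicit reasons go through the resolver.
  return resolve(outcome.finishReason);
}

// Illustrative resolver only (assumed mapping, not taken from the PR).
const exampleResolver: StatusResolver = (reason) => {
  if (reason === "stop") return "completed";
  if (reason === "tool-calls") return "requires_action";
  return "failed";
};
```

Keeping the absent-finishReason check ahead of the resolver call is what prevents clean relay runs from being marked failed.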

Key changes

  • Run-claim (fence mint + setRunFence) hoisted to the gate for both topologies, so run_fence_token is written before the consumer reads it.
  • Early terminal-status write in run-reactor.ts handleTerminalStatus deleted → consume is the only writer of completed/requires_action (SSE/metrics kept). This makes the simple if terminal → return entry guard correct.
  • markRunRequiresAction runtime method added; finish-reason → status via existing resolveThreadStatus.
  • Standalone projector (consumer + projectRunWorkflow/projectCheckpointWorkflow + startup) deleted.
  • Queues: PROJECTOR_QUEUE removed; HOSTED_HARNESS_QUEUE added to initDbos (partition by threadId, concurrency 1) and RUN_QUEUES; both parent and child run on worker pods.
  • DBOS_WORKFLOW_VERSION bumped to "2" (parent step sequence changed + new registered workflow).

Design + plan: docs/superpowers/specs/2026-06-26-decopilot-projection-single-workflow-design.md, docs/superpowers/plans/2026-06-26-decopilot-projection-single-workflow.md.

Testing

  • tsc typecheck green across the branch; pure-logic unit tests pass (consume-run-projection.test.ts 7/7); each task's relevant unit/integration tests passed at implementation time.
  • E2E (packages/e2e/tests/decopilot-projection.spec.ts) could not run locally (no Postgres/NATS/dev-server in the dev sandbox) — please ensure CI runs the full e2e stack. 3 live cases (desktop happy-path, entry-guard, hosted gated on E2E_ANTHROPIC_KEY); 5 cases test.skip with precise TODOs naming the missing test-injection hooks.
  • A known Bun 1.3.14 segfault when batching the whole decopilot dir — tests were run per-file.

Affected areas

apps/mesh dispatch-queue (thread-gate + new hosted-harness workflow), decopilot projection (consume step, projector-workflow primitives), run-reactor, storage/threads, app.ts DBOS wiring, index.ts RUN_QUEUES.

Follow-ups (non-blocking)

  • Hosted-failure purge/consume race: RUN_FAILED purges the stream while consume may project the same in-band-error run. Bounded + self-correcting (DB already failed, markRunFailed idempotent); worst case is truncated failed-run parts or a consume idle-timeout holding the per-thread slot up to 30 min. Not a regression (old poll cap was 1h). Fix: pass an abort signal to consumeRunProjection on terminal-failure and/or defer the RUN_FAILED purge to consume's purgeRun.
  • Extract a shared types-only module for 3 duplicated workflow types (currently duplicated to break a DBOS register-at-import cycle).
  • Wire the 5 skipped e2e cases (need an idleTimeoutMs override, a raw-publish dedup bypass, and multi-pod kill controls).
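
The first follow-up proposes passing an abort signal into the consume step so a terminal failure does not have to wait out the idle timeout. A rough sketch of how a drain loop could race the next message against both an idle watchdog and an abort signal is shown below. Everything here is an assumption for illustration: `drainWithIdleTimeout`, `DrainOptions`, and `DrainResult` are hypothetical, and the real consumeRunProjection signature is not shown in this PR.

```typescript
// Hypothetical result type for the sketch.
type DrainResult = "done" | "idle-timeout" | "aborted";

interface DrainOptions<T> {
  idleTimeoutMs: number;
  isDone: (msg: T) => boolean;
  onMessage: (msg: T) => Promise<void> | void;
  // Runs in finally, e.g. to delete the per-run consumer.
  cleanup: () => Promise<void> | void;
  signal?: AbortSignal;
}

async function drainWithIdleTimeout<T>(
  source: AsyncIterable<T>,
  opts: DrainOptions<T>,
): Promise<DrainResult> {
  const iterator = source[Symbol.asyncIterator]();
  // Resolves (never rejects) once the caller aborts.
  const aborted = new Promise<"aborted">((resolve) => {
    if (opts.signal?.aborted) resolve("aborted");
    opts.signal?.addEventListener("abort", () => resolve("aborted"), { once: true });
  });
  try {
    while (true) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      // A fresh watchdog per message: the timeout measures idleness, not total time.
      const idle = new Promise<"idle-timeout">((resolve) => {
        timer = setTimeout(() => resolve("idle-timeout"), opts.idleTimeoutMs);
      });
      const winner = await Promise.race([iterator.next(), idle, aborted]);
      clearTimeout(timer);
      if (winner === "idle-timeout" || winner === "aborted") return winner;
      if (winner.done) return "done";
      await opts.onMessage(winner.value);
      if (opts.isDone(winner.value)) return "done";
    }
  } finally {
    await opts.cleanup();
  }
}
```

With this shape, a RUN_FAILED handler holding the matching AbortController could release the per-thread slot immediately instead of waiting for the watchdog.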

🤖 Generated with Claude Code


Summary by cubic

Unifies decopilot runs behind a per-run consume step that drains NATS and writes the terminal status. Hosted and desktop now share the same path via a hosted child workflow; the standalone projector is removed.

  • New Features

    • Added consume-run-projection.ts to the dispatch workflow: per-run durable JetStream consumer, incremental checkpoints, final projection, and the only writer of completed | requires_action | failed (with pure helpers isTerminalStatus, consumeDurableName, classifyDrainMessage and retry-safe consumer add).
    • Hosted runs now execute in a DBOS child workflow (hostedHarnessWorkflow) on HOSTED_HARNESS_QUEUE (partitioned by threadId, concurrency 1); the parent enqueues the child, starts the producer, then runs the consume step. Cancel routes also make a best-effort attempt to cancel the child (cancelHostedHarness).
    • Hoisted run-claim to the thread gate and threaded runFenceToken into prepareRun.
    • Removed standalone projector consumer/workflows and PROJECTOR_QUEUE; kept projection primitives.
    • Removed early terminal-status write from run-reactor.ts (reactor now emits SSE only).
    • Mapped finish-reason to completed | requires_action | failed; when finishReason is absent (desktop/relay), default to completed. Added markRunRequiresAction (keeps run_owner_pod on pause).
    • E2E: relay-ingest tests now drive real dispatch to exercise the consume step; desktop happy-path coverage moved to link-ingest.spec.ts.
  • Migration

    • Update workers to dequeue HOSTED_HARNESS_QUEUE and remove PROJECTOR_QUEUE from RUN_QUEUES.
    • Deploy with DBOS_WORKFLOW_VERSION set to "2".
    • Ensure CI/e2e runs Postgres + NATS; set E2E_ANTHROPIC_KEY to run hosted-path tests (optional).
    • Automation runs now create v2 threads so the consume step can project them (no manual action required).

Written for commit d8c4100. Summary will update on new commits.

tlgimenes and others added 23 commits June 26, 2026 18:47
Add three pure helper functions for the consume loop: isTerminalStatus,
consumeDurableName, and classifyDrainMessage. These validate envelope types
and msgId structure before consumption.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add requiresActionIfInProgress to SqlThreadStorage and OrgScopedThreadStorage,
expose markRunRequiresAction on ProjectorWorkflowRuntime interface, wire it up
in app.ts mirroring completeRunIfNotCompleted (storage call + emitTerminalThreadStatus).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Added a brief inline comment in threads.ts requiresActionIfInProgress() to clarify
that run_owner_pod is intentionally NOT cleared when transitioning to requires_action,
because this is a pause state (run resumes after tool approval), not a terminal state.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ion) in projector

- Extend ProjectChunksResult with finalParts (captured from onFinish hook in
  projectChunks — no DB round-trip, taken directly from the fold's UIMessage)
- Rewrite runProjectorWorkflowBody terminal branch: outcome.failed → "failed";
  else resolveThreadStatus(finishReason, finalParts) → "completed" |
  "requires_action" | "failed"; undefined outcome → "completed" (no-info default)
- requires_action path: markRunRequiresAction only, no completion analytics
- Gate recordCompleted/recordFailed on truthy flip return (idempotent analytics)
- purgeRun now runs on all three terminal outcomes including requires_action
- Add 4 new unit tests covering the three-way mapping + no-analytics invariant

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
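
The "gate analytics on truthy flip return" idea in the commit above can be illustrated with a small in-memory model. This is only a sketch under assumptions: `InMemoryRunStore`, `completeIfInProgress`, and `finishRun` are hypothetical stand-ins, not the SqlThreadStorage or runtime methods touched by the PR.

```typescript
type RunStatus = "in_progress" | "completed" | "requires_action" | "failed";

// Hypothetical in-memory stand-in for the thread storage.
class InMemoryRunStore {
  private readonly rows = new Map<string, RunStatus>();

  start(runId: string): void {
    this.rows.set(runId, "in_progress");
  }

  // Returns true only when this call performed the in_progress -> completed flip.
  completeIfInProgress(runId: string): boolean {
    if (this.rows.get(runId) !== "in_progress") return false;
    this.rows.set(runId, "completed");
    return true;
  }
}

// Analytics fire only for the call that actually flipped the row, so a
// recovered or retried workflow does not emit a second completion event.
function finishRun(
  store: InMemoryRunStore,
  runId: string,
  recordCompleted: (runId: string) => void,
): boolean {
  const flipped = store.completeIfInProgress(runId);
  if (flipped) recordCompleted(runId);
  return flipped;
}
```

Calling `finishRun` twice for the same run records one completion: the second call sees a row that is no longer in_progress and returns false.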
…Parts; test undefined-outcome

- Catch block in runProjectorWorkflowBody now captures markRunFailed's
  return and gates recordFailed on the truthy flip, closing the same
  recovery-double-emit gap already fixed on the harness path.
- project-chunks.ts: replace blanket `as` cast on message.parts with a
  filter (p != null && typeof p === "object" && "type" in p) so malformed
  parts cannot reach resolveThreadStatus.
- Undefined-outcome test already present in projector-workflow.test.ts
  (added by ac19806); no additional test needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
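
The part filter quoted in the commit above can be written as a type guard, so the narrowing replaces the blanket cast. A minimal sketch, with `PartLike`, `isPartLike`, and `sanitizeParts` as hypothetical names (real UIMessage parts carry more fields than this):

```typescript
// Hypothetical minimal shape: the predicate only proves a "type" key exists,
// so its value is typed as unknown rather than string.
interface PartLike {
  type: unknown;
}

// Same predicate as the commit message: non-null object with a "type" key.
function isPartLike(p: unknown): p is PartLike {
  return p != null && typeof p === "object" && "type" in p;
}

// Drops malformed entries instead of casting the whole array.
function sanitizeParts(parts: readonly unknown[]): PartLike[] {
  return parts.filter(isPartLike);
}
```

Because the guard does not check that `type` is a string, a stricter variant may be preferable wherever downstream code switches on it.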
Export projectFromJetStreamStep, projectCheckpointFromJetStreamStep, and add
getProjectorWorkflowRuntime accessor to support consume-run-projection.ts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Appends consumeRunProjection() to consume-run-projection.ts — the per-run
DBOS step body that binds a durable JetStream consumer, drains it with an
idle-timeout watchdog, projects incrementally (checkpoints) and terminally
(done), and is the sole terminal-status writer. Uses manual iterator.next() +
Promise.race against a sleep() idle watchdog, matching the established
createTailStream pattern. Consumer deleted in finally to prevent leaks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Factor the hosted (in-process) agent-loop execution out of the thread
gate's `!useLink` branch into a single callable `runHostedHarness`, wired
via a module-level `HostedHarnessRuntime` registry (mirroring the thread
gate). The gate now calls `runHostedHarness` inline with its
already-resolved StudioContext — a pure refactor, behavior-identical.

Add the `hostedHarnessWorkflow` DBOS child workflow (one
`runHostedHarness` step, `maxRecoveryAttempts: 1000`) + `enqueueHostedHarness`
(workflow ID `decopilot-hosted:<runId>:<fence>`, HOSTED_HARNESS_QUEUE
partitioned by threadId, concurrency 1). The child + enqueue are unused
until Task 7 turns the gate into a fire-and-forget enqueuer.

No DB terminal-status writes in the hosted path — the consume/projector
step owns terminal status; NATS streaming + {done} publish unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…h topologies

Mint the run fence token + setRunFence at the thread-gate
(dispatchRunAndWaitStep), before dispatching the producer, uniformly for the
hosted and desktop branches. Thread the gate-minted token down on
request.runFenceToken so prepareRun USES it instead of minting its own; keep a
mint+write fallback for standalone/legacy callers. run_fence_token is now in the
DB before any harness code runs (prerequisite for the async hosted child +
consume step in 7b). Behavior-preserving: same fence value reaches the wire
harness input, NATS msg ids, and projector/consume checks; only the mint site
moves earlier and is deduplicated to one place.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
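
The "use the gate-minted token, else mint and write" fallback described in the commit above could look roughly like the sketch below. All names (`RunRequest`, `FenceStore`, `resolveFence`) and the injected `mint` function are assumptions made for illustration; only `runFenceToken` and `setRunFence` appear in the PR text, and their real signatures are not shown.

```typescript
// Hypothetical request shape; only runFenceToken is named in the PR.
interface RunRequest {
  threadId: string;
  runFenceToken?: string;
}

// Hypothetical storage interface standing in for the setRunFence write.
interface FenceStore {
  setRunFence(threadId: string, token: string): void;
}

function resolveFence(
  request: RunRequest,
  store: FenceStore,
  mint: () => string,
): string {
  // Gate path: the token was minted and written before dispatch, so reuse it.
  if (request.runFenceToken !== undefined) return request.runFenceToken;
  // Standalone/legacy path: mint and write here as a fallback.
  const token = mint();
  store.setRunFence(request.threadId, token);
  return token;
}
```

On the gate path no second write happens, which keeps the mint site deduplicated to one place.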
…p live status flip)

Remove the early `storage.update({ status, run_config: null, run_started_at: null })`
from `handleTerminalStatus` in run-reactor.ts for RUN_COMPLETED/RUN_REQUIRES_ACTION.
The consume-run-projection step now owns the terminal DB write; the live reactor only
emits SSE events for instant UX. RUN_FAILED is unchanged (it skips the projector path).

Update integration test expectations to reflect the new contract: row stays in_progress
after RUN_COMPLETED/RUN_REQUIRES_ACTION (consume step writes the terminal status later).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…status (Task 7b)

Gate dispatch is now START-only for both topologies: hosted enqueues the
hosted-harness child (fire-and-forget), desktop publishes the work item.
Completion flows through a new consumeRunProjection step in runDispatchSteps
(sole terminal-status writer). Removes the gate status-poll and the dead
ThreadGateRuntime.dispatchRunFn. Adds best-effort DBOS.cancelWorkflow of the
hosted child in the cancel handler (additive to the in-memory cancel).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…g landed

Task 7b wired the hosted harness workflow: the gate's dispatchRunAndWaitStep now
enqueues hostedHarnessWorkflow fire-and-forget (instead of inline), and the gate
immediately proceeds to its consume step to write terminal status for both hosted
and desktop topologies. Updated stale "unused until Task 7" comments in:
- hosted-harness-workflow.ts file docstring + enqueueHostedHarness JSDoc
- app.ts runtime setup comment
- thread-gate-workflow.ts: added note that consume step is skipped on setup failure

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Remove projector-consumer.ts (+ test), start-durable-projector.ts, and the
DBOS projectRunWorkflow/projectCheckpointWorkflow registrations from
projector-workflow.ts. The consume-run-projection step (Task 7b) is now the
projector; projection primitives (projectFromJetStreamStep,
projectCheckpointFromJetStreamStep, runProjectorWorkflowBody, runtime setters)
are kept as the consume step depends on them. Keeps streamBuffer.init() in
app.ts; removes the projectorStarted/projectorHandle startup block.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ECTOR_QUEUE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds packages/e2e/tests/decopilot-projection.spec.ts with:
- Desktop happy path (LIVE): relay body → thread_message_parts + completed
- Entry guard (LIVE): already-terminal run is not re-projected
- Hosted happy path (LIVE, gated on E2E_ANTHROPIC_KEY)
- requires_action, parent recovery, idle-timeout, idempotency (SKIPPED with
  precise TODOs for each missing harness injection point)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…y-safe)

Wrap jsm.consumers.add() in try/catch to swallow "already in use" / "already exists"
errors. On a DBOS step retry after a crash that follows the add, re-adding with identical
config is idempotent, but NATS may return a benign existence error. The pattern mirrors the
deleted projector-consumer.ts; the config is deterministic, so retry is safe.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
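
The retry-safe add described in the commit above can be sketched against a minimal interface rather than the real JetStream manager. `ConsumerAdder`, `isBenignExistsError`, and `addConsumerRetrySafe` are hypothetical names, and matching on the two message fragments quoted in the commit is an assumption about how the errors surface.

```typescript
// Hypothetical minimal interface standing in for the consumer-add API.
interface ConsumerAdder<C> {
  add(stream: string, config: C): Promise<unknown>;
}

// Treats the two messages quoted in the commit as benign.
function isBenignExistsError(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return /already in use|already exists/i.test(message);
}

async function addConsumerRetrySafe<C>(
  adder: ConsumerAdder<C>,
  stream: string,
  config: C,
): Promise<"created" | "existed"> {
  try {
    await adder.add(stream, config);
    return "created";
  } catch (err) {
    // A step retry after a crash may hit a consumer that the first
    // attempt already created; the config is deterministic, so reuse it.
    if (isBenignExistsError(err)) return "existed";
    // Anything else is a real failure and must propagate.
    throw err;
  }
}
```

Rethrowing every non-matching error matters here: swallowing all failures would hide a missing stream or a lost connection.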
…p + workflow snapshot

DBOS forbids DBOS.startWorkflow from within a step/transaction. Change
dispatchRunAndWaitStep to return a HostedEnqueueDescriptor instead of calling
enqueueHostedHarness directly; runDispatchSteps (workflow body) performs the
actual enqueue after the step returns — legal workflow context.

Knip: remove recordLag (dead after projector-consumer deletion), remove export
from runHostedHarness/hostedHarnessWorkflow/hostedHarnessWorkflowId (used only
within hosted-harness-workflow.ts), trim index.ts barrel accordingly.

Regen workflow-source-guard snapshot: adds hosted-harness-workflow.ts, drops
projector-workflow.ts, updates thread-gate-workflow.ts hash.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… regression)

Relay/desktop runs end on a {done} marker with no AI-SDK finish chunk, so
finishReason is undefined; resolveThreadStatus(undefined) returns 'failed',
wrongly failing every clean relay run (e2e: harness-conformance,
cli-session-resume, decopilot-projection). Short-circuit absent finishReason to
'completed' (pre-unification projector behavior); explicit reasons still route
through resolveThreadStatus. + regression unit test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sume step replaces standalone projector)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…p projects them

Automation runs don't go through the routes.ts first-message site that pins
user-message threads v2, so createAutomationRunThread defaulted to v1. The
consume step (the sole terminal-status writer after the live-FINISH write was
removed) skips v1 runs (`row.version !== 2`), so automation runs never reached
'completed' (multi-pod webhook-trigger timeouts). Also: the mismatched-fence e2e
negative tests now POST a real message, so the user's own parts persist — assert
no ASSISTANT/relay parts landed instead of zero parts total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>