feat(decopilot): unified per-run projection workflow (consume step + hosted child harness) #4179
Open
tlgimenes wants to merge 23 commits into
Add three pure helper functions for the consume loop: isTerminalStatus, consumeDurableName, and classifyDrainMessage. These validate envelope types and msgId structure before consumption. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
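To make the three helpers concrete, here is a minimal TypeScript sketch. The signatures, the `Envelope` shape, the `<runId>:<fence>:<seq>` msgId layout, and the durable-name format are all assumptions for illustration; the commit does not show the real ones.

```typescript
// Hypothetical shapes: the real envelope and msgId formats are not shown in this PR.
type DrainAction = "chunk" | "checkpoint" | "done" | "skip";

interface Envelope {
  type: string;
  msgId: string; // assumed layout: "<runId>:<fence>:<seq>"
}

// Assumption: all three outcomes count as terminal for the consume entry guard.
function isTerminalStatus(status: string): boolean {
  return status === "completed" || status === "requires_action" || status === "failed";
}

// Builds a per-run consumer name using only characters that are safe in a
// JetStream durable name (no dots, wildcards, whitespace, or path separators).
function consumeDurableName(runId: string, fence: string): string {
  const safe = (s: string): string => s.replace(/[^A-Za-z0-9_-]/g, "_");
  return `consume_${safe(runId)}_${safe(fence)}`;
}

// Validates msgId structure and envelope type before the message is consumed.
function classifyDrainMessage(env: Envelope, runId: string, fence: string): DrainAction {
  const parts = env.msgId.split(":");
  if (parts.length !== 3) return "skip";
  if (parts[0] !== runId || parts[1] !== fence) return "skip"; // stale or foreign run
  if (!/^\d+$/.test(parts[2] ?? "")) return "skip";
  switch (env.type) {
    case "chunk":
      return "chunk";
    case "checkpoint":
      return "checkpoint";
    case "done":
      return "done";
    default:
      return "skip";
  }
}
```

Keeping these pure (no NATS or DB handles) is what lets them be unit-tested without the e2e stack.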
Add requiresActionIfInProgress to SqlThreadStorage and OrgScopedThreadStorage, expose markRunRequiresAction on ProjectorWorkflowRuntime interface, wire it up in app.ts mirroring completeRunIfNotCompleted (storage call + emitTerminalThreadStatus). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Added a brief inline comment in threads.ts requiresActionIfInProgress() to clarify that run_owner_pod is intentionally NOT cleared when transitioning to requires_action, because this is a pause state (run resumes after tool approval), not a terminal state. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
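The pause-versus-terminal distinction can be sketched with an in-memory row. This is a simplified illustration, not the storage code: the real methods presumably issue a conditional SQL update, and `completeIfInProgress` is a hypothetical name used here only for contrast.

```typescript
type Status = "in_progress" | "completed" | "requires_action" | "failed";

interface RunRow {
  status: Status;
  run_owner_pod: string | null;
}

// Pause transition: flips only from in_progress and keeps run_owner_pod,
// because the run resumes on the same owner after tool approval.
function requiresActionIfInProgress(row: RunRow): boolean {
  if (row.status !== "in_progress") return false;
  row.status = "requires_action";
  return true;
}

// Terminal transition (hypothetical helper): flips only from in_progress and
// releases the owner, since nothing will resume this run.
function completeIfInProgress(row: RunRow): boolean {
  if (row.status !== "in_progress") return false;
  row.status = "completed";
  row.run_owner_pod = null;
  return true;
}
```

Returning a boolean "did I flip it" is what allows callers to gate side effects such as analytics on the first successful transition only.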
…ion) in projector
- Extend ProjectChunksResult with finalParts (captured from the onFinish hook in projectChunks; no DB round-trip, taken directly from the fold's UIMessage)
- Rewrite runProjectorWorkflowBody terminal branch: outcome.failed → "failed"; else resolveThreadStatus(finishReason, finalParts) → "completed" | "requires_action" | "failed"; undefined outcome → "completed" (no-info default)
- requires_action path: markRunRequiresAction only, no completion analytics
- Gate recordCompleted/recordFailed on truthy flip return (idempotent analytics)
- purgeRun now runs on all three terminal outcomes including requires_action
- Add 4 new unit tests covering the three-way mapping + no-analytics invariant
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…Parts; test undefined-outcome
- Catch block in runProjectorWorkflowBody now captures markRunFailed's return and gates recordFailed on the truthy flip, closing the same recovery-double-emit gap already fixed on the harness path.
- project-chunks.ts: replace blanket `as` cast on message.parts with a filter (p != null && typeof p === "object" && "type" in p) so malformed parts cannot reach resolveThreadStatus.
- Undefined-outcome test already present in projector-workflow.test.ts (added by ac19806); no additional test needed.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
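The filter that replaces the `as` cast can be written as a type guard. A minimal sketch follows; `TypedPart`, `isTypedPart`, and `sanitizeParts` are illustrative names, not the ones in project-chunks.ts.

```typescript
// Illustrative shape: anything object-like that carries a "type" key.
interface TypedPart {
  type: unknown;
  [key: string]: unknown;
}

// Same predicate as in the commit message, expressed as a type guard so the
// filtered array is narrowed without a blanket cast.
function isTypedPart(p: unknown): p is TypedPart {
  return p != null && typeof p === "object" && "type" in p;
}

function sanitizeParts(parts: readonly unknown[]): TypedPart[] {
  return parts.filter(isTypedPart);
}
```

Unlike a cast, the guard drops `null`, primitives, and objects without a `type` key at runtime, so downstream status resolution never sees them.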
Export projectFromJetStreamStep, projectCheckpointFromJetStreamStep, and add getProjectorWorkflowRuntime accessor to support consume-run-projection.ts. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Appends consumeRunProjection() to consume-run-projection.ts — the per-run DBOS step body that binds a durable JetStream consumer, drains it with an idle-timeout watchdog, projects incrementally (checkpoints) and terminally (done), and is the sole terminal-status writer. Uses manual iterator.next() + Promise.race against a sleep() idle watchdog, matching the established createTailStream pattern. Consumer deleted in finally to prevent leaks. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
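The drain pattern described here (manual `iterator.next()` raced against an idle timer, with cleanup in `finally`) can be sketched generically. This is an illustration over any `AsyncIterable`, not the actual consumeRunProjection body; `drainWithIdleWatchdog`, `onMessage`, and `cleanup` are hypothetical names, and `cleanup` stands in for deleting the durable consumer.

```typescript
const IDLE = Symbol("idle");

type DrainResult = "done" | "idle-timeout" | "ended";

async function drainWithIdleWatchdog<T>(
  source: AsyncIterable<T>,
  idleTimeoutMs: number,
  onMessage: (msg: T) => Promise<boolean>, // resolves true once the terminal message is handled
  cleanup: () => Promise<void>, // e.g. delete the per-run consumer
): Promise<DrainResult> {
  const iterator = source[Symbol.asyncIterator]();
  try {
    for (;;) {
      // A fresh watchdog per message: the timeout measures idleness, not total run time.
      let timer: ReturnType<typeof setTimeout> | undefined;
      const watchdog = new Promise<typeof IDLE>((resolve) => {
        timer = setTimeout(() => resolve(IDLE), idleTimeoutMs);
      });
      let raced: IteratorResult<T> | typeof IDLE;
      try {
        raced = await Promise.race([iterator.next(), watchdog]);
      } finally {
        clearTimeout(timer); // never leave a stray timer behind
      }
      if (raced === IDLE) return "idle-timeout";
      if (raced.done) return "ended";
      if (await onMessage(raced.value)) return "done";
    }
  } finally {
    await cleanup(); // runs on every exit path, including thrown errors
  }
}
```

A `for await` loop cannot express this directly because it offers no way to give up on a pending `next()`, which is why the manual iterator is used.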
Factor the hosted (in-process) agent-loop execution out of the thread
gate's `!useLink` branch into a single callable `runHostedHarness`, wired
via a module-level `HostedHarnessRuntime` registry (mirroring the thread
gate). The gate now calls `runHostedHarness` inline with its
already-resolved StudioContext — a pure refactor, behavior-identical.
Add the `hostedHarnessWorkflow` DBOS child workflow (one
`runHostedHarness` step, `maxRecoveryAttempts: 1000`) + `enqueueHostedHarness`
(workflow ID `decopilot-hosted:<runId>:<fence>`, HOSTED_HARNESS_QUEUE
partitioned by threadId, concurrency 1). The child + enqueue are unused
until Task 7 turns the gate into a fire-and-forget enqueuer.
No DB terminal-status writes in the hosted path — the consume/projector
step owns terminal status; NATS streaming + {done} publish unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
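One consequence of the `decopilot-hosted:<runId>:<fence>` workflow ID is that re-enqueueing the same run and fence is a no-op when the queue dedupes on workflow ID. The sketch below uses an in-memory stand-in (`FakeQueue`) rather than DBOS, and the helper signature is an assumption.

```typescript
interface HostedRun {
  runId: string;
  fence: string;
  threadId: string;
}

// ID format taken from the commit message; the function signature is assumed.
function hostedHarnessWorkflowId(run: HostedRun): string {
  return `decopilot-hosted:${run.runId}:${run.fence}`;
}

// In-memory stand-in for a queue that treats the workflow ID as an idempotency key.
class FakeQueue {
  private readonly byId = new Map<string, string>(); // workflowId -> partition key

  enqueue(workflowId: string, partitionKey: string): boolean {
    if (this.byId.has(workflowId)) return false; // duplicate enqueue is ignored
    this.byId.set(workflowId, partitionKey);
    return true;
  }

  get size(): number {
    return this.byId.size;
  }
}

function enqueueHosted(queue: FakeQueue, run: HostedRun): boolean {
  // Partitioning by threadId keeps runs of one thread serialized.
  return queue.enqueue(hostedHarnessWorkflowId(run), run.threadId);
}
```

A new fence for the same run yields a different ID, so a re-dispatched run is a distinct workflow rather than a duplicate of the old one.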
…h topologies Mint the run fence token + setRunFence at the thread-gate (dispatchRunAndWaitStep), before dispatching the producer, uniformly for the hosted and desktop branches. Thread the gate-minted token down on request.runFenceToken so prepareRun USES it instead of minting its own; keep a mint+write fallback for standalone/legacy callers. run_fence_token is now in the DB before any harness code runs (prerequisite for the async hosted child + consume step in 7b). Behavior-preserving: same fence value reaches the wire harness input, NATS msg ids, and projector/consume checks; only the mint site moves earlier and is deduplicated to one place. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…p live status flip)
Remove the early `storage.update({ status, run_config: null, run_started_at: null })`
from `handleTerminalStatus` in run-reactor.ts for RUN_COMPLETED/RUN_REQUIRES_ACTION.
The consume-run-projection step now owns the terminal DB write; the live reactor only
emits SSE events for instant UX. RUN_FAILED is unchanged (it skips the projector path).
Update integration test expectations to reflect the new contract: row stays in_progress
after RUN_COMPLETED/RUN_REQUIRES_ACTION (consume step writes the terminal status later).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…status (Task 7b) Gate dispatch is now START-only for both topologies: hosted enqueues the hosted-harness child (fire-and-forget), desktop publishes the work item. Completion flows through a new consumeRunProjection step in runDispatchSteps (sole terminal-status writer). Removes the gate status-poll and the dead ThreadGateRuntime.dispatchRunFn. Adds best-effort DBOS.cancelWorkflow of the hosted child in the cancel handler (additive to the in-memory cancel). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…g landed
Task 7b wired the hosted harness workflow: the gate's dispatchRunAndWaitStep now enqueues hostedHarnessWorkflow fire-and-forget (instead of inline), and the gate immediately proceeds to its consume step to write terminal status for both hosted and desktop topologies. Updated stale "unused until Task 7" comments in:
- hosted-harness-workflow.ts file docstring + enqueueHostedHarness JSDoc
- app.ts runtime setup comment
- thread-gate-workflow.ts: added note that consume step is skipped on setup failure
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Remove projector-consumer.ts (+ test), start-durable-projector.ts, and the DBOS projectRunWorkflow/projectCheckpointWorkflow registrations from projector-workflow.ts. The consume-run-projection step (Task 7b) is now the projector; projection primitives (projectFromJetStreamStep, projectCheckpointFromJetStreamStep, runProjectorWorkflowBody, runtime setters) are kept as the consume step depends on them. Keeps streamBuffer.init() in app.ts; removes the projectorStarted/projectorHandle startup block. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ECTOR_QUEUE Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds packages/e2e/tests/decopilot-projection.spec.ts with:
- Desktop happy path (LIVE): relay body → thread_message_parts + completed
- Entry guard (LIVE): already-terminal run is not re-projected
- Hosted happy path (LIVE, gated on E2E_ANTHROPIC_KEY)
- requires_action, parent recovery, idle-timeout, idempotency (SKIPPED with precise TODOs for each missing harness injection point)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…y-safe) Wrap jsm.consumers.add() in try/catch to swallow "already in use" / "already exists" errors. On DBOS step retry after crash post-add, re-adding with identical config is idempotent but NATS may return benign existence error. Pattern mirrors deleted projector-consumer.ts; config is deterministic so retry is safe. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
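The retry-safe add amounts to treating a benign existence error as success and rethrowing everything else. In this sketch the `add` callback stands in for the `jsm.consumers.add()` call so that no NATS client signature is assumed; `addConsumerIdempotent` is a hypothetical name.

```typescript
type AddOutcome = "created" | "existed";

async function addConsumerIdempotent(add: () => Promise<unknown>): Promise<AddOutcome> {
  try {
    await add();
    return "created";
  } catch (err) {
    const message = (err instanceof Error ? err.message : String(err)).toLowerCase();
    // Benign on retry: the consumer was created before the crash with the same config.
    if (message.includes("already in use") || message.includes("already exists")) {
      return "existed";
    }
    throw err; // anything else is a real failure
  }
}
```

This is only safe because the consumer config is deterministic for a given run; if the config could differ between attempts, swallowing the error would hide a mismatch.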
…p + workflow snapshot DBOS forbids DBOS.startWorkflow from within a step/transaction. Change dispatchRunAndWaitStep to return a HostedEnqueueDescriptor instead of calling enqueueHostedHarness directly; runDispatchSteps (workflow body) performs the actual enqueue after the step returns — legal workflow context. Knip: remove recordLag (dead after projector-consumer deletion), remove export from runHostedHarness/hostedHarnessWorkflow/hostedHarnessWorkflowId (used only within hosted-harness-workflow.ts), trim index.ts barrel accordingly. Regen workflow-source-guard snapshot: adds hosted-harness-workflow.ts, drops projector-workflow.ts, updates thread-gate-workflow.ts hash. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
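The descriptor pattern can be illustrated without DBOS. In the sketch below, `runStep` and `startChildWorkflow` are toy stand-ins that only simulate the "no workflow start inside a step" rule; the descriptor fields are assumptions.

```typescript
interface HostedEnqueueDescriptor {
  workflowId: string;
  threadId: string;
}

interface DispatchInput {
  topology: "hosted" | "desktop";
  threadId: string;
  runId: string;
  fence: string;
}

let insideStep = false; // toy stand-in for the runtime's step context
const started: string[] = [];

async function runStep<T>(fn: () => Promise<T>): Promise<T> {
  insideStep = true;
  try {
    return await fn();
  } finally {
    insideStep = false;
  }
}

async function startChildWorkflow(d: HostedEnqueueDescriptor): Promise<void> {
  if (insideStep) throw new Error("cannot start a workflow from within a step");
  started.push(d.workflowId);
}

// The step decides what to enqueue but does not enqueue it.
async function dispatchStep(input: DispatchInput): Promise<HostedEnqueueDescriptor | null> {
  if (input.topology !== "hosted") return null; // desktop publishes a work item instead (omitted)
  return {
    workflowId: `decopilot-hosted:${input.runId}:${input.fence}`,
    threadId: input.threadId,
  };
}

// The workflow body performs the enqueue after the step has returned.
async function runDispatchSteps(input: DispatchInput): Promise<void> {
  const descriptor = await runStep(() => dispatchStep(input));
  if (descriptor) await startChildWorkflow(descriptor);
}
```

Because the descriptor is plain data, it is also what gets recorded as the step's result, so a replayed workflow enqueues the same child ID.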
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… regression)
Relay/desktop runs end on a {done} marker with no AI-SDK finish chunk, so
finishReason is undefined; resolveThreadStatus(undefined) returns 'failed',
wrongly failing every clean relay run (e2e: harness-conformance,
cli-session-resume, decopilot-projection). Short-circuit absent finishReason to
'completed' (pre-unification projector behavior); explicit reasons still route
through resolveThreadStatus. + regression unit test.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
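Putting the mapping rules from this fix and the earlier terminal-branch commit together gives roughly the decision below. `resolveThreadStatus` here is a simplified stand-in (the real function is not shown in this PR), and `Outcome` and `terminalStatusFor` are hypothetical names.

```typescript
type TerminalStatus = "completed" | "requires_action" | "failed";

interface Part {
  type: string;
  state?: string;
}

interface Outcome {
  failed: boolean;
  finishReason?: string;
  finalParts: Part[];
}

// Stand-in only. It preserves the one property stated in the commit message:
// an undefined finishReason resolves to "failed".
function resolveThreadStatus(finishReason: string | undefined, parts: Part[]): TerminalStatus {
  if (finishReason === "stop") return "completed";
  if (finishReason === "tool-calls") {
    return parts.some((p) => p.state === "approval-requested") ? "requires_action" : "completed";
  }
  return "failed";
}

function terminalStatusFor(outcome: Outcome | undefined): TerminalStatus {
  if (outcome === undefined) return "completed"; // no-info default
  if (outcome.failed) return "failed";
  // Relay/desktop runs end on {done} with no finish chunk: short-circuit to completed.
  if (outcome.finishReason === undefined) return "completed";
  return resolveThreadStatus(outcome.finishReason, outcome.finalParts);
}
```

The short-circuit sits before the call to `resolveThreadStatus` on purpose: explicit finish reasons keep their existing mapping, and only the absent case changes.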
…sume step replaces standalone projector) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…p projects them Automation runs don't go through the routes.ts first-message site that pins user-message threads v2, so createAutomationRunThread defaulted to v1. The consume step (the sole terminal-status writer after the live-FINISH write was removed) skips v1 runs (`row.version !== 2`), so automation runs never reached 'completed' (multi-pod webhook-trigger timeouts). Also: the mismatched-fence e2e negative tests now POST a real message, so the user's own parts persist — assert no ASSISTANT/relay parts landed instead of zero parts total. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Replaces the standalone decopilot projector with a per-run consume step folded into the dispatch workflow, and unifies hosted and desktop runs so they take the same path. One workflow owns each run end-to-end; NATS is a pure transport; projection is reliable and the terminal status has a single writer.
New flow (identical for both topologies): parent dispatch workflow = dispatch (start a producer that streams to NATS) → consume.
- Dispatch: for hosted runs the producer is a DBOS child workflow (`hostedHarnessWorkflow`, `HOSTED_HARNESS_QUEUE`) that runs the agent loop. No longer runs inline.
- Consume: the consume step (`consume-run-projection.ts`) binds a per-run durable NATS consumer, projects incrementally (checkpoints) + terminally (`{done}`) by reusing the existing fold engine, and is the sole terminal-status writer, mapping finish-reason → `completed | requires_action | failed`.
Key changes
- Run fence mint (`setRunFence`) hoisted to the gate for both topologies, so `run_fence_token` is written before the consumer reads it.
- Early terminal-status write in `run-reactor.ts` `handleTerminalStatus` deleted → consume is the only writer of `completed`/`requires_action` (SSE/metrics kept). This makes the simple `if terminal → return` entry guard correct.
- `markRunRequiresAction` runtime method added; finish-reason → status via existing `resolveThreadStatus`.
- Standalone projector (`projectRunWorkflow`/`projectCheckpointWorkflow` + startup) deleted.
- `PROJECTOR_QUEUE` removed; `HOSTED_HARNESS_QUEUE` added to `initDbos` (partition by `threadId`, concurrency 1) and `RUN_QUEUES`; both parent and child run on worker pods.
- `DBOS_WORKFLOW_VERSION` bumped to `"2"` (parent step sequence changed + new registered workflow).
Design + plan: `docs/superpowers/specs/2026-06-26-decopilot-projection-single-workflow-design.md`, `docs/superpowers/plans/2026-06-26-decopilot-projection-single-workflow.md`.
Testing
- `tsc` typecheck green across the branch; pure-logic unit tests pass (`consume-run-projection.test.ts` 7/7); each task's relevant unit/integration tests passed at implementation time.
- The e2e spec (`packages/e2e/tests/decopilot-projection.spec.ts`) could not run locally (no Postgres/NATS/dev-server in the dev sandbox); please ensure CI runs the full e2e stack. 3 live cases (desktop happy-path, entry-guard, hosted gated on `E2E_ANTHROPIC_KEY`); 5 cases `test.skip` with precise TODOs naming the missing test-injection hooks.
Affected areas
`apps/mesh` dispatch-queue (thread-gate + new hosted-harness workflow), decopilot projection (consume step, projector-workflow primitives), run-reactor, storage/threads, app.ts DBOS wiring, index.ts RUN_QUEUES.
Follow-ups (non-blocking)
- `RUN_FAILED` purges the stream while consume may project the same in-band-error run. Bounded + self-correcting (DB already `failed`, `markRunFailed` idempotent); worst case is truncated failed-run parts or a consume idle-timeout holding the per-thread slot up to 30 min. Not a regression (old poll cap was 1h). Fix: pass an abort signal to `consumeRunProjection` on terminal-failure and/or defer the `RUN_FAILED` purge to consume's `purgeRun`.
- Un-skip the remaining e2e cases by adding the missing test-injection hooks (an `idleTimeoutMs` override, a raw-publish dedup bypass, and multi-pod kill controls).
🤖 Generated with Claude Code
Summary by cubic
Unifies decopilot runs behind a per-run consume step that drains NATS and writes the terminal status. Hosted and desktop now share the same path via a hosted child workflow; the standalone projector is removed.
New Features
- Added `consume-run-projection.ts` to the dispatch workflow: per-run durable JetStream consumer, incremental checkpoints, final projection, and the only writer of `completed | requires_action | failed` (with pure helpers `isTerminalStatus`, `consumeDurableName`, `classifyDrainMessage` and retry-safe consumer add).
- Hosted runs now use a DBOS child workflow (`hostedHarnessWorkflow`) on `HOSTED_HARNESS_QUEUE` (partitioned by `threadId`, concurrency 1); the parent enqueues the child, starts the producer, then runs the consume step. Cancel routes best-effort cancel the child (`cancelHostedHarness`).
- Run fence minted at the thread gate and threaded as `runFenceToken` into `prepareRun`.
- Removed the standalone projector and `PROJECTOR_QUEUE`; kept projection primitives.
- Removed the early terminal-status write from `run-reactor.ts` (reactor now emits SSE only).
- Finish reason maps to `completed | requires_action | failed`; when `finishReason` is absent (desktop/relay), default to `completed`. Added `markRunRequiresAction` (keeps `run_owner_pod` on pause).
- […] `link-ingest.spec.ts`.
Migration
- Add `HOSTED_HARNESS_QUEUE` and remove `PROJECTOR_QUEUE` from `RUN_QUEUES`.
- `DBOS_WORKFLOW_VERSION` set to "2".
- Set `E2E_ANTHROPIC_KEY` to run hosted-path tests (optional).
Written for commit d8c4100. Summary will update on new commits.