Skip to content

feat(decopilot): in-UI message queue (surface + cancel) + externalize attachments at POST#4155

Open
tlgimenes wants to merge 13 commits into
mainfrom
feat/chat-queue-ui-cancel
Open

feat(decopilot): in-UI message queue (surface + cancel) + externalize attachments at POST#4155
tlgimenes wants to merge 13 commits into
mainfrom
feat/chat-queue-ui-cancel

Conversation

@tlgimenes

@tlgimenes tlgimenes commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Summary

Two related changes that give users control over a thread's pending message queue and stop bloating the durable workflow record.

1. In-UI message queue (surface + cancel)

A thread's messages run one-at-a-time behind a per-thread DBOS thread-gate workflow (partition concurrency = 1). When a deploy strands the head gate PENDING on a dead executor/version, it permanently holds the partition slot and every later message sits ENQUEUED forever ("accepted and queued"), with no UI recourse. This moves that recovery into the user's hands:

  • GET /:org/decopilot/queue/:threadId — lists the thread's pending gate workflows (running head + queued tail) with each message's text.
  • POST /:org/decopilot/queue/:threadId/cancel/:workflowId — cancels one item; a running head also runs the full run-cancel teardown.
  • Stop button now also frees a stranded PENDING gate head (today an orphaned gate ignores the in-memory run cancel).
  • Queue panel above the composer lists pending messages and cancels them per-item.

The queue is read from dbos.workflow_status via DBOS.listWorkflows (the workflow_queue table is empty in this DBOS version). Authz is owner-only (validateThreadOwnership) plus a thread-run:{threadId}: prefix guard — a direct clone of the existing bgtool: cancel pattern.

2. Externalize attachments at POST

The existing uploadFileParts materializer (base64 data: → short mesh-storage: refs) ran only at dispatch — so the raw blobs were already serialized into the DBOS workflow input at POST. It now also runs at POST, bounding dbos.workflow_status.inputs. A pure idempotency guard keeps the still-present dispatch-time call from double-annotating; computeIdempotencyKey stays computed from the original messages so the workflowID is stable.

Testing

  • Unit: 11/11 pass — pure mappers (thread-gate-queue.test.ts) + the attachment idempotency guard (file-materializer.test.ts, TDD).
  • Typecheck/lint: clean for changed files (one pre-existing, unrelated AJV dual-version error in packages/mcp-utils exists at the merge base).
  • E2E: packages/e2e/tests/decopilot-thread-queue.spec.ts is authored and typechecks but was not executed locally (needs Postgres + NATS + dev server). CI is the first real run — see follow-ups.

Reviewed

Per-task reviews + a final whole-branch review (no Critical/Important blockers; the attachment idempotency path and cancel-teardown preservation were traced and verified).

Follow-ups (deferred, flagged in code)

  • e2e hardening (first CI run may surface these): (a) the 403 test currently proves a non-member is rejected — add a non-owner member case for the real ownership boundary; (b) confirm connectDevDb and the e2e app server share one DATABASE_URL (else dbos.* asserts pass vacuously); (c) confirm the {"json":[ctx]} seed envelope parses via DBOS.listWorkflows({loadInput:true}) in SDK 4.21.6.
  • Larger storage redesign (separate PR): persist the user message to thread_messages at POST and pass only a reference into the workflow, with tombstone-on-cancel — making the queue list read thread_messages and shrinking the DBOS input to a pointer. This PR's list endpoint is intentionally source-swappable for that.

🤖 Generated with Claude Code


Summary by cubic

Adds an in-UI message queue for Decopilot threads with per-item cancel and “send while running” support; Stop also frees a stuck gate head. Attachments are externalized at POST so dbos.workflow_status.inputs stays small.

  • New Features

    • GET /:org/decopilot/queue/:threadId returns the thread’s running head and queued tail from DBOS.listWorkflows; the UI shows only waiting items (excludes the active head).
    • POST /:org/decopilot/queue/:threadId/cancel/:workflowId cancels a queue item; if it’s the running head, it runs the full stop teardown. The Stop button now also cancels a stranded PENDING gate head to unblock the queue.
    • UI: a queue panel above the composer lists queued messages with per-item cancel and refreshes via SSE; the composer stays enabled mid-run, and sending enqueues behind the current turn (no optimistic body render).
  • Performance

    • Run uploadFileParts at POST to convert data: blobs to mesh-storage: refs; idempotent via an annotation check and keeps the workflowID stable.

Written for commit bba7ddb. Summary will update on new commits.

Review in cubic

tlgimenes and others added 13 commits June 25, 2026 22:21
Extract cancelActiveThreadRun helper inside createDecopilotRoutes (closes
over cancelBroadcast + runRegistry) and add cancelThreadGateHead call so
the stop endpoint also cancels any PENDING gate head stranded across a
deploy. Collapse the old 200-vs-202 split to a single 202 (frontend only
checks res.ok/404). Add e2e spec decopilot-thread-queue.spec.ts with
seedGate helper + "stop frees stuck PENDING head" case.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add GET /:org/decopilot/queue/:threadId that returns the thread's pending
gate workflows (PENDING head + ENQUEUED tail) as ThreadQueueItem[]. Owner-only
via validateThreadOwnership. E2E cases: list ordering + 403 for non-member.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… DBOS input

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…p TODO

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…SSE refresh

Address three UI remarks on the thread-queue panel (#4155):

1. The queue panel now lists only *waiting* (ENQUEUED) messages. The
   running/PENDING head is excluded — it's already rendered in the chat
   body, so showing it in the queue too was duplicative.

2. The composer is no longer disabled while a run is in progress. A draft
   sends even mid-run (the message enqueues behind the running gate;
   concurrency=1 serializes the thread), so users can stack follow-ups.
   New pure helper `resolveComposerAction({hasDraft,isStreaming,
   isRunInProgress})` → send | stop | disabled, unit-tested; the primary
   button flips Stop→Send the moment there's a draft.

3. The queue is now refreshed from SSE, not a 3s poll. Dropped
   `refetchInterval`; the query re-fetches via `KEYS.threadQueue`
   invalidation on send, on cancel, and on every `conn.status` run
   start/end edge (the SSE stream the chat already consumes).

Verified live against a seeded stranded gate (1 PENDING head + 2 ENQUEUED):
panel shows only the 2 queued, input editable mid-run, per-item cancel +
refresh, and no recurring /queue/ polling in the network log.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The queue panel was duplicating the current message: a just-sent message
sits ENQUEUED ("accepted and queued") for a window before it flips to
PENDING, and the previous filter only excluded PENDING — so the active
head leaked into the panel while also rendering in the chat body.

Define the active run as the *oldest non-terminal* gate workflow (PENDING
or ENQUEUED) and exclude it: new pure `selectWaitingQueueItems(items)`
sorts by enqueuedAt and drops the head. The current message keeps
rendering in the body as before; the panel shows only what comes next.

Verified live: a lone ENQUEUED head shows no panel (only the body), and a
head + one waiting item shows just the waiting one.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…disabled input

Reworks the queue UX per design feedback:

- **Send renders in the body.** A message sent to an idle thread submits
  normally and renders in the thread's user-message place + streams.

- **No disabled composer state, only a streaming state.** Tools, the
  microphone, and the mode pills are always enabled now; only the primary
  button reflects streaming (Send when there's a draft, Stop otherwise).

- **Send while a run is active → the queue, not the body.** New
  `conn.enqueue()` POSTs the message to the gate quietly (no optimistic body
  row, no run-status change); the message is mirrored into a per-thread
  frontend queue and surfaces in the panel. It renders in the body only when
  its turn dequeues and streams.

- **Frontend message queue.** New module-scoped per-thread store
  (`message-queue-store.ts`) with `useMessageQueue(threadId)` +
  `useMessageQueueActions()` hooks (enqueue / cancel / refresh). Seeded from
  the gate on mount, written optimistically on send, and re-synced on every
  SSE run start/end edge and on terminal `onFinish`. Replaces the
  React-Query `useThreadQueue` (deleted) and the `KEYS.threadQueue` key.

`selectWaitingQueueItems` still excludes the active head so the running
message never double-renders.

Verified live (Codex-desktop thread): message renders in the body + streams;
Tools/mic stay enabled mid-run; a message sent behind a running gate lands in
the panel (not the body) and cancels cleanly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@tlgimenes tlgimenes force-pushed the feat/chat-queue-ui-cancel branch from b7b8010 to bba7ddb Compare June 26, 2026 01:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant