From 5e527a427f73b26f27d02612eda651e1a859da7d Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:22:00 -0300 Subject: [PATCH 01/13] feat(decopilot): pure mappers for the thread-gate queue --- .../dispatch-queue/thread-gate-queue.test.ts | 93 +++++++++++++++++++ .../src/dispatch-queue/thread-gate-queue.ts | 56 +++++++++++ 2 files changed, 149 insertions(+) create mode 100644 apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts create mode 100644 apps/mesh/src/dispatch-queue/thread-gate-queue.ts diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts new file mode 100644 index 0000000000..b755b2e374 --- /dev/null +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts @@ -0,0 +1,93 @@ +// apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts +import { describe, expect, it } from "bun:test"; +import { + extractUserMessageText, + gateStatusToQueueItem, +} from "./thread-gate-queue"; + +const userMsg = (text: string) => ({ + id: "m1", + role: "user" as const, + parts: [{ type: "text" as const, text }], +}); + +describe("extractUserMessageText", () => { + it("returns the concatenated text parts of the last user message", () => { + const messages = [ + { id: "a", role: "assistant", parts: [{ type: "text", text: "hi" }] }, + { + id: "u", + role: "user", + parts: [ + { type: "text", text: "hello " }, + { type: "file", url: "mesh-storage:k", mediaType: "image/png" }, + { type: "text", text: "world" }, + ], + }, + ] as never; + expect(extractUserMessageText(messages)).toBe("hello world"); + }); + + it("returns empty string when there is no user message", () => { + const messages = [ + { id: "a", role: "assistant", parts: [{ type: "text", text: "hi" }] }, + ] as never; + expect(extractUserMessageText(messages)).toBe(""); + }); + + it("returns empty string for an empty array", () => { + expect(extractUserMessageText([] as never)).toBe(""); + }); +}); + +describe("gateStatusToQueueItem", () => { + const threadId = "11bda36e"; + const base = { + workflowID: `thread-run:${threadId}:msg-7`, + status: "ENQUEUED", + createdAt: 1782400000000, + input: [ + { + threadId, + request: { messages: [userMsg("queued text")] }, + source: "user-message", + }, + ], + } as never; + + it("maps an ENQUEUED gate to a queued item with parsed messageId + text", () => { + const item = gateStatusToQueueItem(base, threadId); + expect(item).toEqual({ + workflowId: `thread-run:${threadId}:msg-7`, + messageId: "msg-7", + text: "queued text", + status: "queued", + enqueuedAt: 1782400000000, + }); + }); + + it("maps a PENDING gate to status 'running'", () => { + const item = gateStatusToQueueItem( + { ...base, status: "PENDING" } as never, + threadId, + ); + expect(item?.status).toBe("running"); + }); + + it("returns null when the workflowID does not match the thread prefix", () => { + const item = gateStatusToQueueItem( + { ...base, workflowID: "thread-run:other:msg-7" } as never, + threadId, + ); + expect(item).toBeNull(); + }); + + it("tolerates a missing/unshaped input (empty text)", () => { + const item = gateStatusToQueueItem( + { ...base, input: undefined } as never, + threadId, + ); + expect(item?.text).toBe(""); + expect(item?.messageId).toBe("msg-7"); + }); +}); diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts new file mode 100644 index 0000000000..19e0cfc172 --- /dev/null +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts @@ -0,0 +1,56 @@ +// apps/mesh/src/dispatch-queue/thread-gate-queue.ts +import type { WorkflowStatus } from "@dbos-inc/dbos-sdk"; +import type { ChatMessage } from "@/api/routes/decopilot/types"; +import type { ThreadGateContext } from "./thread-gate-workflow"; + +/** A pending message in a thread's gate queue, surfaced to the UI. */ +export interface ThreadQueueItem { + /** Full DBOS workflow id: `thread-run:{threadId}:{messageId}`. */ + workflowId: string; + /** Trailing segment of the workflow id (the user message id). */ + messageId: string; + /** Display text of the queued user message. */ + text: string; + /** PENDING (slot holder / running or stuck) → running; ENQUEUED → queued. */ + status: "running" | "queued"; + /** Epoch ms the gate was created/enqueued. */ + enqueuedAt: number; +} + +/** Concatenate the text parts of the last user message. Pure + total. */ +export function extractUserMessageText(messages: ChatMessage[]): string { + if (!Array.isArray(messages) || messages.length === 0) return ""; + const lastUserIdx = messages.findLastIndex((m) => m?.role === "user"); + if (lastUserIdx === -1) return ""; + const parts = messages[lastUserIdx]?.parts ?? []; + return parts + .map((p) => + p && typeof p === "object" && (p as { type?: string }).type === "text" + ? ((p as { text?: string }).text ?? "") + : "", + ) + .join("") + .trim(); +} + +/** + * Map one `thread-gate` WorkflowStatus row to a queue item. Returns null when + * the workflow id does not carry this thread's `thread-run:{threadId}:` prefix + * (defensive — listWorkflows is already prefix-filtered). + */ +export function gateStatusToQueueItem( + status: WorkflowStatus, + threadId: string, +): ThreadQueueItem | null { + const prefix = `thread-run:${threadId}:`; + if (!status.workflowID.startsWith(prefix)) return null; + const gateCtx = status.input?.[0] as ThreadGateContext | undefined; + const text = extractUserMessageText(gateCtx?.request?.messages ?? []); + return { + workflowId: status.workflowID, + messageId: status.workflowID.slice(prefix.length), + text, + status: status.status === "PENDING" ? "running" : "queued", + enqueuedAt: status.createdAt, + }; +} From b76e9a334ff434ab11cfa3332f616d6edb4454e3 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:25:36 -0300 Subject: [PATCH 02/13] fix(decopilot): type test stubs so the gate-queue mapper test typechecks --- apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts index b755b2e374..7e24a91822 100644 --- a/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts @@ -53,10 +53,10 @@ describe("gateStatusToQueueItem", () => { source: "user-message", }, ], - } as never; + }; it("maps an ENQUEUED gate to a queued item with parsed messageId + text", () => { - const item = gateStatusToQueueItem(base, threadId); + const item = gateStatusToQueueItem(base as never, threadId); expect(item).toEqual({ workflowId: `thread-run:${threadId}:msg-7`, messageId: "msg-7", From 1cb51b985ccee00d7748c597ace708edf34df4dd Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:29:51 -0300 Subject: [PATCH 03/13] feat(decopilot): DBOS list/cancel primitives for the thread-gate queue --- .../src/dispatch-queue/thread-gate-queue.ts | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts index 19e0cfc172..2343af1fde 100644 --- a/apps/mesh/src/dispatch-queue/thread-gate-queue.ts +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts @@ -1,4 +1,5 @@ // apps/mesh/src/dispatch-queue/thread-gate-queue.ts +import { DBOS } from "@dbos-inc/dbos-sdk"; import type { WorkflowStatus } from "@dbos-inc/dbos-sdk"; import type { ChatMessage } from "@/api/routes/decopilot/types"; import type { ThreadGateContext } from "./thread-gate-workflow"; @@ -54,3 +55,53 @@ export function gateStatusToQueueItem( enqueuedAt: status.createdAt, }; } + +/** + * List a thread's pending gate workflows (PENDING head + ENQUEUED tail) as + * UI queue items, oldest first. Reads `workflow_status` via DBOS.listWorkflows + * (the queue lives there — `workflow_queue` is unused in this DBOS version). + */ +export async function listThreadGateQueue( + threadId: string, +): Promise { + const rows = await DBOS.listWorkflows({ + workflow_id_prefix: `thread-run:${threadId}:`, + status: ["PENDING", "ENQUEUED"], + loadInput: true, + loadOutput: false, + }); + return rows + .map((r) => gateStatusToQueueItem(r, threadId)) + .filter((i): i is ThreadQueueItem => i !== null) + .sort((a, b) => a.enqueuedAt - b.enqueuedAt); +} + +/** + * Cancel one gate workflow, guarded by the thread prefix (the tenant authz — + * the caller has already verified thread ownership). Returns false when the id + * is not scoped to this thread (caller should 404). + */ +export async function cancelThreadGateWorkflow( + threadId: string, + workflowId: string, +): Promise { + if (!workflowId.startsWith(`thread-run:${threadId}:`)) return false; + await DBOS.cancelWorkflows([workflowId]); + return true; +} + +/** + * Cancel the thread's PENDING gate(s) — the slot holder(s). Used by the stop + * button so "stop" frees a stuck/wedged head; ENQUEUED items are left intact so + * the queue continues (Codex semantics). No-op when nothing is PENDING. + */ +export async function cancelThreadGateHead(threadId: string): Promise { + const pending = await DBOS.listWorkflows({ + workflow_id_prefix: `thread-run:${threadId}:`, + status: ["PENDING"], + loadInput: false, + loadOutput: false, + }); + if (pending.length === 0) return; + await DBOS.cancelWorkflows(pending.map((w) => w.workflowID)); +} From 891cbbf847c85be99ee80c4fc37394daf35b5b00 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:36:18 -0300 Subject: [PATCH 04/13] feat(decopilot): stop button frees a stuck thread-gate head Extract cancelActiveThreadRun helper inside createDecopilotRoutes (closes over cancelBroadcast + runRegistry) and add cancelThreadGateHead call so the stop endpoint also cancels any PENDING gate head stranded across a deploy. Collapse the old 200-vs-202 split to a single 202 (frontend only checks res.ok/404). Add e2e spec decopilot-thread-queue.spec.ts with seedGate helper + "stop frees stuck PENDING head" case. Co-Authored-By: Claude Sonnet 4.6 --- apps/mesh/src/api/routes/decopilot/routes.ts | 61 +++++----- .../e2e/tests/decopilot-thread-queue.spec.ts | 104 ++++++++++++++++++ 2 files changed, 133 insertions(+), 32 deletions(-) create mode 100644 packages/e2e/tests/decopilot-thread-queue.spec.ts diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index 56ece62bf1..107b0d7689 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -59,6 +59,7 @@ import type { HarnessId } from "@/harnesses"; import type { Thread } from "@/storage/types"; import { cancelThreadBackgroundJobs } from "@/harnesses/decopilot/background-tool-workflow"; import { abortBackgroundJobs } from "@/harnesses/decopilot/background-abort-registry"; +import { cancelThreadGateHead } from "@/dispatch-queue/thread-gate-queue"; // Per-connection /stream tail diagnostics. Flip to "1" in an environment where // the live stream intermittently delivers no chunks — logs the resolved @@ -722,55 +723,46 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { // Cancel Endpoint — cancel ongoing run (local or via NATS to owning pod) // ============================================================================ - app.post("/:org/decopilot/cancel/:threadId", async (c) => { - const { ctx, taskId, thread, organization, userId } = - await validateThreadOwnership(c); - - // Persist durable cancel flag so the ingest backstop rejects 409. + // Shared run-cancel teardown: durable cancel flag, background-job teardown, + // in-pod abort + cross-pod NATS broadcast, local turn cancel, ghost force-fail. + // Used by the stop endpoint and the per-item queue cancel (running head). + async function cancelActiveThreadRun(args: { + ctx: StudioContext; + taskId: string; + thread: Thread; + organization: { id: string }; + userId: string; + }): Promise { + const { ctx, taskId, thread, organization, userId } = args; await ctx.storage.threads.setCancelRequested(taskId, organization.id); - - // Tear down any background-tool workflows this thread started (image gen / - // backgrounded subtasks). They run on their own DBOS queue, so the - // in-memory run cancel below doesn't reach them. Non-fatal: a DBOS hiccup - // must not block the user-facing cancel. await cancelThreadBackgroundJobs(taskId).catch((err) => { console.error("[decopilot:cancel] failed to cancel background jobs", { taskId, err, }); }); - - // Abort in-flight background work on this pod now, and fan the cancel out - // to every pod over NATS. Always broadcast: a background job runs on - // whichever pod DBOS dequeued it, so a locally-owned turn no longer implies - // no other pod is involved. Each pod's onCancel aborts its background - // controllers (`abortBackgroundJobs`) and cancels the live turn if it owns - // it; both are no-ops where they don't apply. + // Free a stranded/stuck PENDING gate head so the partition slot releases and + // the queue can proceed (an orphaned gate ignores the in-memory run cancel). + await cancelThreadGateHead(taskId).catch((err) => { + console.error("[decopilot:cancel] failed to cancel gate head", { + taskId, + err, + }); + }); abortBackgroundJobs(taskId); cancelBroadcast.broadcast(taskId); - - // Try to cancel the live turn locally for an immediate response. const cancelTransitions = await runRegistry.execute({ type: "CANCEL", taskId, }); - if (cancelTransitions.some((t) => t.event.type === "RUN_FAILED")) { - cancelBroadcast.publishControlFrame(userId, { - type: "cancel", - runId: taskId, - }); - return c.json({ cancelled: true }); - } - cancelBroadcast.publishControlFrame(userId, { type: "cancel", runId: taskId, }); - - // Ghost run: server restarted while a run was in progress. No pod has this - // run in memory, so the broadcast will never resolve. Force-fail the thread - // in the DB so the user can send new messages. - if (thread.status === "in_progress") { + const producedRunFailed = cancelTransitions.some( + (t) => t.event.type === "RUN_FAILED", + ); + if (!producedRunFailed && thread.status === "in_progress") { console.warn("[decopilot:cancel] Ghost run detected, force-failing", { taskId, }); @@ -791,7 +783,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ); }); } + } + app.post("/:org/decopilot/cancel/:threadId", async (c) => { + const { ctx, taskId, thread, organization, userId } = + await validateThreadOwnership(c); + await cancelActiveThreadRun({ ctx, taskId, thread, organization, userId }); return c.json({ cancelled: true, async: true }, 202); }); diff --git a/packages/e2e/tests/decopilot-thread-queue.spec.ts b/packages/e2e/tests/decopilot-thread-queue.spec.ts new file mode 100644 index 0000000000..b42355886f --- /dev/null +++ b/packages/e2e/tests/decopilot-thread-queue.spec.ts @@ -0,0 +1,104 @@ +// packages/e2e/tests/decopilot-thread-queue.spec.ts +import { test, expect } from "../fixtures/test"; +import { connectDevDb } from "../fixtures/db"; +import { callSelfMcpTool } from "../fixtures/mcp-tools"; +import type { APIRequestContext } from "@playwright/test"; + +// NOTE: org/user/thread setup pattern mirrors decopilot-messages.spec.ts. + +/** Create a real agent (virtual MCP) + thread row; returns both ids. */ +async function createAgentAndThread( + api: APIRequestContext, + orgSlug: string, +): Promise<{ agentId: string; threadId: string }> { + const agent = await callSelfMcpTool<{ item: { id: string } }>( + api, + orgSlug, + "COLLECTION_VIRTUAL_MCP_CREATE", + { + data: { + title: "E2E Thread Queue Agent", + connections: [], + status: "active", + pinned: false, + }, + }, + ); + const thread = await callSelfMcpTool<{ item: { id: string } }>( + api, + orgSlug, + "COLLECTION_THREADS_CREATE", + { + data: { virtual_mcp_id: agent.item.id, title: "E2E Thread Queue Thread" }, + }, + ); + return { agentId: agent.item.id, threadId: thread.item.id }; +} + +/** + * Seed a thread-gate workflow_status row directly (DBOS-aware fixture). `inputs` + * MUST be the DBOS serialization envelope {"json":[]} so + * listWorkflows({loadInput:true}) can parse it. Column set matches + * decopilot-projector-dbos.spec.ts conventions. + */ +async function seedGate( + db: { + query: (sql: string, params: unknown[]) => Promise<{ rows: unknown[] }>; + }, + threadId: string, + o: { id: string; status: "PENDING" | "ENQUEUED"; at: number; text: string }, +): Promise { + const wfId = `thread-run:${threadId}:${o.id}`; + const ctx = { + threadId, + request: { + messages: [ + { id: o.id, role: "user", parts: [{ type: "text", text: o.text }] }, + ], + }, + source: "user-message", + }; + await db.query( + `INSERT INTO dbos.workflow_status + (workflow_uuid, status, name, class_name, config_name, queue_name, + executor_id, application_version, application_id, recovery_attempts, + priority, created_at, updated_at, inputs) + VALUES ($1,$2,'threadGateWorkflow','','','thread-gate', + 'seed-exec','seed-version','seed-app',0,0,$3,$3,$4)`, + [wfId, o.status, String(o.at), JSON.stringify({ json: [ctx] })], + ); + return wfId; +} + +test("stop cancels a stuck PENDING gate head (frees the partition slot)", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + // Arrange: a deploy-stranded head holding the slot. + const wfId = await seedGate(db, threadId, { + id: "stuck-head", + status: "PENDING", + at: Date.now(), + text: "stuck", + }); + + // Act: hit the stop endpoint. + const res = await api.post(`/api/${orgSlug}/decopilot/cancel/${threadId}`); + expect(res.status()).toBe(202); + + // Assert: the gate head is now CANCELLED (slot freed). + const { rows } = await db.query( + `SELECT status FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + expect((rows[0] as { status: string }).status).toBe("CANCELLED"); + } finally { + await db.end(); + } +}); From 835e97d97cb01bf4acdc5c6238b3d98f07f5c212 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:41:47 -0300 Subject: [PATCH 05/13] feat(decopilot): GET thread queue endpoint Add GET /:org/decopilot/queue/:threadId that returns the thread's pending gate workflows (PENDING head + ENQUEUED tail) as ThreadQueueItem[]. Owner-only via validateThreadOwnership. E2E cases: list ordering + 403 for non-member. Co-Authored-By: Claude Sonnet 4.6 --- apps/mesh/src/api/routes/decopilot/routes.ts | 20 +++++- .../e2e/tests/decopilot-thread-queue.spec.ts | 70 ++++++++++++++++++- 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index 107b0d7689..20069e99bd 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -59,7 +59,10 @@ import type { HarnessId } from "@/harnesses"; import type { Thread } from "@/storage/types"; import { cancelThreadBackgroundJobs } from "@/harnesses/decopilot/background-tool-workflow"; import { abortBackgroundJobs } from "@/harnesses/decopilot/background-abort-registry"; -import { cancelThreadGateHead } from "@/dispatch-queue/thread-gate-queue"; +import { + cancelThreadGateHead, + listThreadGateQueue, +} from "@/dispatch-queue/thread-gate-queue"; // Per-connection /stream tail diagnostics. Flip to "1" in an environment where // the live stream intermittently delivers no chunks — logs the resolved @@ -792,6 +795,21 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { return c.json({ cancelled: true, async: true }, 202); }); + // ============================================================================ + // Queue List Endpoint — list the thread's pending gate workflows + // ============================================================================ + // + // GET /:org/decopilot/queue/:threadId + // + // List this thread's pending gate workflows (running head + queued tail) so + // the UI can show and cancel them. Owner-only. + + app.get("/:org/decopilot/queue/:threadId", async (c) => { + const { taskId } = await validateThreadOwnership(c); + const items = await listThreadGateQueue(taskId); + return c.json({ items }); + }); + // ============================================================================ // Stream Endpoint — tail the per-thread JetStream subject // ============================================================================ diff --git a/packages/e2e/tests/decopilot-thread-queue.spec.ts b/packages/e2e/tests/decopilot-thread-queue.spec.ts index b42355886f..76148936c2 100644 --- a/packages/e2e/tests/decopilot-thread-queue.spec.ts +++ b/packages/e2e/tests/decopilot-thread-queue.spec.ts @@ -1,5 +1,6 @@ // packages/e2e/tests/decopilot-thread-queue.spec.ts -import { test, expect } from "../fixtures/test"; +import { test, expect, newApiContext } from "../fixtures/test"; +import { signUpViaApi } from "../fixtures/auth-api"; import { connectDevDb } from "../fixtures/db"; import { callSelfMcpTool } from "../fixtures/mcp-tools"; import type { APIRequestContext } from "@playwright/test"; @@ -102,3 +103,70 @@ test("stop cancels a stuck PENDING gate head (frees the partition slot)", async await db.end(); } }); + +test("GET queue lists PENDING + ENQUEUED gate messages, oldest first", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + await seedGate(db, threadId, { + id: "head", + status: "PENDING", + at: 1000, + text: "running one", + }); + await seedGate(db, threadId, { + id: "q2", + status: "ENQUEUED", + at: 3000, + text: "second", + }); + await seedGate(db, threadId, { + id: "q1", + status: "ENQUEUED", + at: 2000, + text: "first", + }); + + const res = await api.get(`/api/${orgSlug}/decopilot/queue/${threadId}`); + expect(res.status()).toBe(200); + const body = (await res.json()) as { + items: Array<{ messageId: string; status: string; text: string }>; + }; + expect(body.items.map((i) => i.messageId)).toEqual(["head", "q1", "q2"]); + expect(body.items[0]).toMatchObject({ + status: "running", + text: "running one", + }); + expect(body.items[1]).toMatchObject({ status: "queued", text: "first" }); + } finally { + await db.end(); + } +}); + +test("GET queue 403s for a non-owner", async ({ authedPage, playwright }) => { + const { page, orgSlug } = authedPage; + const ownerApi = page.context().request; + + // Create a thread owned by the primary user. + const { threadId } = await createAgentAndThread(ownerApi, orgSlug); + + // A second user who has no membership in orgSlug. + const otherCtx = await newApiContext(playwright); + await signUpViaApi(otherCtx); + + try { + // The other user tries to read the owner's thread queue — should 403. + const res = await otherCtx.get( + `/api/${orgSlug}/decopilot/queue/${threadId}`, + ); + expect(res.status()).toBe(403); + } finally { + await otherCtx.dispose(); + } +}); From ee8005fefe3e602fe25a7ce9cc8fd3eb2bf24da8 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:46:45 -0300 Subject: [PATCH 06/13] feat(decopilot): per-item thread queue cancel endpoint Co-Authored-By: Claude Sonnet 4.6 --- apps/mesh/src/api/routes/decopilot/routes.ts | 35 +++++++++++++++ .../e2e/tests/decopilot-thread-queue.spec.ts | 45 +++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index 20069e99bd..c69ad97d30 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -61,6 +61,7 @@ import { cancelThreadBackgroundJobs } from "@/harnesses/decopilot/background-too import { abortBackgroundJobs } from "@/harnesses/decopilot/background-abort-registry"; import { cancelThreadGateHead, + cancelThreadGateWorkflow, listThreadGateQueue, } from "@/dispatch-queue/thread-gate-queue"; @@ -810,6 +811,40 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { return c.json({ items }); }); + // ============================================================================ + // Queue Per-Item Cancel Endpoint — cancel one pending gate workflow + // ============================================================================ + // + // POST /:org/decopilot/queue/:threadId/cancel/:workflowId + // + // Cancel one pending gate item. A queued (ENQUEUED) item is just removed; the + // running head (PENDING) additionally runs the full run-cancel teardown. + + app.post("/:org/decopilot/queue/:threadId/cancel/:workflowId", async (c) => { + const { ctx, taskId, thread, organization, userId } = + await validateThreadOwnership(c); + const workflowId = c.req.param("workflowId"); + + // The item must currently be pending for THIS thread (prefix-scoped list). + const items = await listThreadGateQueue(taskId); + const target = items.find((i) => i.workflowId === workflowId); + if (!target) return c.body(null, 404); + + const ok = await cancelThreadGateWorkflow(taskId, workflowId); + if (!ok) return c.body(null, 404); + + if (target.status === "running") { + await cancelActiveThreadRun({ + ctx, + taskId, + thread, + organization, + userId, + }); + } + return c.json({ cancelled: true }, 202); + }); + // ============================================================================ // Stream Endpoint — tail the per-thread JetStream subject // ============================================================================ diff --git a/packages/e2e/tests/decopilot-thread-queue.spec.ts b/packages/e2e/tests/decopilot-thread-queue.spec.ts index 76148936c2..0211c766d1 100644 --- a/packages/e2e/tests/decopilot-thread-queue.spec.ts +++ b/packages/e2e/tests/decopilot-thread-queue.spec.ts @@ -170,3 +170,48 @@ test("GET queue 403s for a non-owner", async ({ authedPage, playwright }) => { await otherCtx.dispose(); } }); + +test("POST cancel removes a queued item", async ({ authedPage }) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + const wfId = await seedGate(db, threadId, { + id: "to-cancel", + status: "ENQUEUED", + at: Date.now(), + text: "x", + }); + + const res = await api.post( + `/api/${orgSlug}/decopilot/queue/${threadId}/cancel/${encodeURIComponent(wfId)}`, + ); + expect(res.status()).toBe(202); + + const { rows } = await db.query( + `SELECT status FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + expect((rows[0] as { status: string }).status).toBe("CANCELLED"); + } finally { + await db.end(); + } +}); + +test("POST cancel 404s for a workflowId scoped to another thread", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + + const { threadId } = await createAgentAndThread(api, orgSlug); + + const foreign = encodeURIComponent("thread-run:someone-else:msg-1"); + const res = await api.post( + `/api/${orgSlug}/decopilot/queue/${threadId}/cancel/${foreign}`, + ); + expect(res.status()).toBe(404); +}); From 0cdc3d2b1ddbfc00be313c327abcf78f1e101497 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:53:32 -0300 Subject: [PATCH 07/13] feat(decopilot): externalize message attachments at POST to bound the DBOS input Co-Authored-By: Claude Sonnet 4.6 --- .../decopilot/file-materializer.test.ts | 38 ++++++++++ .../api/routes/decopilot/file-materializer.ts | 25 +++++++ apps/mesh/src/api/routes/decopilot/routes.ts | 10 +++ .../e2e/tests/decopilot-thread-queue.spec.ts | 74 +++++++++++++++++++ 4 files changed, 147 insertions(+) create mode 100644 apps/mesh/src/api/routes/decopilot/file-materializer.test.ts diff --git a/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts b/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts new file mode 100644 index 0000000000..5b583743ec --- /dev/null +++ b/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, it } from "bun:test"; +import { messageHasAttachmentAnnotation } from "./file-materializer"; + +const msg = (text: string) => + ({ id: "m", role: "user", parts: [{ type: "text", text }] }) as never; + +describe("messageHasAttachmentAnnotation", () => { + it("detects the sandbox-attachment annotation", () => { + expect( + messageHasAttachmentAnnotation( + msg( + "[Attached files — already inside your sandbox]\n- a.png: org/upload/a.png", + ), + ), + ).toBe(true); + }); + it("detects the uploaded-files annotation", () => { + expect( + messageHasAttachmentAnnotation( + msg( + "[Uploaded files — use these URLs when calling tools]\n- a.png: mesh-storage:k", + ), + ), + ).toBe(true); + }); + it("returns false for ordinary text", () => { + expect(messageHasAttachmentAnnotation(msg("hello world"))).toBe(false); + }); + it("returns false when there are no text parts", () => { + expect( + messageHasAttachmentAnnotation({ + id: "m", + role: "user", + parts: [{ type: "file", url: "mesh-storage:k" }], + } as never), + ).toBe(false); + }); +}); diff --git a/apps/mesh/src/api/routes/decopilot/file-materializer.ts b/apps/mesh/src/api/routes/decopilot/file-materializer.ts index 3b9c3b9e15..ce1448ec22 100644 --- a/apps/mesh/src/api/routes/decopilot/file-materializer.ts +++ b/apps/mesh/src/api/routes/decopilot/file-materializer.ts @@ -156,6 +156,26 @@ export async function generatePresignedGetUrl( // Phase 1 — uploadFileParts // ============================================================================ +/** Sentinels that uploadFileParts injects after a successful materialization. */ +const ATTACHMENT_ANNOTATION_PREFIXES = ["[Attached files", "[Uploaded files"]; + +/** + * True when a message already carries an attachment annotation — i.e. it was + * already materialized by a prior uploadFileParts pass. Makes uploadFileParts + * idempotent so running it at POST *and* at dispatch can't double-annotate. + */ +export function messageHasAttachmentAnnotation(message: ChatMessage): boolean { + return message.parts.some( + (p) => + p.type === "text" && + "text" in p && + typeof p.text === "string" && + ATTACHMENT_ANNOTATION_PREFIXES.some((s) => + p.text.trimStart().startsWith(s), + ), + ); +} + /** The composer's attachment marker: `[file:://]`. * File parts carry it as their `filename`; text-file attachments are decoded * inline as a text part of `\n` (see web derive-parts.ts). */ @@ -276,6 +296,11 @@ export async function uploadFileParts( if (lastUserIdx === -1) return messages; const message = messages[lastUserIdx]!; + + // Idempotent: a message already annotated was materialized on a prior pass + // (e.g. at POST); skip so the dispatch-time pass can't re-upload / re-annotate. + if (messageHasAttachmentAnnotation(message)) return messages; + const dataUrlParts = message.parts.filter( (p) => p.type === "file" && diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index c69ad97d30..73ec394e66 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -43,6 +43,7 @@ import { StreamRequestSchema } from "./schemas"; import type { ChatMessage, ModelsConfig } from "./types"; import type { DispatchRunInput } from "./dispatch-run"; import { resolveHarnessId } from "./dispatch-run"; +import { uploadFileParts } from "./file-materializer"; import { stringifyError } from "@decocms/harness/decopilot/stream-error"; import { enqueueThreadRun } from "@/dispatch-queue"; import { @@ -676,11 +677,20 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { const target = resolveDispatchTarget({ sandboxProviderKind: pinnedKind }); const { abortSignal: _ignored, ...rest } = input; + // Externalize attachments NOW (data: base64 → mesh-storage: refs) so the + // gate workflow input persisted in dbos.workflow_inputs stays small. The + // dispatch-time uploadFileParts is now an idempotent no-op for these. + const materializedMessages = await uploadFileParts(input.messages, ctx, { + threadId: taskId, + }); const serializableRequest = { ...rest, + messages: materializedMessages, target, harnessId: pinnedHarness, }; + // computeIdempotencyKey reads input.messages (the originals) so the + // workflowID is unaffected by materialization (data: → mesh-storage: swap). const lastMsg = input.messages[input.messages.length - 1]; const idempotencyKey = computeIdempotencyKey(lastMsg); const workflowID = idempotencyKey diff --git a/packages/e2e/tests/decopilot-thread-queue.spec.ts b/packages/e2e/tests/decopilot-thread-queue.spec.ts index 0211c766d1..271937f933 100644 --- a/packages/e2e/tests/decopilot-thread-queue.spec.ts +++ b/packages/e2e/tests/decopilot-thread-queue.spec.ts @@ -215,3 +215,77 @@ test("POST cancel 404s for a workflowId scoped to another thread", async ({ ); expect(res.status()).toBe(404); }); + +test("POST externalizes a data: attachment out of the DBOS workflow input", async ({ + authedPage, +}) => { + // Object storage (S3 + org-fs) must be available for this assertion to hold: + // uploadFileParts only materializes when ctx.orgFs is present, which requires + // a real S3-backed objectStorage. Skip on plain local dev without MinIO. + test.skip( + !( + process.env.S3_ENDPOINT && + process.env.S3_BUCKET && + process.env.S3_ACCESS_KEY_ID && + process.env.S3_SECRET_ACCESS_KEY + ), + "requires S3 env (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY_ID/S3_SECRET_ACCESS_KEY); see e2e.yml MinIO setup", + ); + + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { agentId, threadId } = await createAgentAndThread(api, orgSlug); + + // A tiny 1×1 PNG as a data: URL — represents a user-attached image. + const tinyPng = + "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="; + const messageId = `img-${Date.now()}`; + + const res = await api.post( + `/api/${orgSlug}/decopilot/threads/${threadId}/messages`, + { + data: { + messages: [ + { + id: messageId, + role: "user", + parts: [ + { type: "text", text: "describe this" }, + { + type: "file", + url: tinyPng, + mediaType: "image/png", + filename: "a.png", + }, + ], + }, + ], + agent: { id: agentId }, + branch: "ephemeral", + harnessId: "decopilot", + temperature: 0.5, + }, + headers: { "content-type": "application/json" }, + }, + ); + expect(res.status()).toBe(202); + + // The persisted gate input must NOT contain the base64 data: blob. + // `inputs` is a TEXT column (the {"json":[ctx]} envelope) — read as string. + const wfId = `thread-run:${threadId}:${messageId}`; + const { rows } = await db.query( + `SELECT inputs FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + const serialized = String( + (rows[0] as { inputs: string } | undefined)?.inputs ?? "", + ); + expect(serialized).not.toContain("data:image/png;base64,"); + expect(serialized).toContain("mesh-storage:"); + } finally { + await db.end(); + } +}); From 82866ea2ea2ec30a8dcb3d6958e9f1cd1675c15e Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 18:58:12 -0300 Subject: [PATCH 08/13] feat(chat): thread queue data hooks + invalidate on send Co-Authored-By: Claude Sonnet 4.6 --- .../src/web/components/chat/chat-context.tsx | 3 + .../web/components/chat/use-thread-queue.ts | 67 +++++++++++++++++++ apps/mesh/src/web/lib/query-keys.ts | 2 + 3 files changed, 72 insertions(+) create mode 100644 apps/mesh/src/web/components/chat/use-thread-queue.ts diff --git a/apps/mesh/src/web/components/chat/chat-context.tsx b/apps/mesh/src/web/components/chat/chat-context.tsx index dff6d91f20..e6eb6da775 100644 --- a/apps/mesh/src/web/components/chat/chat-context.tsx +++ b/apps/mesh/src/web/components/chat/chat-context.tsx @@ -1106,6 +1106,9 @@ export function ActiveTaskProvider({ }), }, ); + queryClient.invalidateQueries({ + queryKey: KEYS.threadQueue(capturedTaskId), + }); } // Cancel run diff --git a/apps/mesh/src/web/components/chat/use-thread-queue.ts b/apps/mesh/src/web/components/chat/use-thread-queue.ts new file mode 100644 index 0000000000..4090729921 --- /dev/null +++ b/apps/mesh/src/web/components/chat/use-thread-queue.ts @@ -0,0 +1,67 @@ +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { useProjectContext } from "@decocms/mesh-sdk"; +import { toast } from "sonner"; +import { KEYS } from "../../lib/query-keys"; + +export interface QueueItemDTO { + workflowId: string; + messageId: string; + text: string; + status: "running" | "queued"; + enqueuedAt: number; +} + +/** + * Poll this thread's pending gate queue. Polls every 3s while the run is active + * or items exist (cheap, bounded), otherwise idles (re-fetched on send via the + * KEYS.threadQueue invalidation in chat-context). No useEffect (lint ban). + */ +export function useThreadQueue( + taskId: string, + opts: { active: boolean }, +): { items: QueueItemDTO[] } { + const { org } = useProjectContext(); + const { data } = useQuery({ + queryKey: KEYS.threadQueue(taskId), + enabled: Boolean(taskId), + staleTime: 0, + refetchInterval: (query) => { + const n = + (query.state.data as { items?: QueueItemDTO[] } | undefined)?.items + ?.length ?? 0; + return opts.active || n > 0 ? 3000 : false; + }, + queryFn: async (): Promise<{ items: QueueItemDTO[] }> => { + const res = await fetch(`/api/${org.slug}/decopilot/queue/${taskId}`, { + credentials: "include", + }); + if (!res.ok) return { items: [] }; + return (await res.json()) as { items: QueueItemDTO[] }; + }, + }); + return { items: data?.items ?? [] }; +} + +/** Cancel one queued/running gate item, then refresh the queue. */ +export function useCancelQueuedMessage( + taskId: string, +): (workflowId: string) => void { + const { org } = useProjectContext(); + const queryClient = useQueryClient(); + const mutation = useMutation({ + mutationFn: async (workflowId: string) => { + const res = await fetch( + `/api/${org.slug}/decopilot/queue/${taskId}/cancel/${encodeURIComponent(workflowId)}`, + { method: "POST", credentials: "include" }, + ); + if (!res.ok && res.status !== 404) { + throw new Error(`Cancel failed: ${res.status}`); + } + }, + onSettled: () => + queryClient.invalidateQueries({ queryKey: KEYS.threadQueue(taskId) }), + onError: (err) => + toast.error(err instanceof Error ? err.message : "Failed to cancel"), + }); + return (workflowId: string) => mutation.mutate(workflowId); +} diff --git a/apps/mesh/src/web/lib/query-keys.ts b/apps/mesh/src/web/lib/query-keys.ts index ba97b81609..2aa756694c 100644 --- a/apps/mesh/src/web/lib/query-keys.ts +++ b/apps/mesh/src/web/lib/query-keys.ts @@ -188,6 +188,8 @@ export const KEYS = { threadSandbox: (orgKey: string, taskId: string | undefined) => ["thread-sandbox", "v2", orgKey, taskId] as const, threadOutputs: (threadId: string) => ["thread-outputs", threadId] as const, + threadQueue: (threadId: string) => + ["decopilot", "thread-queue", threadId] as const, // Fetched text content of a previewed file (FilePreview), keyed by URL. fileText: (downloadUrl: string) => ["file-text", downloadUrl] as const, // First bytes of a CSV/TSV file for the card thumbnail (range request). From d8498ec6355698c1b01aa0bfbfb46d8730315c58 Mon Sep 17 00:00:00 2001 From: gimenes Date: Thu, 25 Jun 2026 19:01:57 -0300 Subject: [PATCH 09/13] feat(chat): queued-messages panel above the composer Co-Authored-By: Claude Sonnet 4.6 --- apps/mesh/src/web/components/chat/input.tsx | 8 +++ .../src/web/components/chat/queue-panel.tsx | 66 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 apps/mesh/src/web/components/chat/queue-panel.tsx diff --git a/apps/mesh/src/web/components/chat/input.tsx b/apps/mesh/src/web/components/chat/input.tsx index 346b30c8c2..6d51ffb9f7 100644 --- a/apps/mesh/src/web/components/chat/input.tsx +++ b/apps/mesh/src/web/components/chat/input.tsx @@ -67,6 +67,7 @@ import { ConnectionsBanner } from "./connections-banner"; import { useVoiceInput } from "@/web/hooks/use-voice-input.ts"; import { VoiceWaveform } from "./voice-input"; import { shouldRenderInlineModeRow } from "./input-mode-row"; +import { ThreadQueuePanel } from "./queue-panel"; // ============================================================================ // useWindowFileDrop - Reusable hook for window-level file drag & drop @@ -460,6 +461,13 @@ export function ChatInput({ absent on the home composer. */} {stream && taskCtx && } + {stream && taskCtx && taskId ? ( + + ) : null} +