diff --git a/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts b/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts new file mode 100644 index 0000000000..5b583743ec --- /dev/null +++ b/apps/mesh/src/api/routes/decopilot/file-materializer.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, it } from "bun:test"; +import { messageHasAttachmentAnnotation } from "./file-materializer"; + +const msg = (text: string) => + ({ id: "m", role: "user", parts: [{ type: "text", text }] }) as never; + +describe("messageHasAttachmentAnnotation", () => { + it("detects the sandbox-attachment annotation", () => { + expect( + messageHasAttachmentAnnotation( + msg( + "[Attached files — already inside your sandbox]\n- a.png: org/upload/a.png", + ), + ), + ).toBe(true); + }); + it("detects the uploaded-files annotation", () => { + expect( + messageHasAttachmentAnnotation( + msg( + "[Uploaded files — use these URLs when calling tools]\n- a.png: mesh-storage:k", + ), + ), + ).toBe(true); + }); + it("returns false for ordinary text", () => { + expect(messageHasAttachmentAnnotation(msg("hello world"))).toBe(false); + }); + it("returns false when there are no text parts", () => { + expect( + messageHasAttachmentAnnotation({ + id: "m", + role: "user", + parts: [{ type: "file", url: "mesh-storage:k" }], + } as never), + ).toBe(false); + }); +}); diff --git a/apps/mesh/src/api/routes/decopilot/file-materializer.ts b/apps/mesh/src/api/routes/decopilot/file-materializer.ts index 3b9c3b9e15..ce1448ec22 100644 --- a/apps/mesh/src/api/routes/decopilot/file-materializer.ts +++ b/apps/mesh/src/api/routes/decopilot/file-materializer.ts @@ -156,6 +156,26 @@ export async function generatePresignedGetUrl( // Phase 1 — uploadFileParts // ============================================================================ +/** Sentinels that uploadFileParts injects after a successful materialization. */ +const ATTACHMENT_ANNOTATION_PREFIXES = ["[Attached files", "[Uploaded files"]; + +/** + * True when a message already carries an attachment annotation — i.e. it was + * already materialized by a prior uploadFileParts pass. Makes uploadFileParts + * idempotent so running it at POST *and* at dispatch can't double-annotate. + */ +export function messageHasAttachmentAnnotation(message: ChatMessage): boolean { + return message.parts.some( + (p) => + p.type === "text" && + "text" in p && + typeof p.text === "string" && + ATTACHMENT_ANNOTATION_PREFIXES.some((s) => + p.text.trimStart().startsWith(s), + ), + ); +} + /** The composer's attachment marker: `[file:://]`. * File parts carry it as their `filename`; text-file attachments are decoded * inline as a text part of `\n` (see web derive-parts.ts). */ @@ -276,6 +296,11 @@ export async function uploadFileParts( if (lastUserIdx === -1) return messages; const message = messages[lastUserIdx]!; + + // Idempotent: a message already annotated was materialized on a prior pass + // (e.g. at POST); skip so the dispatch-time pass can't re-upload / re-annotate. + if (messageHasAttachmentAnnotation(message)) return messages; + const dataUrlParts = message.parts.filter( (p) => p.type === "file" && diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index 56ece62bf1..ed64c7349f 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -43,6 +43,7 @@ import { StreamRequestSchema } from "./schemas"; import type { ChatMessage, ModelsConfig } from "./types"; import type { DispatchRunInput } from "./dispatch-run"; import { resolveHarnessId } from "./dispatch-run"; +import { uploadFileParts } from "./file-materializer"; import { stringifyError } from "@decocms/harness/decopilot/stream-error"; import { enqueueThreadRun } from "@/dispatch-queue"; import { @@ -59,6 +60,11 @@ import type { HarnessId } from "@/harnesses"; import type { Thread } from "@/storage/types"; import { cancelThreadBackgroundJobs } from "@/harnesses/decopilot/background-tool-workflow"; import { abortBackgroundJobs } from "@/harnesses/decopilot/background-abort-registry"; +import { + cancelThreadGateHead, + cancelThreadGateWorkflow, + listThreadGateQueue, +} from "@/dispatch-queue/thread-gate-queue"; // Per-connection /stream tail diagnostics. Flip to "1" in an environment where // the live stream intermittently delivers no chunks — logs the resolved @@ -671,11 +677,20 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { const target = resolveDispatchTarget({ sandboxProviderKind: pinnedKind }); const { abortSignal: _ignored, ...rest } = input; + // Externalize attachments NOW (data: base64 → mesh-storage: refs) so the + // gate workflow input persisted in dbos.workflow_inputs stays small. The + // dispatch-time uploadFileParts is now an idempotent no-op for these. + const materializedMessages = await uploadFileParts(input.messages, ctx, { + threadId: taskId, + }); const serializableRequest = { ...rest, + messages: materializedMessages, target, harnessId: pinnedHarness, }; + // computeIdempotencyKey reads input.messages (the originals) so the + // workflowID is unaffected by materialization (data: → mesh-storage: swap). const lastMsg = input.messages[input.messages.length - 1]; const idempotencyKey = computeIdempotencyKey(lastMsg); const workflowID = idempotencyKey @@ -722,55 +737,46 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { // Cancel Endpoint — cancel ongoing run (local or via NATS to owning pod) // ============================================================================ - app.post("/:org/decopilot/cancel/:threadId", async (c) => { - const { ctx, taskId, thread, organization, userId } = - await validateThreadOwnership(c); - - // Persist durable cancel flag so the ingest backstop rejects 409. + // Shared run-cancel teardown: durable cancel flag, background-job teardown, + // in-pod abort + cross-pod NATS broadcast, local turn cancel, ghost force-fail. + // Used by the stop endpoint and the per-item queue cancel (running head). + async function cancelActiveThreadRun(args: { + ctx: StudioContext; + taskId: string; + thread: Thread; + organization: { id: string }; + userId: string; + }): Promise { + const { ctx, taskId, thread, organization, userId } = args; await ctx.storage.threads.setCancelRequested(taskId, organization.id); - - // Tear down any background-tool workflows this thread started (image gen / - // backgrounded subtasks). They run on their own DBOS queue, so the - // in-memory run cancel below doesn't reach them. Non-fatal: a DBOS hiccup - // must not block the user-facing cancel. await cancelThreadBackgroundJobs(taskId).catch((err) => { console.error("[decopilot:cancel] failed to cancel background jobs", { taskId, err, }); }); - - // Abort in-flight background work on this pod now, and fan the cancel out - // to every pod over NATS. Always broadcast: a background job runs on - // whichever pod DBOS dequeued it, so a locally-owned turn no longer implies - // no other pod is involved. Each pod's onCancel aborts its background - // controllers (`abortBackgroundJobs`) and cancels the live turn if it owns - // it; both are no-ops where they don't apply. + // Free a stranded/stuck PENDING gate head so the partition slot releases and + // the queue can proceed (an orphaned gate ignores the in-memory run cancel). + await cancelThreadGateHead(taskId).catch((err) => { + console.error("[decopilot:cancel] failed to cancel gate head", { + taskId, + err, + }); + }); abortBackgroundJobs(taskId); cancelBroadcast.broadcast(taskId); - - // Try to cancel the live turn locally for an immediate response. const cancelTransitions = await runRegistry.execute({ type: "CANCEL", taskId, }); - if (cancelTransitions.some((t) => t.event.type === "RUN_FAILED")) { - cancelBroadcast.publishControlFrame(userId, { - type: "cancel", - runId: taskId, - }); - return c.json({ cancelled: true }); - } - cancelBroadcast.publishControlFrame(userId, { type: "cancel", runId: taskId, }); - - // Ghost run: server restarted while a run was in progress. No pod has this - // run in memory, so the broadcast will never resolve. Force-fail the thread - // in the DB so the user can send new messages. - if (thread.status === "in_progress") { + const producedRunFailed = cancelTransitions.some( + (t) => t.event.type === "RUN_FAILED", + ); + if (!producedRunFailed && thread.status === "in_progress") { console.warn("[decopilot:cancel] Ghost run detected, force-failing", { taskId, }); @@ -791,10 +797,70 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ); }); } + } + app.post("/:org/decopilot/cancel/:threadId", async (c) => { + const { ctx, taskId, thread, organization, userId } = + await validateThreadOwnership(c); + await cancelActiveThreadRun({ ctx, taskId, thread, organization, userId }); return c.json({ cancelled: true, async: true }, 202); }); + // ============================================================================ + // Queue List Endpoint — list the thread's pending gate workflows + // ============================================================================ + // + // GET /:org/decopilot/queue/:threadId + // + // List this thread's pending gate workflows (running head + queued tail) so + // the UI can show and cancel them. Owner-only. + + app.get("/:org/decopilot/queue/:threadId", async (c) => { + const { taskId } = await validateThreadOwnership(c); + const items = await listThreadGateQueue(taskId); + return c.json({ items }); + }); + + // ============================================================================ + // Queue Per-Item Cancel Endpoint — cancel one pending gate workflow + // ============================================================================ + // + // POST /:org/decopilot/queue/:threadId/cancel/:workflowId + // + // Cancel one pending gate item. A queued (ENQUEUED) item is just removed; the + // running head (PENDING) additionally runs the full run-cancel teardown. + + app.post("/:org/decopilot/queue/:threadId/cancel/:workflowId", async (c) => { + const { ctx, taskId, thread, organization, userId } = + await validateThreadOwnership(c); + const workflowId = c.req.param("workflowId"); + + // The item must currently be pending for THIS thread (prefix-scoped list). + const items = await listThreadGateQueue(taskId); + const target = items.find((i) => i.workflowId === workflowId); + if (!target) return c.body(null, 404); + + const ok = await cancelThreadGateWorkflow(taskId, workflowId); + if (!ok) return c.body(null, 404); + + // For a running head, also run the full run-cancel teardown (abort + NATS + // broadcast + run-registry CANCEL). cancelActiveThreadRun re-cancels the + // gate head via cancelThreadGateHead — that re-cancel is idempotent + // (DBOS.cancelWorkflows on an already-cancelled workflow is a no-op), so the + // explicit cancelThreadGateWorkflow above covers the queued case and this + // covers the live run. + if (target.status === "running") { + await cancelActiveThreadRun({ + ctx, + taskId, + thread, + organization, + userId, + }); + } + return c.json({ cancelled: true }, 202); + }); + // ============================================================================ // Stream Endpoint — tail the per-thread JetStream subject // ============================================================================ diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts new file mode 100644 index 0000000000..7e24a91822 --- /dev/null +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts @@ -0,0 +1,93 @@ +// apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts +import { describe, expect, it } from "bun:test"; +import { + extractUserMessageText, + gateStatusToQueueItem, +} from "./thread-gate-queue"; + +const userMsg = (text: string) => ({ + id: "m1", + role: "user" as const, + parts: [{ type: "text" as const, text }], +}); + +describe("extractUserMessageText", () => { + it("returns the concatenated text parts of the last user message", () => { + const messages = [ + { id: "a", role: "assistant", parts: [{ type: "text", text: "hi" }] }, + { + id: "u", + role: "user", + parts: [ + { type: "text", text: "hello " }, + { type: "file", url: "mesh-storage:k", mediaType: "image/png" }, + { type: "text", text: "world" }, + ], + }, + ] as never; + expect(extractUserMessageText(messages)).toBe("hello world"); + }); + + it("returns empty string when there is no user message", () => { + const messages = [ + { id: "a", role: "assistant", parts: [{ type: "text", text: "hi" }] }, + ] as never; + expect(extractUserMessageText(messages)).toBe(""); + }); + + it("returns empty string for an empty array", () => { + expect(extractUserMessageText([] as never)).toBe(""); + }); +}); + +describe("gateStatusToQueueItem", () => { + const threadId = "11bda36e"; + const base = { + workflowID: `thread-run:${threadId}:msg-7`, + status: "ENQUEUED", + createdAt: 1782400000000, + input: [ + { + threadId, + request: { messages: [userMsg("queued text")] }, + source: "user-message", + }, + ], + }; + + it("maps an ENQUEUED gate to a queued item with parsed messageId + text", () => { + const item = gateStatusToQueueItem(base as never, threadId); + expect(item).toEqual({ + workflowId: `thread-run:${threadId}:msg-7`, + messageId: "msg-7", + text: "queued text", + status: "queued", + enqueuedAt: 1782400000000, + }); + }); + + it("maps a PENDING gate to status 'running'", () => { + const item = gateStatusToQueueItem( + { ...base, status: "PENDING" } as never, + threadId, + ); + expect(item?.status).toBe("running"); + }); + + it("returns null when the workflowID does not match the thread prefix", () => { + const item = gateStatusToQueueItem( + { ...base, workflowID: "thread-run:other:msg-7" } as never, + threadId, + ); + expect(item).toBeNull(); + }); + + it("tolerates a missing/unshaped input (empty text)", () => { + const item = gateStatusToQueueItem( + { ...base, input: undefined } as never, + threadId, + ); + expect(item?.text).toBe(""); + expect(item?.messageId).toBe("msg-7"); + }); +}); diff --git a/apps/mesh/src/dispatch-queue/thread-gate-queue.ts b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts new file mode 100644 index 0000000000..8d364675a6 --- /dev/null +++ b/apps/mesh/src/dispatch-queue/thread-gate-queue.ts @@ -0,0 +1,109 @@ +// apps/mesh/src/dispatch-queue/thread-gate-queue.ts +import { DBOS } from "@dbos-inc/dbos-sdk"; +import type { WorkflowStatus } from "@dbos-inc/dbos-sdk"; +import type { ChatMessage } from "@/api/routes/decopilot/types"; +import type { ThreadGateContext } from "./thread-gate-workflow"; + +/** A pending message in a thread's gate queue, surfaced to the UI. */ +export interface ThreadQueueItem { + /** Full DBOS workflow id: `thread-run:{threadId}:{messageId}`. */ + workflowId: string; + /** Trailing segment of the workflow id (the user message id). */ + messageId: string; + /** Display text of the queued user message. */ + text: string; + /** PENDING (slot holder / running or stuck) → running; ENQUEUED → queued. */ + status: "running" | "queued"; + /** Epoch ms the gate was created/enqueued. */ + enqueuedAt: number; +} + +/** Concatenate the text parts of the last user message, concatenated and trimmed for display. Pure + total. */ +export function extractUserMessageText(messages: ChatMessage[]): string { + if (!Array.isArray(messages) || messages.length === 0) return ""; + const lastUserIdx = messages.findLastIndex((m) => m?.role === "user"); + if (lastUserIdx === -1) return ""; + const parts = messages[lastUserIdx]?.parts ?? []; + return parts + .map((p) => + p && typeof p === "object" && (p as { type?: string }).type === "text" + ? ((p as { text?: string }).text ?? "") + : "", + ) + .join("") + .trim(); +} + +/** + * Map one `thread-gate` WorkflowStatus row to a queue item. Returns null when + * the workflow id does not carry this thread's `thread-run:{threadId}:` prefix + * (defensive — listWorkflows is already prefix-filtered). + */ +export function gateStatusToQueueItem( + status: WorkflowStatus, + threadId: string, +): ThreadQueueItem | null { + const prefix = `thread-run:${threadId}:`; + if (!status.workflowID.startsWith(prefix)) return null; + const gateCtx = status.input?.[0] as ThreadGateContext | undefined; + const text = extractUserMessageText(gateCtx?.request?.messages ?? []); + return { + workflowId: status.workflowID, + messageId: status.workflowID.slice(prefix.length), + text, + // Caller (listThreadGateQueue) pre-filters to PENDING/ENQUEUED, so the + // else branch is ENQUEUED → "queued". + status: status.status === "PENDING" ? "running" : "queued", + enqueuedAt: status.createdAt, + }; +} + +/** + * List a thread's pending gate workflows (PENDING head + ENQUEUED tail) as + * UI queue items, oldest first. Reads `workflow_status` via DBOS.listWorkflows + * (the queue lives there — `workflow_queue` is unused in this DBOS version). + */ +export async function listThreadGateQueue( + threadId: string, +): Promise { + const rows = await DBOS.listWorkflows({ + workflow_id_prefix: `thread-run:${threadId}:`, + status: ["PENDING", "ENQUEUED"], + loadInput: true, + loadOutput: false, + }); + return rows + .map((r) => gateStatusToQueueItem(r, threadId)) + .filter((i): i is ThreadQueueItem => i !== null) + .sort((a, b) => a.enqueuedAt - b.enqueuedAt); +} + +/** + * Cancel one gate workflow, guarded by the thread prefix (the tenant authz — + * the caller has already verified thread ownership). Returns false when the id + * is not scoped to this thread (caller should 404). + */ +export async function cancelThreadGateWorkflow( + threadId: string, + workflowId: string, +): Promise { + if (!workflowId.startsWith(`thread-run:${threadId}:`)) return false; + await DBOS.cancelWorkflows([workflowId]); + return true; +} + +/** + * Cancel the thread's PENDING gate(s) — the slot holder(s). Used by the stop + * button so "stop" frees a stuck/wedged head; ENQUEUED items are left intact so + * the queue continues (Codex semantics). No-op when nothing is PENDING. + */ +export async function cancelThreadGateHead(threadId: string): Promise { + const pending = await DBOS.listWorkflows({ + workflow_id_prefix: `thread-run:${threadId}:`, + status: ["PENDING"], + loadInput: false, + loadOutput: false, + }); + if (pending.length === 0) return; + await DBOS.cancelWorkflows(pending.map((w) => w.workflowID)); +} diff --git a/apps/mesh/src/web/components/chat/chat-context.tsx b/apps/mesh/src/web/components/chat/chat-context.tsx index dff6d91f20..f76d7b849b 100644 --- a/apps/mesh/src/web/components/chat/chat-context.tsx +++ b/apps/mesh/src/web/components/chat/chat-context.tsx @@ -94,6 +94,18 @@ function statusToString(s: ConnStatus): ChatStreamContextValue["status"] { return s.kind; } +/** Concatenate the text parts of a message for the queue-row display. */ +function queueItemTextFromParts(parts: ChatMessage["parts"]): string { + return parts + .map((p) => + p && typeof p === "object" && (p as { type?: string }).type === "text" + ? ((p as { text?: string }).text ?? "") + : "", + ) + .join("") + .trim(); +} + import { useChatNavigation } from "./hooks/use-chat-navigation"; import { useThreadActions, useThreadManager } from "./store/hooks"; import { derivePartsFromTiptapDoc } from "./derive-parts"; @@ -105,6 +117,7 @@ import type { RunStatusStage } from "./run-status"; import { useLocalStorage } from "../../hooks/use-local-storage"; import { LOCALSTORAGE_KEYS } from "../../lib/localstorage-keys"; import { KEYS } from "../../lib/query-keys"; +import { enqueueMessage, refreshMessageQueue } from "./message-queue-store"; import { formatDeckTabId } from "@/web/layouts/main-panel-tabs/tab-id"; import { useSimpleMode } from "../../hooks/use-organization-settings"; @@ -937,6 +950,9 @@ export function ActiveTaskProvider({ }, onFinish: (message, _messages, finishReason) => { const cb = cbRef.current; + // Terminal event: the gate advanced — re-sync the queue so the + // dequeued message drops and the next one surfaces. + if (cb.taskId) void refreshMessageQueue(cb.orgSlug, cb.taskId); if (cb.taskId) { cb.manager.patchThread({ id: cb.taskId, @@ -999,8 +1015,30 @@ export function ActiveTaskProvider({ onToolCall: (event) => cbRef.current.onToolCall(event as never), }; conn.observer = observer; + + // Seed the frontend message queue from the gate on (re)mount so a reload + // or thread switch still shows what's queued. + void refreshMessageQueue(cbRef.current.orgSlug, cbRef.current.taskId); + + // Re-sync the queue on every SSE run start/end edge. A gate dequeue flips + // conn.status idle→active (the next queued message began streaming); a + // finish flips active→idle (the slot freed). This is the SSE-driven + // refresh — the queue panel updates exactly when the thread gate advances, + // off the same stream the chat uses. + const isActiveStatus = (k: ConnStatus["kind"]) => + k === "submitted" || k === "streaming"; + let prevActive = isActiveStatus(conn.status.get().kind); + const unsubStatus = conn.status.subscribe(() => { + const active = isActiveStatus(conn.status.get().kind); + if (active === prevActive) return; + prevActive = active; + const cb = cbRef.current; + void refreshMessageQueue(cb.orgSlug, cb.taskId); + }); + return () => { if (conn.observer === observer) conn.observer = null; + unsubStatus(); }; }, [conn]); @@ -1080,32 +1118,60 @@ export function ActiveTaskProvider({ metadata: messageMetadata, }; - await conn.submit( - { kind: "message", message: userMessage }, - { - tier: activeTier, - mode: modeToSend, - toolApprovalLevel: - preferences.toolApprovalLevel ?? readToolApprovalLevel(), - system: system || undefined, - agent: { id: capturedVirtualMcpId }, - thread_id: capturedTaskId, - ...resolveSubmitSettings({ - thread: activeTask - ? { - harness_id: activeTask.harness_id ?? null, - sandbox_provider_kind: activeTask.sandbox_provider_kind ?? null, - branch: activeTask.branch ?? null, - } - : null, - globals: { - harnessId: pendingHarnessId ?? undefined, - sandboxProviderKind: pendingSandboxProviderKind ?? undefined, - branch: currentBranch, - }, - }), - }, - ); + const requestOptions = { + tier: activeTier, + mode: modeToSend, + toolApprovalLevel: + preferences.toolApprovalLevel ?? readToolApprovalLevel(), + system: system || undefined, + agent: { id: capturedVirtualMcpId }, + thread_id: capturedTaskId, + ...resolveSubmitSettings({ + thread: activeTask + ? { + harness_id: activeTask.harness_id ?? null, + sandbox_provider_kind: activeTask.sandbox_provider_kind ?? null, + branch: activeTask.branch ?? null, + } + : null, + globals: { + harnessId: pendingHarnessId ?? undefined, + sandboxProviderKind: pendingSandboxProviderKind ?? undefined, + branch: currentBranch, + }, + }), + }; + + // A run is already streaming (this thread) or in progress (hosted): the + // message can't be the current turn, so queue it behind the running one. + // POST it to the gate WITHOUT an optimistic body row, then mirror it into + // the frontend message queue (it renders in the body when its turn + // dequeues and streams). Otherwise this is the current turn — submit + // normally so it renders in the body and streams immediately. + if (isStreaming || isRunInProgress) { + try { + await conn.enqueue(userMessage, requestOptions); + enqueueMessage(capturedTaskId, { + workflowId: `thread-run:${capturedTaskId}:${userMessage.id}`, + messageId: userMessage.id, + text: queueItemTextFromParts(parts), + status: "queued", + enqueuedAt: Date.now(), + }); + } catch (err) { + setChatError(err instanceof Error ? err : new Error(String(err))); + toast.error( + err instanceof Error ? err.message : "Failed to queue message", + ); + } + // Reconcile the optimistic row against the gate's authoritative list. + void refreshMessageQueue(org.slug, capturedTaskId); + } else { + await conn.submit( + { kind: "message", message: userMessage }, + requestOptions, + ); + } } // Cancel run diff --git a/apps/mesh/src/web/components/chat/composer-action.test.ts b/apps/mesh/src/web/components/chat/composer-action.test.ts new file mode 100644 index 0000000000..83f71e9123 --- /dev/null +++ b/apps/mesh/src/web/components/chat/composer-action.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from "bun:test"; +import { resolveComposerAction } from "./composer-action"; + +describe("resolveComposerAction", () => { + it("sends when there is a draft, even while a run streams (enqueue)", () => { + expect( + resolveComposerAction({ + hasDraft: true, + isStreaming: true, + isRunInProgress: false, + }), + ).toBe("send"); + }); + + it("sends when there is a draft and a hosted run is in progress", () => { + expect( + resolveComposerAction({ + hasDraft: true, + isStreaming: false, + isRunInProgress: true, + }), + ).toBe("send"); + }); + + it("sends when there is a draft and nothing is running", () => { + expect( + resolveComposerAction({ + hasDraft: true, + isStreaming: false, + isRunInProgress: false, + }), + ).toBe("send"); + }); + + it("offers stop when streaming with no draft", () => { + expect( + resolveComposerAction({ + hasDraft: false, + isStreaming: true, + isRunInProgress: false, + }), + ).toBe("stop"); + }); + + it("offers stop when a hosted run is in progress with no draft", () => { + expect( + resolveComposerAction({ + hasDraft: false, + isStreaming: false, + isRunInProgress: true, + }), + ).toBe("stop"); + }); + + it("is disabled when idle with no draft", () => { + expect( + resolveComposerAction({ + hasDraft: false, + isStreaming: false, + isRunInProgress: false, + }), + ).toBe("disabled"); + }); +}); diff --git a/apps/mesh/src/web/components/chat/composer-action.ts b/apps/mesh/src/web/components/chat/composer-action.ts new file mode 100644 index 0000000000..06b6c5b7d6 --- /dev/null +++ b/apps/mesh/src/web/components/chat/composer-action.ts @@ -0,0 +1,20 @@ +/** The composer's primary-button action. */ +export type ComposerAction = "send" | "stop" | "disabled"; + +/** + * Decide the composer's primary-button action. + * + * A draft always sends — even while a run streams or a hosted run is in + * progress — because a second message enqueues behind the running one on the + * thread gate (concurrency=1 serializes them). With no draft, an active run + * offers stop; otherwise the button is disabled. Pure + total. + */ +export function resolveComposerAction(state: { + hasDraft: boolean; + isStreaming: boolean; + isRunInProgress: boolean; +}): ComposerAction { + if (state.hasDraft) return "send"; + if (state.isStreaming || state.isRunInProgress) return "stop"; + return "disabled"; +} diff --git a/apps/mesh/src/web/components/chat/input.tsx b/apps/mesh/src/web/components/chat/input.tsx index 346b30c8c2..eca1b33f93 100644 --- a/apps/mesh/src/web/components/chat/input.tsx +++ b/apps/mesh/src/web/components/chat/input.tsx @@ -67,6 +67,8 @@ import { ConnectionsBanner } from "./connections-banner"; import { useVoiceInput } from "@/web/hooks/use-voice-input.ts"; import { VoiceWaveform } from "./voice-input"; import { shouldRenderInlineModeRow } from "./input-mode-row"; +import { ThreadQueuePanel } from "./queue-panel"; +import { resolveComposerAction } from "./composer-action"; // ============================================================================ // useWindowFileDrop - Reusable hook for window-level file drag & drop @@ -405,23 +407,25 @@ export function ChatInput({ // model's context is much fuller than it actually is. const lastTotalTokens = lastUsage?.contextTokens ?? 0; - const canSubmit = - !isStreaming && !isModelsLoading && !isTiptapDocEmpty(tiptapDoc); - - const showStopOrCancel = isStreaming || isRunInProgress; + // A draft sends even mid-run — the new message enqueues behind the running + // gate (concurrency=1 serializes the thread). Stop is offered only when + // there's nothing to send. `canSubmit`/`showStopOrCancel` are kept as the + // names the button/render logic below already references. + const hasDraft = !isModelsLoading && !isTiptapDocEmpty(tiptapDoc); + const composerAction = resolveComposerAction({ + hasDraft, + isStreaming, + isRunInProgress, + }); + const canSubmit = composerAction === "send"; + const showStopOrCancel = composerAction === "stop"; const showInlineModeRow = shouldRenderInlineModeRow({ messageCount: messages.length, showConnectionsBanner, }); const handleSubmit = (e?: FormEvent) => { e?.preventDefault(); - if (isStreaming) { - track("chat_message_stopped", { thread_id: taskId }); - stop(); - } else if (isRunInProgress) { - track("chat_message_stopped", { thread_id: taskId }); - stop(); - } else if (canSubmit && tiptapDoc) { + if (composerAction === "send" && tiptapDoc) { track("chat_message_sent", { thread_id: taskId || null, mode: chatMode, @@ -437,6 +441,9 @@ export function ChatInput({ } clearChatDraft(sessionStorage, locator, draftKey); setTiptapDoc(undefined); + } else if (composerAction === "stop") { + track("chat_message_stopped", { thread_id: taskId }); + stop(); } }; @@ -460,11 +467,15 @@ export function ChatInput({ absent on the home composer. */} {stream && taskCtx && } + {stream && taskCtx && taskId ? ( + + ) : null} + @@ -482,7 +493,7 @@ export function ChatInput({
{ track("connections_dialog_opened", { source: "tools_popover", @@ -565,7 +576,6 @@ export function ChatInput({ {chatMode === "gen-image" && imageModel && ( + + ))} + +
+ ); +} diff --git a/apps/mesh/src/web/components/chat/store/thread-connection.ts b/apps/mesh/src/web/components/chat/store/thread-connection.ts index 122957d073..62f6e1e23f 100644 --- a/apps/mesh/src/web/components/chat/store/thread-connection.ts +++ b/apps/mesh/src/web/components/chat/store/thread-connection.ts @@ -389,6 +389,17 @@ export class ThreadConnection { } } + /** + * Queue a user message behind the active run: POST it to the thread gate + * WITHOUT the optimistic local render or any run-status change. The message + * surfaces in the frontend message queue and renders in the body only when + * its turn dequeues and streams. Throws if the POST is rejected so the + * caller can skip the optimistic enqueue. + */ + async enqueue(message: UIMessage, opts: RequestOptions): Promise { + await this.post(message, opts, undefined, true); + } + stop(): void { this.inflightPost?.abort(); const s = this.status.get(); @@ -616,6 +627,7 @@ export class ThreadConnection { message: UIMessage, opts: RequestOptions, signal?: AbortSignal, + quiet = false, ): Promise { const { system, ...rest } = opts; // Attach the system prompt only on a user turn. Tool-output / approval @@ -661,7 +673,8 @@ export class ThreadConnection { const text = await resp.text().catch(() => ""); throw new Error(text || `POST /messages failed (${resp.status})`); } - if (this.runStatusStage.get() !== null) { + // A queued (quiet) POST must not touch the running turn's status display. + if (!quiet && this.runStatusStage.get() !== null) { this.setRunStatusStage("received"); } } diff --git a/apps/mesh/src/web/components/chat/use-message-queue.ts b/apps/mesh/src/web/components/chat/use-message-queue.ts new file mode 100644 index 0000000000..d1c565b620 --- /dev/null +++ b/apps/mesh/src/web/components/chat/use-message-queue.ts @@ -0,0 +1,55 @@ +import { useSyncExternalStore } from "react"; +import { useProjectContext } from "@decocms/mesh-sdk"; +import { toast } from "sonner"; +import type { QueueItemDTO } from "./queue-items"; +import { + enqueueMessage, + messageQueueStore, + refreshMessageQueue, + removeMessage, +} from "./message-queue-store"; + +/** + * Subscribe to a thread's frontend message queue (all pending gate items, + * head included — the panel narrows to the waiting tail via + * `selectWaitingQueueItems`). Re-renders whenever the store changes. + */ +export function useMessageQueue(threadId: string): QueueItemDTO[] { + const store = messageQueueStore(threadId); + return useSyncExternalStore(store.subscribe, store.get, store.get); +} + +export interface MessageQueueActions { + /** Optimistically append a just-sent message. */ + enqueue: (threadId: string, item: QueueItemDTO) => void; + /** Cancel a queued message: drop it locally, POST the cancel, re-sync. */ + cancel: (threadId: string, messageId: string) => Promise; + /** Re-fetch the server's queue and replace the local store. */ + refresh: (threadId: string) => Promise; +} + +/** Mutators over the frontend message queue, bound to the active org. */ +export function useMessageQueueActions(): MessageQueueActions { + const { org } = useProjectContext(); + return { + enqueue: enqueueMessage, + refresh: (threadId) => refreshMessageQueue(org.slug, threadId), + cancel: async (threadId, messageId) => { + const workflowId = `thread-run:${threadId}:${messageId}`; + removeMessage(threadId, messageId); // optimistic + try { + const res = await fetch( + `/api/${encodeURIComponent(org.slug)}/decopilot/queue/${encodeURIComponent(threadId)}/cancel/${encodeURIComponent(workflowId)}`, + { method: "POST", credentials: "include" }, + ); + if (!res.ok && res.status !== 404) { + throw new Error(`Cancel failed: ${res.status}`); + } + } catch (err) { + toast.error(err instanceof Error ? err.message : "Failed to cancel"); + } finally { + await refreshMessageQueue(org.slug, threadId); + } + }, + }; +} diff --git a/packages/e2e/tests/decopilot-thread-queue.spec.ts b/packages/e2e/tests/decopilot-thread-queue.spec.ts new file mode 100644 index 0000000000..733c752a90 --- /dev/null +++ b/packages/e2e/tests/decopilot-thread-queue.spec.ts @@ -0,0 +1,296 @@ +// packages/e2e/tests/decopilot-thread-queue.spec.ts +import { test, expect, newApiContext } from "../fixtures/test"; +import { signUpViaApi } from "../fixtures/auth-api"; +import { connectDevDb } from "../fixtures/db"; +import { callSelfMcpTool } from "../fixtures/mcp-tools"; +import type { APIRequestContext } from "@playwright/test"; + +// NOTE: org/user/thread setup pattern mirrors decopilot-messages.spec.ts. + +/** Create a real agent (virtual MCP) + thread row; returns both ids. */ +async function createAgentAndThread( + api: APIRequestContext, + orgSlug: string, +): Promise<{ agentId: string; threadId: string }> { + const agent = await callSelfMcpTool<{ item: { id: string } }>( + api, + orgSlug, + "COLLECTION_VIRTUAL_MCP_CREATE", + { + data: { + title: "E2E Thread Queue Agent", + connections: [], + status: "active", + pinned: false, + }, + }, + ); + const thread = await callSelfMcpTool<{ item: { id: string } }>( + api, + orgSlug, + "COLLECTION_THREADS_CREATE", + { + data: { virtual_mcp_id: agent.item.id, title: "E2E Thread Queue Thread" }, + }, + ); + return { agentId: agent.item.id, threadId: thread.item.id }; +} + +/** + * Seed a thread-gate workflow_status row directly (DBOS-aware fixture). `inputs` + * MUST be the DBOS serialization envelope {"json":[]} so + * listWorkflows({loadInput:true}) can parse it. Column set matches + * decopilot-projector-dbos.spec.ts conventions. + */ +async function seedGate( + db: { + query: (sql: string, params: unknown[]) => Promise<{ rows: unknown[] }>; + }, + threadId: string, + o: { id: string; status: "PENDING" | "ENQUEUED"; at: number; text: string }, +): Promise { + const wfId = `thread-run:${threadId}:${o.id}`; + const ctx = { + threadId, + request: { + messages: [ + { id: o.id, role: "user", parts: [{ type: "text", text: o.text }] }, + ], + }, + source: "user-message", + }; + await db.query( + `INSERT INTO dbos.workflow_status + (workflow_uuid, status, name, class_name, config_name, queue_name, + executor_id, application_version, application_id, recovery_attempts, + priority, created_at, updated_at, inputs) + VALUES ($1,$2,'threadGateWorkflow','','','thread-gate', + 'seed-exec','seed-version','seed-app',0,0,$3,$3,$4)`, + [wfId, o.status, String(o.at), JSON.stringify({ json: [ctx] })], + ); + return wfId; +} + +test("stop cancels a stuck PENDING gate head (frees the partition slot)", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + // Arrange: a deploy-stranded head holding the slot. + const wfId = await seedGate(db, threadId, { + id: "stuck-head", + status: "PENDING", + at: Date.now(), + text: "stuck", + }); + + // Act: hit the stop endpoint. + const res = await api.post(`/api/${orgSlug}/decopilot/cancel/${threadId}`); + expect(res.status()).toBe(202); + + // Assert: the gate head is now CANCELLED (slot freed). + const { rows } = await db.query( + `SELECT status FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + expect((rows[0] as { status: string }).status).toBe("CANCELLED"); + } finally { + await db.end(); + } +}); + +test("GET queue lists PENDING + ENQUEUED gate messages, oldest first", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + await seedGate(db, threadId, { + id: "head", + status: "PENDING", + at: 1000, + text: "running one", + }); + await seedGate(db, threadId, { + id: "q2", + status: "ENQUEUED", + at: 3000, + text: "second", + }); + await seedGate(db, threadId, { + id: "q1", + status: "ENQUEUED", + at: 2000, + text: "first", + }); + + const res = await api.get(`/api/${orgSlug}/decopilot/queue/${threadId}`); + expect(res.status()).toBe(200); + const body = (await res.json()) as { + items: Array<{ messageId: string; status: string; text: string }>; + }; + expect(body.items.map((i) => i.messageId)).toEqual(["head", "q1", "q2"]); + expect(body.items[0]).toMatchObject({ + status: "running", + text: "running one", + }); + expect(body.items[1]).toMatchObject({ status: "queued", text: "first" }); + } finally { + await db.end(); + } +}); + +test("GET queue 403s for a non-owner", async ({ authedPage, playwright }) => { + const { page, orgSlug } = authedPage; + const ownerApi = page.context().request; + + // Create a thread owned by the primary user. + const { threadId } = await createAgentAndThread(ownerApi, orgSlug); + + // A second user who has no membership in orgSlug. + const otherCtx = await newApiContext(playwright); + await signUpViaApi(otherCtx); + + try { + // TODO(e2e-hardening): this proves a NON-MEMBER gets 403 (org-membership + // middleware). The owner-only guard (validateThreadOwnership: created_by != + // userId) for a non-owner who IS a member of the same org is not yet + // covered — add a second org member via an invite/accept fixture and assert + // 403 on another member's thread when this suite runs against real infra. + // The other user tries to read the owner's thread queue — should 403. + const res = await otherCtx.get( + `/api/${orgSlug}/decopilot/queue/${threadId}`, + ); + expect(res.status()).toBe(403); + } finally { + await otherCtx.dispose(); + } +}); + +test("POST cancel removes a queued item", async ({ authedPage }) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { threadId } = await createAgentAndThread(api, orgSlug); + + const wfId = await seedGate(db, threadId, { + id: "to-cancel", + status: "ENQUEUED", + at: Date.now(), + text: "x", + }); + + const res = await api.post( + `/api/${orgSlug}/decopilot/queue/${threadId}/cancel/${encodeURIComponent(wfId)}`, + ); + expect(res.status()).toBe(202); + + const { rows } = await db.query( + `SELECT status FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + expect((rows[0] as { status: string }).status).toBe("CANCELLED"); + } finally { + await db.end(); + } +}); + +test("POST cancel 404s for a workflowId scoped to another thread", async ({ + authedPage, +}) => { + const { page, orgSlug } = authedPage; + const api = page.context().request; + + const { threadId } = await createAgentAndThread(api, orgSlug); + + const foreign = encodeURIComponent("thread-run:someone-else:msg-1"); + const res = await api.post( + `/api/${orgSlug}/decopilot/queue/${threadId}/cancel/${foreign}`, + ); + expect(res.status()).toBe(404); +}); + +test("POST externalizes a data: attachment out of the DBOS workflow input", async ({ + authedPage, +}) => { + // Object storage (S3 + org-fs) must be available for this assertion to hold: + // uploadFileParts only materializes when ctx.orgFs is present, which requires + // a real S3-backed objectStorage. Skip on plain local dev without MinIO. + test.skip( + !( + process.env.S3_ENDPOINT && + process.env.S3_BUCKET && + process.env.S3_ACCESS_KEY_ID && + process.env.S3_SECRET_ACCESS_KEY + ), + "requires S3 env (S3_ENDPOINT/S3_BUCKET/S3_ACCESS_KEY_ID/S3_SECRET_ACCESS_KEY); see e2e.yml MinIO setup", + ); + + const { page, orgSlug } = authedPage; + const api = page.context().request; + const db = await connectDevDb(); + + try { + const { agentId, threadId } = await createAgentAndThread(api, orgSlug); + + // A tiny 1×1 PNG as a data: URL — represents a user-attached image. + const tinyPng = + "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="; + const messageId = `img-${Date.now()}`; + + const res = await api.post( + `/api/${orgSlug}/decopilot/threads/${threadId}/messages`, + { + data: { + messages: [ + { + id: messageId, + role: "user", + parts: [ + { type: "text", text: "describe this" }, + { + type: "file", + url: tinyPng, + mediaType: "image/png", + filename: "a.png", + }, + ], + }, + ], + agent: { id: agentId }, + branch: "ephemeral", + harnessId: "decopilot", + temperature: 0.5, + }, + headers: { "content-type": "application/json" }, + }, + ); + expect(res.status()).toBe(202); + + // The persisted gate input must NOT contain the base64 data: blob. + // `inputs` is a TEXT column (the {"json":[ctx]} envelope) — read as string. + const wfId = `thread-run:${threadId}:${messageId}`; + const { rows } = await db.query( + `SELECT inputs FROM dbos.workflow_status WHERE workflow_uuid = $1`, + [wfId], + ); + const serialized = String( + (rows[0] as { inputs: string } | undefined)?.inputs ?? "", + ); + expect(serialized).not.toContain("data:image/png;base64,"); + expect(serialized).toContain("mesh-storage:"); + } finally { + await db.end(); + } +});