Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions apps/mesh/src/api/routes/decopilot/file-materializer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { describe, expect, it } from "bun:test";
import { messageHasAttachmentAnnotation } from "./file-materializer";

/** Builds a minimal user message whose single part is a text part holding `text`. */
function msg(text: string) {
  const parts = [{ type: "text", text }];
  // Cast to `never` so the fixture is accepted wherever a message type is expected.
  return { id: "m", role: "user", parts } as never;
}

// Suite for messageHasAttachmentAnnotation: both annotation prefixes must be
// recognised; plain text and messages without any text part must not be.
describe("messageHasAttachmentAnnotation", () => {
  it("detects the sandbox-attachment annotation", () => {
    const annotated = msg(
      "[Attached files — already inside your sandbox]\n- a.png: org/upload/a.png",
    );
    expect(messageHasAttachmentAnnotation(annotated)).toBe(true);
  });

  it("detects the uploaded-files annotation", () => {
    const annotated = msg(
      "[Uploaded files — use these URLs when calling tools]\n- a.png: mesh-storage:k",
    );
    expect(messageHasAttachmentAnnotation(annotated)).toBe(true);
  });

  it("returns false for ordinary text", () => {
    const plain = msg("hello world");
    expect(messageHasAttachmentAnnotation(plain)).toBe(false);
  });

  it("returns false when there are no text parts", () => {
    // A message carrying only a file part: nothing for the prefix check to inspect.
    const fileOnly = {
      id: "m",
      role: "user",
      parts: [{ type: "file", url: "mesh-storage:k" }],
    } as never;
    expect(messageHasAttachmentAnnotation(fileOnly)).toBe(false);
  });
});
25 changes: 25 additions & 0 deletions apps/mesh/src/api/routes/decopilot/file-materializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ export async function generatePresignedGetUrl(
// Phase 1 — uploadFileParts
// ============================================================================

/** Leading markers of the annotation text that uploadFileParts injects after a successful materialization. */
const ATTACHMENT_ANNOTATION_PREFIXES = ["[Attached files", "[Uploaded files"];

/**
 * Reports whether `message` already carries an attachment annotation, i.e. it
 * was materialized by an earlier uploadFileParts pass. uploadFileParts uses
 * this to stay idempotent, so running it at POST *and* at dispatch cannot
 * annotate the same message twice.
 *
 * @param message - The chat message whose parts are inspected.
 * @returns `true` when at least one `text` part with a string `text` starts
 *   (after leading whitespace is ignored) with one of the annotation prefixes.
 */
export function messageHasAttachmentAnnotation(message: ChatMessage): boolean {
  // Trim once per part, then test every known prefix against the result.
  const startsWithAnnotation = (text: string): boolean => {
    const trimmed = text.trimStart();
    return ATTACHMENT_ANNOTATION_PREFIXES.some((prefix) =>
      trimmed.startsWith(prefix),
    );
  };

  return message.parts.some((part) => {
    if (part.type !== "text" || !("text" in part)) return false;
    return typeof part.text === "string" && startsWithAnnotation(part.text);
  });
}

/** The composer's attachment marker: `[file:://<encodeURIComponent(name)>]`.
* File parts carry it as their `filename`; text-file attachments are decoded
* inline as a text part of `<marker>\n<content>` (see web derive-parts.ts). */
Expand Down Expand Up @@ -276,6 +296,11 @@ export async function uploadFileParts(
if (lastUserIdx === -1) return messages;

const message = messages[lastUserIdx]!;

// Idempotent: a message already annotated was materialized on a prior pass
// (e.g. at POST); skip so the dispatch-time pass can't re-upload / re-annotate.
if (messageHasAttachmentAnnotation(message)) return messages;

const dataUrlParts = message.parts.filter(
(p) =>
p.type === "file" &&
Expand Down
130 changes: 98 additions & 32 deletions apps/mesh/src/api/routes/decopilot/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { StreamRequestSchema } from "./schemas";
import type { ChatMessage, ModelsConfig } from "./types";
import type { DispatchRunInput } from "./dispatch-run";
import { resolveHarnessId } from "./dispatch-run";
import { uploadFileParts } from "./file-materializer";
import { stringifyError } from "@decocms/harness/decopilot/stream-error";
import { enqueueThreadRun } from "@/dispatch-queue";
import {
Expand All @@ -59,6 +60,11 @@ import type { HarnessId } from "@/harnesses";
import type { Thread } from "@/storage/types";
import { cancelThreadBackgroundJobs } from "@/harnesses/decopilot/background-tool-workflow";
import { abortBackgroundJobs } from "@/harnesses/decopilot/background-abort-registry";
import {
cancelThreadGateHead,
cancelThreadGateWorkflow,
listThreadGateQueue,
} from "@/dispatch-queue/thread-gate-queue";

// Per-connection /stream tail diagnostics. Flip to "1" in an environment where
// the live stream intermittently delivers no chunks — logs the resolved
Expand Down Expand Up @@ -671,11 +677,20 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
const target = resolveDispatchTarget({ sandboxProviderKind: pinnedKind });

const { abortSignal: _ignored, ...rest } = input;
// Externalize attachments NOW (data: base64 → mesh-storage: refs) so the
// gate workflow input persisted in dbos.workflow_inputs stays small. The
// dispatch-time uploadFileParts is now an idempotent no-op for these.
const materializedMessages = await uploadFileParts(input.messages, ctx, {
threadId: taskId,
});
const serializableRequest = {
...rest,
messages: materializedMessages,
target,
harnessId: pinnedHarness,
};
// computeIdempotencyKey reads input.messages (the originals) so the
// workflowID is unaffected by materialization (data: → mesh-storage: swap).
const lastMsg = input.messages[input.messages.length - 1];
const idempotencyKey = computeIdempotencyKey(lastMsg);
const workflowID = idempotencyKey
Expand Down Expand Up @@ -722,55 +737,46 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
// Cancel Endpoint — cancel ongoing run (local or via NATS to owning pod)
// ============================================================================

app.post("/:org/decopilot/cancel/:threadId", async (c) => {
const { ctx, taskId, thread, organization, userId } =
await validateThreadOwnership(c);

// Persist durable cancel flag so the ingest backstop rejects 409.
// Shared run-cancel teardown: durable cancel flag, background-job teardown,
// in-pod abort + cross-pod NATS broadcast, local turn cancel, ghost force-fail.
// Used by the stop endpoint and the per-item queue cancel (running head).
async function cancelActiveThreadRun(args: {
ctx: StudioContext;
taskId: string;
thread: Thread;
organization: { id: string };
userId: string;
}): Promise<void> {
const { ctx, taskId, thread, organization, userId } = args;
await ctx.storage.threads.setCancelRequested(taskId, organization.id);

// Tear down any background-tool workflows this thread started (image gen /
// backgrounded subtasks). They run on their own DBOS queue, so the
// in-memory run cancel below doesn't reach them. Non-fatal: a DBOS hiccup
// must not block the user-facing cancel.
await cancelThreadBackgroundJobs(taskId).catch((err) => {
console.error("[decopilot:cancel] failed to cancel background jobs", {
taskId,
err,
});
});

// Abort in-flight background work on this pod now, and fan the cancel out
// to every pod over NATS. Always broadcast: a background job runs on
// whichever pod DBOS dequeued it, so a locally-owned turn no longer implies
// no other pod is involved. Each pod's onCancel aborts its background
// controllers (`abortBackgroundJobs`) and cancels the live turn if it owns
// it; both are no-ops where they don't apply.
// Free a stranded/stuck PENDING gate head so the partition slot releases and
// the queue can proceed (an orphaned gate ignores the in-memory run cancel).
await cancelThreadGateHead(taskId).catch((err) => {
console.error("[decopilot:cancel] failed to cancel gate head", {
taskId,
err,
});
});
abortBackgroundJobs(taskId);
cancelBroadcast.broadcast(taskId);

// Try to cancel the live turn locally for an immediate response.
const cancelTransitions = await runRegistry.execute({
type: "CANCEL",
taskId,
});
if (cancelTransitions.some((t) => t.event.type === "RUN_FAILED")) {
cancelBroadcast.publishControlFrame(userId, {
type: "cancel",
runId: taskId,
});
return c.json({ cancelled: true });
}

cancelBroadcast.publishControlFrame(userId, {
type: "cancel",
runId: taskId,
});

// Ghost run: server restarted while a run was in progress. No pod has this
// run in memory, so the broadcast will never resolve. Force-fail the thread
// in the DB so the user can send new messages.
if (thread.status === "in_progress") {
const producedRunFailed = cancelTransitions.some(
(t) => t.event.type === "RUN_FAILED",
);
if (!producedRunFailed && thread.status === "in_progress") {
console.warn("[decopilot:cancel] Ghost run detected, force-failing", {
taskId,
});
Expand All @@ -791,10 +797,70 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
);
});
}
}

// Stop endpoint: runs the shared run-cancel teardown for the caller's thread
// and always answers 202 with `{ cancelled: true, async: true }`.
app.post("/:org/decopilot/cancel/:threadId", async (c) => {
  const owned = await validateThreadOwnership(c);
  await cancelActiveThreadRun({
    ctx: owned.ctx,
    taskId: owned.taskId,
    thread: owned.thread,
    organization: owned.organization,
    userId: owned.userId,
  });
  const payload = { cancelled: true, async: true };
  return c.json(payload, 202);
});

// ============================================================================
// Queue List Endpoint — list the thread's pending gate workflows
// ============================================================================
//
// GET /:org/decopilot/queue/:threadId
//
// List this thread's pending gate workflows (running head + queued tail) so
// the UI can show and cancel them. Owner-only.

app.get("/:org/decopilot/queue/:threadId", async (c) => {
  // Only the thread id is needed from the ownership check.
  const ownership = await validateThreadOwnership(c);
  const pendingItems = await listThreadGateQueue(ownership.taskId);
  // Response shape: `{ items: [...] }`.
  return c.json({ items: pendingItems });
});

// ============================================================================
// Queue Per-Item Cancel Endpoint — cancel one pending gate workflow
// ============================================================================
//
// POST /:org/decopilot/queue/:threadId/cancel/:workflowId
//
// Cancel one pending gate item. A queued (ENQUEUED) item is just removed; the
// running head (PENDING) additionally runs the full run-cancel teardown.

// Handler: cancel a single pending gate workflow of the caller's thread.
// Responds 404 with an empty body when the workflow id is not in this
// thread's pending list, or when cancelThreadGateWorkflow returns a falsy
// result; otherwise responds 202 with `{ cancelled: true }`.
app.post("/:org/decopilot/queue/:threadId/cancel/:workflowId", async (c) => {
const { ctx, taskId, thread, organization, userId } =
await validateThreadOwnership(c);
const workflowId = c.req.param("workflowId");

// The item must currently be pending for THIS thread (prefix-scoped list).
// This lookup also captures the item's status as it was BEFORE the cancel
// below; that snapshot decides whether the run teardown is executed.
const items = await listThreadGateQueue(taskId);
const target = items.find((i) => i.workflowId === workflowId);
if (!target) return c.body(null, 404);

const ok = await cancelThreadGateWorkflow(taskId, workflowId);
if (!ok) return c.body(null, 404);

// For a running head, also run the full run-cancel teardown (abort + NATS
// broadcast + run-registry CANCEL). cancelActiveThreadRun re-cancels the
// gate head via cancelThreadGateHead — that re-cancel is idempotent
// (DBOS.cancelWorkflows on an already-cancelled workflow is a no-op), so the
// explicit cancelThreadGateWorkflow above covers the queued case and this
// covers the live run.
// NOTE(review): `target.status` was read before the cancel above. If the
// item moved from queued to running between the list and the cancel, the
// teardown is skipped — TODO confirm that is acceptable.
// NOTE(review): the idempotence argument assumes cancelThreadGateHead still
// resolves to this same workflow. If the next queued item can be promoted to
// head in between, it might be cancelled instead — TODO confirm against
// cancelThreadGateHead's selection logic.
if (target.status === "running") {
await cancelActiveThreadRun({
ctx,
taskId,
thread,
organization,
userId,
});
}
return c.json({ cancelled: true }, 202);
});

// ============================================================================
// Stream Endpoint — tail the per-thread JetStream subject
// ============================================================================
Expand Down
93 changes: 93 additions & 0 deletions apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// apps/mesh/src/dispatch-queue/thread-gate-queue.test.ts
import { describe, expect, it } from "bun:test";
import {
extractUserMessageText,
gateStatusToQueueItem,
} from "./thread-gate-queue";

/** Builds a user message fixture (id "m1") containing one text part with `text`. */
function userMsg(text: string) {
  const textPart = { type: "text" as const, text };
  return { id: "m1", role: "user" as const, parts: [textPart] };
}

// Suite for extractUserMessageText: the expected result is the joined text of
// the last user message's text parts, or "" when no user message exists.
describe("extractUserMessageText", () => {
  it("returns the concatenated text parts of the last user message", () => {
    const assistantTurn = {
      id: "a",
      role: "assistant",
      parts: [{ type: "text", text: "hi" }],
    };
    // The file part sits between two text parts and must not contribute text.
    const userTurn = {
      id: "u",
      role: "user",
      parts: [
        { type: "text", text: "hello " },
        { type: "file", url: "mesh-storage:k", mediaType: "image/png" },
        { type: "text", text: "world" },
      ],
    };
    const messages = [assistantTurn, userTurn] as never;
    expect(extractUserMessageText(messages)).toBe("hello world");
  });

  it("returns empty string when there is no user message", () => {
    const assistantOnly = [
      { id: "a", role: "assistant", parts: [{ type: "text", text: "hi" }] },
    ] as never;
    expect(extractUserMessageText(assistantOnly)).toBe("");
  });

  it("returns empty string for an empty array", () => {
    const noMessages = [] as never;
    expect(extractUserMessageText(noMessages)).toBe("");
  });
});

// Suite for gateStatusToQueueItem: checks status mapping, messageId/text
// extraction, the thread-prefix guard, and tolerance of a missing input.
describe("gateStatusToQueueItem", () => {
  const threadId = "11bda36e";
  const workflowID = `thread-run:${threadId}:msg-7`;
  const createdAt = 1782400000000;

  // Baseline fixture: an ENQUEUED gate whose input carries one user message.
  const enqueuedGate = {
    workflowID,
    status: "ENQUEUED",
    createdAt,
    input: [
      {
        threadId,
        request: { messages: [userMsg("queued text")] },
        source: "user-message",
      },
    ],
  };

  it("maps an ENQUEUED gate to a queued item with parsed messageId + text", () => {
    const mapped = gateStatusToQueueItem(enqueuedGate as never, threadId);
    expect(mapped).toEqual({
      workflowId: `thread-run:${threadId}:msg-7`,
      messageId: "msg-7",
      text: "queued text",
      status: "queued",
      enqueuedAt: 1782400000000,
    });
  });

  it("maps a PENDING gate to status 'running'", () => {
    const pendingGate = { ...enqueuedGate, status: "PENDING" } as never;
    const mapped = gateStatusToQueueItem(pendingGate, threadId);
    expect(mapped?.status).toBe("running");
  });

  it("returns null when the workflowID does not match the thread prefix", () => {
    const foreignGate = {
      ...enqueuedGate,
      workflowID: "thread-run:other:msg-7",
    } as never;
    const mapped = gateStatusToQueueItem(foreignGate, threadId);
    expect(mapped).toBeNull();
  });

  it("tolerates a missing/unshaped input (empty text)", () => {
    const inputless = { ...enqueuedGate, input: undefined } as never;
    const mapped = gateStatusToQueueItem(inputless, threadId);
    expect(mapped?.text).toBe("");
    expect(mapped?.messageId).toBe("msg-7");
  });
});
Loading
Loading