Skip to content

Commit bba7ddb

Browse files
tlgimenesclaude
andcommitted
feat(chat): frontend message queue — send-while-streaming queues, no disabled input
Reworks the queue UX per design feedback: - **Send renders in the body.** A message sent to an idle thread submits normally and renders in the thread's user-message place + streams. - **No disabled composer state, only a streaming state.** Tools, the microphone, and the mode pills are always enabled now; only the primary button reflects streaming (Send when there's a draft, Stop otherwise). - **Send while a run is active → the queue, not the body.** New `conn.enqueue()` POSTs the message to the gate quietly (no optimistic body row, no run-status change); the message is mirrored into a per-thread frontend queue and surfaces in the panel. It renders in the body only when its turn dequeues and streams. - **Frontend message queue.** New module-scoped per-thread store (`message-queue-store.ts`) with `useMessageQueue(threadId)` + `useMessageQueueActions()` hooks (enqueue / cancel / refresh). Seeded from the gate on mount, written optimistically on send, and re-synced on every SSE run start/end edge and on terminal `onFinish`. Replaces the React-Query `useThreadQueue` (deleted) and the `KEYS.threadQueue` key. `selectWaitingQueueItems` still excludes the active head so the running message never double-renders. Verified live (Codex-desktop thread): message renders in the body + streams; Tools/mic stay enabled mid-run; a message sent behind a running gate lands in the panel (not the body) and cancels cleanly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 6a4ef53 commit bba7ddb

10 files changed

Lines changed: 289 additions & 114 deletions

File tree

apps/mesh/src/web/components/chat/chat-context.tsx

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@ function statusToString(s: ConnStatus): ChatStreamContextValue["status"] {
9494
return s.kind;
9595
}
9696

97+
/** Concatenate the text parts of a message for the queue-row display. */
98+
function queueItemTextFromParts(parts: ChatMessage["parts"]): string {
99+
return parts
100+
.map((p) =>
101+
p && typeof p === "object" && (p as { type?: string }).type === "text"
102+
? ((p as { text?: string }).text ?? "")
103+
: "",
104+
)
105+
.join("")
106+
.trim();
107+
}
108+
97109
import { useChatNavigation } from "./hooks/use-chat-navigation";
98110
import { useThreadActions, useThreadManager } from "./store/hooks";
99111
import { derivePartsFromTiptapDoc } from "./derive-parts";
@@ -105,6 +117,7 @@ import type { RunStatusStage } from "./run-status";
105117
import { useLocalStorage } from "../../hooks/use-local-storage";
106118
import { LOCALSTORAGE_KEYS } from "../../lib/localstorage-keys";
107119
import { KEYS } from "../../lib/query-keys";
120+
import { enqueueMessage, refreshMessageQueue } from "./message-queue-store";
108121
import { formatDeckTabId } from "@/web/layouts/main-panel-tabs/tab-id";
109122
import { useSimpleMode } from "../../hooks/use-organization-settings";
110123

@@ -937,6 +950,9 @@ export function ActiveTaskProvider({
937950
},
938951
onFinish: (message, _messages, finishReason) => {
939952
const cb = cbRef.current;
953+
// Terminal event: the gate advanced — re-sync the queue so the
954+
// dequeued message drops and the next one surfaces.
955+
if (cb.taskId) void refreshMessageQueue(cb.orgSlug, cb.taskId);
940956
if (cb.taskId) {
941957
cb.manager.patchThread({
942958
id: cb.taskId,
@@ -1000,11 +1016,15 @@ export function ActiveTaskProvider({
10001016
};
10011017
conn.observer = observer;
10021018

1003-
// Refresh the pending-message queue on every SSE run start/end edge. A
1004-
// gate dequeue flips conn.status idle→active (the next queued message
1005-
// began streaming); a finish flips active→idle (the slot freed). This is
1006-
// the SSE-driven replacement for the old 3s poll — the queue panel updates
1007-
// exactly when the thread gate advances, off the same stream the chat uses.
1019+
// Seed the frontend message queue from the gate on (re)mount so a reload
1020+
// or thread switch still shows what's queued.
1021+
void refreshMessageQueue(cbRef.current.orgSlug, cbRef.current.taskId);
1022+
1023+
// Re-sync the queue on every SSE run start/end edge. A gate dequeue flips
1024+
// conn.status idle→active (the next queued message began streaming); a
1025+
// finish flips active→idle (the slot freed). This is the SSE-driven
1026+
// refresh — the queue panel updates exactly when the thread gate advances,
1027+
// off the same stream the chat uses.
10081028
const isActiveStatus = (k: ConnStatus["kind"]) =>
10091029
k === "submitted" || k === "streaming";
10101030
let prevActive = isActiveStatus(conn.status.get().kind);
@@ -1013,9 +1033,7 @@ export function ActiveTaskProvider({
10131033
if (active === prevActive) return;
10141034
prevActive = active;
10151035
const cb = cbRef.current;
1016-
cb.queryClient.invalidateQueries({
1017-
queryKey: KEYS.threadQueue(cb.taskId),
1018-
});
1036+
void refreshMessageQueue(cb.orgSlug, cb.taskId);
10191037
});
10201038

10211039
return () => {
@@ -1100,35 +1118,60 @@ export function ActiveTaskProvider({
11001118
metadata: messageMetadata,
11011119
};
11021120

1103-
await conn.submit(
1104-
{ kind: "message", message: userMessage },
1105-
{
1106-
tier: activeTier,
1107-
mode: modeToSend,
1108-
toolApprovalLevel:
1109-
preferences.toolApprovalLevel ?? readToolApprovalLevel(),
1110-
system: system || undefined,
1111-
agent: { id: capturedVirtualMcpId },
1112-
thread_id: capturedTaskId,
1113-
...resolveSubmitSettings({
1114-
thread: activeTask
1115-
? {
1116-
harness_id: activeTask.harness_id ?? null,
1117-
sandbox_provider_kind: activeTask.sandbox_provider_kind ?? null,
1118-
branch: activeTask.branch ?? null,
1119-
}
1120-
: null,
1121-
globals: {
1122-
harnessId: pendingHarnessId ?? undefined,
1123-
sandboxProviderKind: pendingSandboxProviderKind ?? undefined,
1124-
branch: currentBranch,
1125-
},
1126-
}),
1127-
},
1128-
);
1129-
queryClient.invalidateQueries({
1130-
queryKey: KEYS.threadQueue(capturedTaskId),
1131-
});
1121+
const requestOptions = {
1122+
tier: activeTier,
1123+
mode: modeToSend,
1124+
toolApprovalLevel:
1125+
preferences.toolApprovalLevel ?? readToolApprovalLevel(),
1126+
system: system || undefined,
1127+
agent: { id: capturedVirtualMcpId },
1128+
thread_id: capturedTaskId,
1129+
...resolveSubmitSettings({
1130+
thread: activeTask
1131+
? {
1132+
harness_id: activeTask.harness_id ?? null,
1133+
sandbox_provider_kind: activeTask.sandbox_provider_kind ?? null,
1134+
branch: activeTask.branch ?? null,
1135+
}
1136+
: null,
1137+
globals: {
1138+
harnessId: pendingHarnessId ?? undefined,
1139+
sandboxProviderKind: pendingSandboxProviderKind ?? undefined,
1140+
branch: currentBranch,
1141+
},
1142+
}),
1143+
};
1144+
1145+
// A run is already streaming (this thread) or in progress (hosted): the
1146+
// message can't be the current turn, so queue it behind the running one.
1147+
// POST it to the gate WITHOUT an optimistic body row, then mirror it into
1148+
// the frontend message queue (it renders in the body when its turn
1149+
// dequeues and streams). Otherwise this is the current turn — submit
1150+
// normally so it renders in the body and streams immediately.
1151+
if (isStreaming || isRunInProgress) {
1152+
try {
1153+
await conn.enqueue(userMessage, requestOptions);
1154+
enqueueMessage(capturedTaskId, {
1155+
workflowId: `thread-run:${capturedTaskId}:${userMessage.id}`,
1156+
messageId: userMessage.id,
1157+
text: queueItemTextFromParts(parts),
1158+
status: "queued",
1159+
enqueuedAt: Date.now(),
1160+
});
1161+
} catch (err) {
1162+
setChatError(err instanceof Error ? err : new Error(String(err)));
1163+
toast.error(
1164+
err instanceof Error ? err.message : "Failed to queue message",
1165+
);
1166+
}
1167+
// Reconcile the optimistic row against the gate's authoritative list.
1168+
void refreshMessageQueue(org.slug, capturedTaskId);
1169+
} else {
1170+
await conn.submit(
1171+
{ kind: "message", message: userMessage },
1172+
requestOptions,
1173+
);
1174+
}
11321175
}
11331176

11341177
// Cancel run

apps/mesh/src/web/components/chat/input.tsx

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ export function ChatInput({
534534
{/* Left Actions (+, Tools, active tool pills, stats) */}
535535
<div className="flex items-center gap-1.5 min-w-0">
536536
<ToolsPopover
537-
disabled={isStreaming}
537+
disabled={false}
538538
onOpenConnections={() => {
539539
track("connections_dialog_opened", {
540540
source: "tools_popover",
@@ -576,7 +576,6 @@ export function ChatInput({
576576
{chatMode === "gen-image" && imageModel && (
577577
<button
578578
type="button"
579-
disabled={isStreaming}
580579
onClick={() => {
581580
playSwitchSound();
582581
track("chat_mode_changed", {
@@ -603,7 +602,6 @@ export function ChatInput({
603602
{chatMode === "web-search" && webSearchModel && (
604603
<button
605604
type="button"
606-
disabled={isStreaming}
607605
onClick={() => {
608606
playSwitchSound();
609607
track("chat_mode_changed", {
@@ -630,7 +628,6 @@ export function ChatInput({
630628
{chatMode === "deep-research" && deepResearchModel && (
631629
<button
632630
type="button"
633-
disabled={isStreaming}
634631
onClick={() => {
635632
playSwitchSound();
636633
track("chat_mode_changed", {
@@ -674,14 +671,14 @@ export function ChatInput({
674671
)}
675672
<TierTrigger />
676673

677-
{/* Microphone button — kept mounted (and disabled)
678-
during streaming/run to avoid layout shift when
679-
the send button morphs into stop/cancel. */}
674+
{/* Microphone button — always enabled; the composer has
675+
no disabled state, only a streaming state reflected by
676+
the send/stop button. */}
680677
{voice.isSupported && (
681678
<Button
682679
type="button"
683680
onClick={handleVoiceStart}
684-
disabled={isStreaming || isRunInProgress}
681+
disabled={false}
685682
variant="ghost"
686683
size="icon"
687684
className={cn(
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import {
2+
dropQueueItem,
3+
upsertQueueItem,
4+
type QueueItemDTO,
5+
} from "./queue-items";
6+
import { Store } from "./store/store-primitive";
7+
8+
/**
9+
* Per-thread frontend message queue. A module-scoped registry of Stores keyed
10+
* by thread id, so a thread's queue outlives any single chat mount (switching
11+
* threads and back keeps the optimistic queue). Seeded from the server
12+
* `/queue` endpoint and written optimistically when the user sends a message
13+
* behind a running turn. The panel reads this — not the network — so a queued
14+
* message appears instantly.
15+
*/
16+
const stores = new Map<string, Store<QueueItemDTO[]>>();
17+
18+
export function messageQueueStore(threadId: string): Store<QueueItemDTO[]> {
19+
let store = stores.get(threadId);
20+
if (!store) {
21+
store = new Store<QueueItemDTO[]>([]);
22+
stores.set(threadId, store);
23+
}
24+
return store;
25+
}
26+
27+
/** Optimistically append a just-sent message (idempotent by messageId). */
28+
export function enqueueMessage(threadId: string, item: QueueItemDTO): void {
29+
messageQueueStore(threadId).update((cur) => upsertQueueItem(cur, item));
30+
}
31+
32+
/** Drop a message — cancelled, or dequeued into the running slot. */
33+
export function removeMessage(threadId: string, messageId: string): void {
34+
messageQueueStore(threadId).update((cur) => dropQueueItem(cur, messageId));
35+
}
36+
37+
/** Replace a thread's queue with the server's authoritative list. */
38+
export function setQueue(threadId: string, items: QueueItemDTO[]): void {
39+
messageQueueStore(threadId).set(items);
40+
}
41+
42+
/**
43+
* Fetch the server's gate queue for a thread and replace the local store.
44+
* Best-effort: on any error the optimistic store stands until the next call.
45+
*/
46+
export async function refreshMessageQueue(
47+
orgSlug: string,
48+
threadId: string,
49+
): Promise<void> {
50+
try {
51+
const res = await fetch(
52+
`/api/${encodeURIComponent(orgSlug)}/decopilot/queue/${encodeURIComponent(threadId)}`,
53+
{ credentials: "include" },
54+
);
55+
if (!res.ok) return;
56+
const body = (await res.json()) as { items?: QueueItemDTO[] };
57+
setQueue(threadId, body.items ?? []);
58+
} catch {
59+
// best-effort — leave the current store contents in place
60+
}
61+
}

apps/mesh/src/web/components/chat/queue-items.test.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { describe, expect, it } from "bun:test";
2-
import { selectWaitingQueueItems } from "./queue-items";
3-
import type { QueueItemDTO } from "./use-thread-queue";
2+
import {
3+
dropQueueItem,
4+
selectWaitingQueueItems,
5+
upsertQueueItem,
6+
type QueueItemDTO,
7+
} from "./queue-items";
48

59
const item = (
610
id: string,
@@ -55,3 +59,36 @@ describe("selectWaitingQueueItems", () => {
5559
expect(out.map((i) => i.messageId)).toEqual(["q1", "q2"]);
5660
});
5761
});
62+
63+
describe("upsertQueueItem", () => {
64+
it("appends a new item", () => {
65+
const out = upsertQueueItem(
66+
[item("a", "queued", 1)],
67+
item("b", "queued", 2),
68+
);
69+
expect(out.map((i) => i.messageId)).toEqual(["a", "b"]);
70+
});
71+
72+
it("is idempotent by messageId (no duplicate when re-enqueued)", () => {
73+
const out = upsertQueueItem(
74+
[item("a", "queued", 1)],
75+
item("a", "queued", 9),
76+
);
77+
expect(out.map((i) => i.messageId)).toEqual(["a"]);
78+
});
79+
});
80+
81+
describe("dropQueueItem", () => {
82+
it("removes the matching messageId", () => {
83+
const out = dropQueueItem(
84+
[item("a", "queued", 1), item("b", "queued", 2)],
85+
"a",
86+
);
87+
expect(out.map((i) => i.messageId)).toEqual(["b"]);
88+
});
89+
90+
it("is a no-op when the id is absent", () => {
91+
const out = dropQueueItem([item("a", "queued", 1)], "zzz");
92+
expect(out.map((i) => i.messageId)).toEqual(["a"]);
93+
});
94+
});

apps/mesh/src/web/components/chat/queue-items.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1-
import type { QueueItemDTO } from "./use-thread-queue";
1+
/** A pending message in a thread's gate queue, as surfaced to the UI. */
2+
export interface QueueItemDTO {
3+
/** Full DBOS workflow id: `thread-run:{threadId}:{messageId}`. */
4+
workflowId: string;
5+
/** Trailing segment of the workflow id (the user message id). */
6+
messageId: string;
7+
/** Display text of the queued user message. */
8+
text: string;
9+
/** PENDING ("running") → being processed; ENQUEUED ("queued") → waiting. */
10+
status: "running" | "queued";
11+
/** Epoch ms the gate was created/enqueued. */
12+
enqueuedAt: number;
13+
}
214

315
/**
416
* The messages *waiting behind* the active run, for the queue panel.
@@ -14,3 +26,21 @@ export function selectWaitingQueueItems(items: QueueItemDTO[]): QueueItemDTO[] {
1426
const sorted = [...items].sort((a, b) => a.enqueuedAt - b.enqueuedAt);
1527
return sorted.slice(1);
1628
}
29+
30+
/** Append an item unless its messageId is already present (idempotent). */
31+
export function upsertQueueItem(
32+
items: QueueItemDTO[],
33+
item: QueueItemDTO,
34+
): QueueItemDTO[] {
35+
return items.some((i) => i.messageId === item.messageId)
36+
? items
37+
: [...items, item];
38+
}
39+
40+
/** Remove the item with the given messageId. */
41+
export function dropQueueItem(
42+
items: QueueItemDTO[],
43+
messageId: string,
44+
): QueueItemDTO[] {
45+
return items.filter((i) => i.messageId !== messageId);
46+
}

apps/mesh/src/web/components/chat/queue-panel.tsx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Button } from "@deco/ui/components/button.tsx";
22
import { X } from "@untitledui/icons";
33
import { selectWaitingQueueItems } from "./queue-items";
4-
import { useCancelQueuedMessage, useThreadQueue } from "./use-thread-queue";
4+
import { useMessageQueue, useMessageQueueActions } from "./use-message-queue";
55

66
/**
77
* Pending-message queue above the composer. Lists only the messages *waiting*
@@ -10,8 +10,8 @@ import { useCancelQueuedMessage, useThreadQueue } from "./use-thread-queue";
1010
* chat body. Each row cancels its own gate workflow. Hidden when nothing waits.
1111
*/
1212
export function ThreadQueuePanel({ taskId }: { taskId: string }) {
13-
const { items } = useThreadQueue(taskId);
14-
const cancel = useCancelQueuedMessage(taskId);
13+
const items = useMessageQueue(taskId);
14+
const { cancel } = useMessageQueueActions();
1515
const queued = selectWaitingQueueItems(items);
1616
if (queued.length === 0) return null;
1717

@@ -38,7 +38,7 @@ export function ThreadQueuePanel({ taskId }: { taskId: string }) {
3838
size="icon"
3939
className="size-6 shrink-0 text-muted-foreground hover:text-foreground"
4040
title="Remove from queue"
41-
onClick={() => cancel(item.workflowId)}
41+
onClick={() => cancel(taskId, item.messageId)}
4242
>
4343
<X size={14} />
4444
</Button>

0 commit comments

Comments
 (0)