Skip to content

Commit ffb682b

Browse files
authored
fix(cloud-agent): settle accepted messages on wrapper complete to prevent webhook hang (#3680)
* fix(cloud-agent-next): settle accepted messages on wrapper complete to prevent webhook hang
* test(cloud-agent): update integration tests for wrapper complete reconciliation
1 parent 26b63d6 commit ffb682b

5 files changed

Lines changed: 197 additions & 59 deletions

File tree

services/cloud-agent-next/src/session/wrapper-runtime-state.ts

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -451,6 +451,15 @@ export async function recordRootSessionIdle(
451451
* this, the deadline would be cleared forever after the first event, and a
452452
* wrapper whose kilo-server SSE subscription silently stalls would remain
453453
* live without failing its accepted messages.
454+
*
455+
* This intentionally does NOT clear the idle-reconciliation fields
456+
* (`lastWrapperIdleAt`/`idleReconcileAfter`/`wrapperIdleDeadlineAt`). Those are
457+
* only ever armed by `recordRootSessionIdle` once the root session goes idle,
458+
* so any wrapper output observed afterwards is post-completion infrastructure
459+
* work (autocommit, condense, log upload) — not a new agent turn. Clearing the
460+
* idle deadline here would disarm the reconciler that finalizes the in-flight
461+
* message, stranding it non-terminal and hanging the callback. A genuinely new
462+
* turn clears idle via `recordWrapperAcceptedMessage` instead.
454463
*/
455464
export async function recordMeaningfulWrapperOutput(
456465
storage: DurableObjectStorage,
@@ -465,9 +474,6 @@ export async function recordMeaningfulWrapperOutput(
465474
lastWrapperMessageAt: now,
466475
noOutputDeadlineAt,
467476
nextPingAt: current.pingDeadlineAt === undefined ? nextPingAt : undefined,
468-
lastWrapperIdleAt: undefined,
469-
idleReconcileAfter: undefined,
470-
wrapperIdleDeadlineAt: undefined,
471477
}));
472478
}
473479

services/cloud-agent-next/src/session/wrapper-supervisor.test.ts

Lines changed: 70 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import {
1616
type WrapperSupervisorStorage,
1717
} from './wrapper-supervisor.js';
1818
import { getWrapperLease, getWrapperRuntimeState } from './wrapper-runtime-state.js';
19+
import type { LatestAssistantMessage } from './types.js';
1920

2021
vi.mock('@cloudflare/sandbox', () => ({
2122
getSandbox: vi.fn(),
@@ -103,8 +104,15 @@ function createHarness(
103104
options?: {
104105
metadata?: SessionMetadata;
105106
storageHooks?: { beforeList?: (prefix: string) => Promise<void> };
107+
getAssistantMessageForUserMessage?: (
108+
sessionId: string,
109+
kiloSessionId: string,
110+
parentMessageId: string
111+
) => LatestAssistantMessage | null;
106112
}
107113
) {
114+
const getAssistantMessageForUserMessage =
115+
options?.getAssistantMessageForUserMessage ?? (() => null);
108116
const storage = createMemoryStorage(initialEntries, options?.storageHooks);
109117
const events: MessageEvent[] = [];
110118
const callbackJobs: CallbackJob[] = [];
@@ -144,7 +152,7 @@ function createHarness(
144152
messageSettlementOutbox: settlementOutbox,
145153
sessionMessageQueue: { requestPendingDrainIfNeeded },
146154
getMetadata: async () => currentMetadata,
147-
getAssistantMessageForUserMessage: () => null,
155+
getAssistantMessageForUserMessage,
148156
hasActiveIngestConnection: async () => false,
149157
clearInterruptRequest: async () => {},
150158
stopWrappers,
@@ -631,7 +639,7 @@ describe('WrapperSupervisor', () => {
631639
await expect(getSessionMessageState(harness.storage, MESSAGE_ID)).resolves.toMatchObject({
632640
status: 'failed',
633641
failureReason: 'missing_assistant_reply',
634-
error: 'No assistant reply found after idle timeout',
642+
error: 'No assistant reply found during reconciliation',
635643
completionSource: 'idle_reconciliation',
636644
failureStage: 'post_dispatch_no_activity',
637645
failureCode: 'missing_assistant_reply',
@@ -679,6 +687,66 @@ describe('WrapperSupervisor', () => {
679687
});
680688
});
681689

690+
it('settles a still-accepted message and enqueues its callback when the wrapper completes without a terminal assistant event', async () => {
691+
// Reproduces the webhook hang: the final assistant message.updated never
692+
// carried time.completed (so assistant_message_event never settled it), and
693+
// post-idle autocommit refreshed liveness while clearing the idle-reconcile
694+
// deadline (idle fields absent below). The race-free `complete` event must
695+
// still terminalize the message and release the gated callback.
696+
const assistantMessageId = 'ase_complete_reconcile';
697+
const harness = createHarness([liveRuntimeState(), OWNED_WRAPPER_LEASE], {
698+
getAssistantMessageForUserMessage: () =>
699+
({
700+
info: { id: assistantMessageId, role: 'assistant' },
701+
parts: [],
702+
}) as unknown as LatestAssistantMessage,
703+
});
704+
await putSessionMessageState(harness.storage, {
705+
...acceptedMessage(),
706+
callbackRequired: true,
707+
callbackTarget: { url: 'https://example.com/complete-reconcile' },
708+
});
709+
710+
await harness.supervisor.onTerminalEvent({
711+
wrapperRunId: WRAPPER_RUN_ID,
712+
status: 'completed',
713+
});
714+
715+
await expect(getSessionMessageState(harness.storage, MESSAGE_ID)).resolves.toMatchObject({
716+
status: 'completed',
717+
completionSource: 'idle_reconciliation',
718+
assistantMessageId,
719+
});
720+
expect(harness.callbackJobs).toHaveLength(1);
721+
expect(harness.callbackJobs[0].payload).toMatchObject({
722+
messageId: MESSAGE_ID,
723+
status: 'completed',
724+
});
725+
expect(harness.requestPendingDrainIfNeeded).toHaveBeenCalledOnce();
726+
});
727+
728+
it('keeps the idle-reconcile deadline armed when post-completion output is observed', async () => {
729+
// Layer 2: autocommit/condense events after session.idle must refresh
730+
// liveness without disarming the reconciler that finalizes the in-flight
731+
// message. Before the fix, observeMeaningfulOutput cleared these fields.
732+
const harness = createHarness([
733+
liveRuntimeState({
734+
lastWrapperIdleAt: 1_000,
735+
idleReconcileAfter: 9_000,
736+
wrapperIdleDeadlineAt: 50_000,
737+
}),
738+
]);
739+
740+
await harness.supervisor.observeMeaningfulOutput(4, WRAPPER_CONNECTION_ID, 2_000);
741+
742+
await expect(getWrapperRuntimeState(harness.storage)).resolves.toMatchObject({
743+
lastWrapperIdleAt: 1_000,
744+
idleReconcileAfter: 9_000,
745+
wrapperIdleDeadlineAt: 50_000,
746+
lastWrapperMessageAt: 2_000,
747+
});
748+
});
749+
682750
it('retains successful idle ownership with a bounded physical warm deadline', async () => {
683751
const harness = createHarness([liveRuntimeState(), OWNED_WRAPPER_LEASE]);
684752

services/cloud-agent-next/src/session/wrapper-supervisor.ts

Lines changed: 80 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -645,41 +645,37 @@ export function createWrapperSupervisor(
645645
return false;
646646
}
647647

648-
async function checkIdleReconciliation(now: number): Promise<void> {
649-
const metadata = await getMetadata();
650-
if (!metadata) return;
651-
652-
const state = await getWrapperRuntimeState(storage);
653-
if (!state.wrapperRunId) return;
654-
655-
const acceptedMessages = await listNonTerminalAcceptedMessages(storage, state.wrapperRunId);
656-
if (acceptedMessages.length === 0) {
657-
if (
658-
state.wrapperConnectionId &&
659-
(state.lastWrapperIdleAt !== undefined || state.idleReconcileAfter !== undefined)
660-
) {
661-
await clearWrapperIdleState(storage, state.wrapperGeneration, state.wrapperConnectionId);
662-
}
663-
return;
664-
}
665-
666-
if (state.idleReconcileAfter !== undefined) {
667-
if (now < state.idleReconcileAfter) return;
668-
} else {
669-
const hasRecentOutput =
670-
state.lastWrapperMessageAt !== undefined &&
671-
now - state.lastWrapperMessageAt < WRAPPER_NO_OUTPUT_TIMEOUT_MS;
672-
if (hasRecentOutput) return;
673-
}
674-
648+
/**
649+
* Terminalize the wrapper run's still-accepted messages against the latest
650+
* assistant reply, mirroring how a finished turn would have settled them.
651+
*
652+
* Shared by two triggers:
653+
* - `idle`: the idle-reconciliation grace deadline elapsed.
654+
* - `wrapper_complete`: the wrapper emitted its terminal `complete` event,
655+
* which is the race-free "fully done" signal (it fires only after all
656+
* post-completion work). This is the authoritative backstop for the case
657+
* where the assistant `message.updated` arrived without `time.completed`,
658+
* so neither the assistant-event nor the idle path settled the message.
659+
*
660+
* `terminalizeSessionMessageOnce` is idempotent, so re-running here after a
661+
* partial settlement is safe.
662+
*/
663+
async function reconcileAcceptedMessages(
664+
now: number,
665+
state: WrapperRuntimeState,
666+
metadata: SessionMetadata,
667+
acceptedMessages: Awaited<ReturnType<typeof listNonTerminalAcceptedMessages>>,
668+
trigger: 'idle' | 'wrapper_complete'
669+
): Promise<void> {
675670
logger
676671
.withFields({
677672
sessionId: metadata.identity.sessionId,
678673
wrapperRunId: state.wrapperRunId,
679674
acceptedMessageCount: acceptedMessages.length,
680675
hasKiloSessionId: metadata.auth.kiloSessionId !== undefined,
676+
trigger,
681677
})
682-
.info('Idle reconciliation processing accepted messages');
678+
.info('Reconciling accepted messages');
683679

684680
let failedTerminalObserved = !metadata.auth.kiloSessionId;
685681
if (!failedTerminalObserved && metadata.auth.kiloSessionId) {
@@ -704,7 +700,7 @@ export function createWrapperSupervisor(
704700
await messageSettlementOutbox.terminalizeSessionMessageOnce(message.messageId, {
705701
kind: 'failed',
706702
reason: 'missing_assistant_reply',
707-
error: 'No assistant reply found after idle timeout',
703+
error: 'No assistant reply found during reconciliation',
708704
completionSource: 'idle_reconciliation',
709705
failureStage: 'post_dispatch_no_activity',
710706
failureCode: 'missing_assistant_reply',
@@ -722,7 +718,7 @@ export function createWrapperSupervisor(
722718
await messageSettlementOutbox.terminalizeSessionMessageOnce(message.messageId, {
723719
kind: 'failed',
724720
reason: 'missing_assistant_reply',
725-
error: 'No assistant reply found after idle timeout',
721+
error: 'No assistant reply found during reconciliation',
726722
completionSource: 'idle_reconciliation',
727723
failureStage: 'post_dispatch_no_activity',
728724
failureCode: 'missing_assistant_reply',
@@ -768,8 +764,39 @@ export function createWrapperSupervisor(
768764
sessionId: metadata.identity.sessionId,
769765
wrapperRunId: state.wrapperRunId,
770766
acceptedMessageCount: acceptedMessages.length,
767+
trigger,
771768
})
772-
.info('Idle reconciliation pass completed');
769+
.info('Reconciliation pass completed');
770+
}
771+
772+
async function checkIdleReconciliation(now: number): Promise<void> {
773+
const metadata = await getMetadata();
774+
if (!metadata) return;
775+
776+
const state = await getWrapperRuntimeState(storage);
777+
if (!state.wrapperRunId) return;
778+
779+
const acceptedMessages = await listNonTerminalAcceptedMessages(storage, state.wrapperRunId);
780+
if (acceptedMessages.length === 0) {
781+
if (
782+
state.wrapperConnectionId &&
783+
(state.lastWrapperIdleAt !== undefined || state.idleReconcileAfter !== undefined)
784+
) {
785+
await clearWrapperIdleState(storage, state.wrapperGeneration, state.wrapperConnectionId);
786+
}
787+
return;
788+
}
789+
790+
if (state.idleReconcileAfter !== undefined) {
791+
if (now < state.idleReconcileAfter) return;
792+
} else {
793+
const hasRecentOutput =
794+
state.lastWrapperMessageAt !== undefined &&
795+
now - state.lastWrapperMessageAt < WRAPPER_NO_OUTPUT_TIMEOUT_MS;
796+
if (hasRecentOutput) return;
797+
}
798+
799+
await reconcileAcceptedMessages(now, state, metadata, acceptedMessages, 'idle');
773800
}
774801

775802
async function checkKeepWarmCleanup(now: number): Promise<void> {
@@ -968,6 +995,28 @@ export function createWrapperSupervisor(
968995
if (acceptedMessages.length === 0) {
969996
await retainPhysicalWrapperWarm(Date.now());
970997
await clearInterruptRequest();
998+
} else {
999+
// `complete` is the race-free "fully done" signal: it is emitted only
1000+
// after all post-completion work. If messages are still accepted here,
1001+
// the assistant-event and idle-reconcile paths both missed them (e.g.
1002+
// the final assistant `message.updated` lacked `time.completed`, and
1003+
// post-idle autocommit refreshed liveness). Settle them now rather than
1004+
// leaving the callback gated indefinitely.
1005+
const metadata = await getMetadata();
1006+
if (metadata) {
1007+
await reconcileAcceptedMessages(
1008+
Date.now(),
1009+
state,
1010+
metadata,
1011+
acceptedMessages,
1012+
'wrapper_complete'
1013+
);
1014+
} else {
1015+
logger
1016+
.withFields({ sessionId, wrapperRunId, acceptedMessageCount: acceptedMessages.length })
1017+
.warn('Wrapper complete with accepted messages but no session metadata to reconcile');
1018+
}
1019+
await clearInterruptRequest();
9711020
}
9721021
} else {
9731022
await clearWrapperRuntimeIdentity(storage, {

services/cloud-agent-next/test/integration/session/execute-directly-failure.test.ts

Lines changed: 20 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ import {
2020
recordWrapperAcceptedMessage,
2121
} from '../../../src/session/wrapper-runtime-state.js';
2222
import {
23+
getSessionMessageState,
2324
listNonTerminalAcceptedMessages,
2425
putSessionMessageState,
2526
type SessionMessageState,
@@ -294,7 +295,7 @@ describe('handleWrapperTerminalEvent — new-path identity and message preservat
294295
);
295296
});
296297

297-
it('wrapper complete does not clear wrapper runtime identity when accepted messages remain', async () => {
298+
it('wrapper complete reconciles still-accepted messages instead of stranding them', async () => {
298299
const userId = 'user_wrapper_complete_identity';
299300
const sessionId = 'agent_wrapper_complete_identity';
300301
const doId = env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`);
@@ -315,9 +316,10 @@ describe('handleWrapperTerminalEvent — new-path identity and message preservat
315316
const { state: wrapperState } = await allocateWrapperRuntimeState(instance.ctx.storage);
316317
const { wrapperRunId, wrapperConnectionId } = wrapperState;
317318

319+
const messageId = 'msg_018f1e2d3c4bWrpCmpAbCdEfGh';
318320
// Store an accepted (non-terminal) session message state
319321
const acceptedMessage: SessionMessageState = {
320-
messageId: 'msg_018f1e2d3c4bWrpCmpAbCdEfGh',
322+
messageId,
321323
status: 'accepted',
322324
prompt: 'hello',
323325
createdAt: Date.now(),
@@ -326,7 +328,11 @@ describe('handleWrapperTerminalEvent — new-path identity and message preservat
326328
};
327329
await putSessionMessageState(instance.ctx.storage, acceptedMessage);
328330

329-
// Fire wrapper complete — with accepted non-terminal messages present
331+
// Fire wrapper complete with an accepted non-terminal message present.
332+
// `complete` is the race-free terminal signal, so the message must be
333+
// settled here rather than left stranded (which would hang the callback).
334+
// This session has no kiloSessionId, so no assistant reply can be found
335+
// and the message settles as failed (missing_assistant_reply).
330336
await instance.handleWrapperTerminalEvent({
331337
wrapperRunId: wrapperRunId!,
332338
status: 'completed',
@@ -337,20 +343,21 @@ describe('handleWrapperTerminalEvent — new-path identity and message preservat
337343
instance.ctx.storage,
338344
wrapperRunId!
339345
);
346+
const settledMessage = await getSessionMessageState(instance.ctx.storage, messageId);
340347

341-
return { wrapperRuntimeState, wrapperConnectionId, acceptedMessages };
348+
return { wrapperRuntimeState, wrapperConnectionId, acceptedMessages, settledMessage };
342349
});
343350

344-
// Identity must NOT be cleared while accepted work remains
345-
expect(result.wrapperRuntimeState.wrapperConnectionId).toBe(result.wrapperConnectionId);
346-
// Accepted message must still be non-terminal
347-
expect(result.acceptedMessages).toHaveLength(1);
348-
expect(result.acceptedMessages[0]?.status).toBe('accepted');
351+
// The message is settled rather than left accepted.
352+
expect(result.acceptedMessages).toHaveLength(0);
353+
expect(result.settledMessage).toMatchObject({
354+
status: 'failed',
355+
failureCode: 'missing_assistant_reply',
356+
completionSource: 'idle_reconciliation',
357+
});
358+
// Identity is released once the run has no remaining accepted work.
359+
expect(result.wrapperRuntimeState.wrapperConnectionId).toBeUndefined();
349360
});
350-
351-
// NOTE: Per Phase 6 (keep-warm cleanup), wrapper `complete` will eventually NOT clear
352-
// identity even when no accepted messages remain — keep-warm alarm cleanup owns that.
353-
// The current behavior (clearing when idle) is interim and will be superseded by Phase 6.
354361
});
355362

356363
describe('new-path liveness without executionId', () => {

0 commit comments

Comments
 (0)