Skip to content

Commit 568d935

Browse files
authored
fix(cloud-agent-next): hide superseded delivery failures (#3451)
1 parent 0035ea2 commit 568d935

3 files changed

Lines changed: 97 additions & 4 deletions

File tree

services/cloud-agent-next/src/session/session-message-queue.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,92 @@ describe('SessionMessageQueue', () => {
10881088
]);
10891089
});
10901090

1091+
it('omits a reconnect delivery failure after a later turn completed', async () => {
1092+
const harness = createQueueHarness();
1093+
await putSessionMessageState(harness.storage, {
1094+
...createQueuedSessionMessageState(
1095+
{
1096+
turn: {
1097+
type: 'prompt',
1098+
messageId: FIRST_MESSAGE_ID,
1099+
prompt: 'failed before acceptance',
1100+
},
1101+
agent: { mode: 'code', model: 'default-model' },
1102+
},
1103+
undefined,
1104+
10
1105+
),
1106+
status: 'failed',
1107+
terminalAt: 30,
1108+
completionSource: 'delivery_failure',
1109+
failureReason: 'exhausted',
1110+
error: 'delivery exhausted',
1111+
attempts: 1,
1112+
});
1113+
await putSessionMessageState(harness.storage, {
1114+
...createQueuedSessionMessageState(
1115+
{
1116+
turn: {
1117+
type: 'prompt',
1118+
messageId: SECOND_MESSAGE_ID,
1119+
prompt: 'delivered later',
1120+
},
1121+
agent: { mode: 'code', model: 'default-model' },
1122+
},
1123+
undefined,
1124+
40
1125+
),
1126+
status: 'completed',
1127+
acceptedAt: 50,
1128+
terminalAt: 60,
1129+
completionSource: 'assistant_message_event',
1130+
});
1131+
1132+
await expect(harness.queue.snapshotForStreamConnect()).resolves.toEqual([]);
1133+
});
1134+
1135+
it('omits a reconnect delivery failure while a later turn is accepted', async () => {
1136+
const harness = createQueueHarness();
1137+
await putSessionMessageState(harness.storage, {
1138+
...createQueuedSessionMessageState(
1139+
{
1140+
turn: {
1141+
type: 'prompt',
1142+
messageId: FIRST_MESSAGE_ID,
1143+
prompt: 'failed before acceptance',
1144+
},
1145+
agent: { mode: 'code', model: 'default-model' },
1146+
},
1147+
undefined,
1148+
10
1149+
),
1150+
status: 'failed',
1151+
terminalAt: 30,
1152+
completionSource: 'delivery_failure',
1153+
failureReason: 'exhausted',
1154+
error: 'delivery exhausted',
1155+
attempts: 1,
1156+
});
1157+
await putSessionMessageState(harness.storage, {
1158+
...createQueuedSessionMessageState(
1159+
{
1160+
turn: {
1161+
type: 'prompt',
1162+
messageId: SECOND_MESSAGE_ID,
1163+
prompt: 'currently running',
1164+
},
1165+
agent: { mode: 'code', model: 'default-model' },
1166+
},
1167+
undefined,
1168+
40
1169+
),
1170+
status: 'accepted',
1171+
acceptedAt: 50,
1172+
});
1173+
1174+
await expect(harness.queue.snapshotForStreamConnect()).resolves.toEqual([]);
1175+
});
1176+
10911177
it('terminalizes pending queued work before deleting it during interrupt handoff', async () => {
10921178
const harness = createQueueHarness();
10931179
const first = createPendingSessionMessage({

services/cloud-agent-next/src/session/session-message-queue.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import {
3838
import {
3939
createQueuedSessionMessageState,
4040
getSessionMessageState,
41-
listNeverAcceptedTerminalQueuedMessages,
41+
listReconnectVisibleTerminalQueuedMessages,
4242
putSessionMessageState,
4343
type SessionMessageStorage,
4444
type TerminalizeParams,
@@ -848,7 +848,7 @@ export function createSessionMessageQueue(
848848
async function snapshotForStreamConnect(): Promise<QueuedMessageSnapshot[]> {
849849
const pending = await listPendingSessionMessages(storage);
850850
const pendingMessageIds = new Set(pending.map(message => message.messageId));
851-
const terminalQueued = await listNeverAcceptedTerminalQueuedMessages(storage);
851+
const terminalQueued = await listReconnectVisibleTerminalQueuedMessages(storage);
852852

853853
return [
854854
...pending.map(message => ({

services/cloud-agent-next/src/session/session-message-state.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ type NeverAcceptedTerminalQueuedMessageState = SessionMessageState & {
419419
status: 'failed' | 'interrupted';
420420
};
421421

422-
export async function listNeverAcceptedTerminalQueuedMessages(
422+
export async function listReconnectVisibleTerminalQueuedMessages(
423423
storage: SessionMessageStorage
424424
): Promise<NeverAcceptedTerminalQueuedMessageState[]> {
425425
const entries = await listSessionMessageStates(storage);
@@ -428,7 +428,14 @@ export async function listNeverAcceptedTerminalQueuedMessages(
428428
state.acceptedAt === undefined &&
429429
state.queuedAt !== undefined &&
430430
(state.status === 'failed' || state.status === 'interrupted') &&
431-
(state.completionSource === 'delivery_failure' || state.completionSource === 'interrupt')
431+
(state.completionSource === 'delivery_failure' || state.completionSource === 'interrupt') &&
432+
!entries.some(
433+
laterState =>
434+
(laterState.acceptedAt !== undefined ||
435+
laterState.status === 'accepted' ||
436+
laterState.status === 'completed') &&
437+
(laterState.queuedAt ?? laterState.createdAt) > (state.queuedAt ?? state.createdAt)
438+
)
432439
);
433440
}
434441

0 commit comments

Comments
 (0)