Skip to content

Commit e5373e1

Browse files
committed
fix(cloud-agent-next): isolate dev reporting and track activity
1 parent fd1d0cd commit e5373e1

8 files changed

Lines changed: 48 additions & 19 deletions

File tree

services/cloud-agent-next/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ Required secrets:
703703

704704
- `CLOUDFLARE_API_TOKEN` — must have permission to deploy the worker for the selected environment.
705705

706-
Cloud Agent Next owns its operational reporting projection. Deploy the reporting database migration before this worker release and provision `cloud-agent-next-report-queue` plus `cloud-agent-next-report-queue-dlq` with four-day message retention. New sessions synchronously create their reporting anchor before setup; queued run reports do not synthesize parents when an anchor is absent.
706+
Cloud Agent Next owns its operational reporting projection. Deploy the reporting database migration before this worker release and provision `cloud-agent-next-report-queue` plus `cloud-agent-next-report-queue-dlq` with four-day message retention. A deployed `dev` environment requires isolated `cloud-agent-next-report-queue-dev` and `cloud-agent-next-report-queue-dlq-dev` resources with the same retention. New sessions synchronously create their reporting anchor before setup; queued run reports do not synthesize parents when an anchor is absent.
707707

708708
### Testing
709709

services/cloud-agent-next/src/persistence/CloudAgentSession.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
596596
keepContainerAlive: () => {
597597
void this.keepContainerAlive();
598598
},
599+
observeCorrelatedAgentActivity: messageId => this.recordCorrelatedAgentActivity(messageId),
599600
terminalizeSessionMessageOnce: async (messageId, params, wrapperRunId) => {
600601
await this.ensureAcceptedMessageBeforeTerminal(messageId, wrapperRunId);
601602
await this.recordCorrelatedAgentActivity(messageId);
@@ -2051,7 +2052,7 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
20512052

20522053
private async recordCorrelatedAgentActivity(messageId: string): Promise<void> {
20532054
const updated = await markAgentActivityObserved(this.ctx.storage, messageId);
2054-
if (updated) void this.reportRunState(updated).catch(() => undefined);
2055+
if (updated) this.ctx.waitUntil(this.reportRunState(updated));
20552056
}
20562057

20572058
private async ensureAcceptedMessageBeforeTerminal(

services/cloud-agent-next/src/server.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ vi.mock('./callbacks/index.js', () => ({
5151
vi.mock('./telemetry/report-consumer.js', () => ({
5252
CLOUD_AGENT_REPORT_QUEUE_NAMES: new Set([
5353
'cloud-agent-next-report-queue',
54+
'cloud-agent-next-report-queue-dev',
5455
'cloud-agent-next-report-queue-test',
5556
]),
5657
consumeCloudAgentReportBatch: consumeCloudAgentReportBatchMock,
@@ -160,6 +161,18 @@ describe('server background reporting', () => {
160161
expect(consumeCloudAgentReportBatchMock).toHaveBeenCalledWith(batch, env);
161162
});
162163

164+
it('routes isolated development report queue batches to the Cloud Agent report consumer', async () => {
165+
const env = createEnv();
166+
const batch = {
167+
queue: 'cloud-agent-next-report-queue-dev',
168+
messages: [],
169+
} as unknown as MessageBatch<unknown>;
170+
171+
await worker.queue(batch, env as unknown as Env);
172+
173+
expect(consumeCloudAgentReportBatchMock).toHaveBeenCalledWith(batch, env);
174+
});
175+
163176
it('runs reporting retention cleanup from the scheduled handler', async () => {
164177
const env = createEnv();
165178

services/cloud-agent-next/src/telemetry/report-consumer.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ vi.mock('./report-store.js', () => ({
99
}));
1010

1111
import { createCloudAgentReportStore } from './report-store.js';
12-
import { consumeCloudAgentReportBatch } from './report-consumer.js';
12+
import { CLOUD_AGENT_REPORT_QUEUE_NAMES, consumeCloudAgentReportBatch } from './report-consumer.js';
1313

1414
const report = {
1515
version: 1,
@@ -38,6 +38,10 @@ describe('Cloud Agent report consumer', () => {
3838
vi.resetAllMocks();
3939
});
4040

41+
it('routes isolated development reporting queue messages', () => {
42+
expect(CLOUD_AGENT_REPORT_QUEUE_NAMES.has('cloud-agent-next-report-queue-dev')).toBe(true);
43+
});
44+
4145
it('acks a valid saved report after its essential write completes', async () => {
4246
const saveReport = vi.fn(async () => ({ outcome: 'applied' as const }));
4347
vi.mocked(createCloudAgentReportStore).mockReturnValue({ saveReport } as never);

services/cloud-agent-next/src/telemetry/report-consumer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { createCloudAgentReportStore } from './report-store.js';
66

77
export const CLOUD_AGENT_REPORT_QUEUE_NAMES = new Set([
88
'cloud-agent-next-report-queue',
9+
'cloud-agent-next-report-queue-dev',
910
'cloud-agent-next-report-queue-test',
1011
]);
1112

services/cloud-agent-next/src/websocket/ingest.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,11 +847,12 @@ describe('createIngestHandler', () => {
847847
function createNewPathDOContext() {
848848
return {
849849
...createFakeDOContext(),
850+
observeCorrelatedAgentActivity: vi.fn().mockResolvedValue(undefined),
850851
terminalizeSessionMessageOnce: vi.fn().mockResolvedValue(undefined),
851852
};
852853
}
853854

854-
it('does NOT terminalize on partial assistant message.updated (no time.completed)', async () => {
855+
it('observes activity without terminalizing on partial assistant message.updated', async () => {
855856
const state = createFakeState();
856857
const doContext = createNewPathDOContext();
857858
const handler = createIngestHandler(
@@ -882,6 +883,7 @@ describe('createIngestHandler', () => {
882883

883884
await handler.handleIngestMessage(ws, message);
884885

886+
expect(doContext.observeCorrelatedAgentActivity).toHaveBeenCalledWith('msg_user_111');
885887
expect(doContext.terminalizeSessionMessageOnce).not.toHaveBeenCalled();
886888
});
887889

services/cloud-agent-next/src/websocket/ingest.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ export type IngestDOContext = {
144144
| 'onTerminalEvent'
145145
>;
146146
keepContainerAlive?: () => void;
147+
observeCorrelatedAgentActivity?: (messageId: string) => Promise<void>;
147148
terminalizeSessionMessageOnce?: (
148149
messageId: string,
149150
params: {
@@ -609,18 +610,25 @@ export function createIngestHandler(
609610
const assistantError = getAssistantErrorMessage(info?.error);
610611
const hasError = assistantError !== undefined;
611612
const isTerminal = isCompleted || hasError;
612-
if (info?.role === 'assistant' && typeof info.parentID === 'string' && isTerminal) {
613-
await doContext.terminalizeSessionMessageOnce?.(
614-
info.parentID,
615-
{
616-
kind: hasError ? 'failed' : 'completed',
617-
assistantMessageId: typeof info.id === 'string' ? info.id : undefined,
618-
completionSource: 'assistant_message_event',
619-
reason: hasError ? 'assistant_error' : undefined,
620-
error: assistantError,
621-
},
622-
wrapperRunId
623-
);
613+
const parentMessageId =
614+
info?.role === 'assistant' && typeof info.parentID === 'string'
615+
? info.parentID
616+
: undefined;
617+
if (parentMessageId !== undefined) {
618+
await doContext.observeCorrelatedAgentActivity?.(parentMessageId);
619+
if (isTerminal) {
620+
await doContext.terminalizeSessionMessageOnce?.(
621+
parentMessageId,
622+
{
623+
kind: hasError ? 'failed' : 'completed',
624+
assistantMessageId: typeof info?.id === 'string' ? info.id : undefined,
625+
completionSource: 'assistant_message_event',
626+
reason: hasError ? 'assistant_error' : undefined,
627+
error: assistantError,
628+
},
629+
wrapperRunId
630+
);
631+
}
624632
}
625633
}
626634
}

services/cloud-agent-next/wrangler.jsonc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@
363363
},
364364
{
365365
"binding": "CLOUD_AGENT_REPORT_QUEUE",
366-
"queue": "cloud-agent-next-report-queue",
366+
"queue": "cloud-agent-next-report-queue-dev",
367367
},
368368
],
369369
"consumers": [
@@ -374,9 +374,9 @@
374374
"max_retries": 4,
375375
},
376376
{
377-
"queue": "cloud-agent-next-report-queue",
377+
"queue": "cloud-agent-next-report-queue-dev",
378378
"max_retries": 3,
379-
"dead_letter_queue": "cloud-agent-next-report-queue-dlq",
379+
"dead_letter_queue": "cloud-agent-next-report-queue-dlq-dev",
380380
},
381381
],
382382
},

0 commit comments

Comments
 (0)