Skip to content

Commit 7118369

Browse files
committed
perf(sessions): coalesce each flush batch into one store append per notification run
Every 16ms flush handed its buffered events to handleSessionEvent one at a time, so a 15-event burst did 15 setState calls through the session store — each one an O(transcript) walk (session-map copy + subscriber notification fan-out). On a 30k-event transcript that is ~248ms of main-thread time per second of heavy streaming, burned before React paints anything. handleSessionEvents now coalesces runs of plain notifications (agent message chunks, tool-call updates — the bulk of a turn) into a single appendEvents call per flush. Events carrying a JSON-RPC id (user-prompt echoes, stop-reason responses) still go through handleSessionEvent individually, so turn-lifecycle semantics are untouched; global order is preserved by flushing the pending run before each id-carrying event. The notification side effects (config/usage/adapter/compaction updates) split into applyNotificationEffects so both paths share them. Measured with scripts/measure-append-hotpath.ts (real store module, V8 via tsx): 60 flushes x 15 events over a 30k-event transcript drops from ~248ms to ~26ms per streamed second (~10x); store subscriber notifications drop from 900/s to 60/s. Generated-By: PostHog Code Task-Id: e3bcc511-15af-4f46-a3c4-7b29e65f026b
1 parent a88f688 commit 7118369

3 files changed

Lines changed: 238 additions & 8 deletions

File tree

packages/core/src/sessions/sessionEventBatching.test.ts

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,35 @@ function chunkText(event: AcpMessage): string {
3232
return params.update.content.text;
3333
}
3434

35+
/**
 * Builds the renderer-side echo of a user prompt.
 *
 * The echo is a JSON-RPC request (it carries an `id`), so it must not be
 * coalesced with plain notifications.
 *
 * @param id - JSON-RPC id stamped on the request.
 * @param text - Prompt text, wrapped in a single text content block.
 * @returns A `session/prompt` request addressed to `RUN_ID`.
 */
function promptEcho(id: number, text: string): AcpMessage {
  const request = {
    jsonrpc: "2.0",
    id,
    method: "session/prompt",
    params: {
      sessionId: RUN_ID,
      prompt: [{ type: "text", text }],
    },
  };
  return { ts: 1, message: request } as unknown as AcpMessage;
}
51+
52+
/**
 * Builds the agent's terminal response for a prompt — the message that
 * completes the turn.
 *
 * @param id - JSON-RPC id of the prompt request being answered.
 * @returns A JSON-RPC response whose result carries `stopReason: "end_turn"`.
 */
function stopResponse(id: number): AcpMessage {
  const response = {
    jsonrpc: "2.0",
    id,
    result: { stopReason: "end_turn" },
  };
  return { ts: 2, message: response } as unknown as AcpMessage;
}
63+
3564
function createHarness() {
3665
const sessions: Record<string, AgentSession> = {
3766
[RUN_ID]: {
@@ -61,8 +90,11 @@ function createHarness() {
6190
sessions[session.taskRunId] = session;
6291
},
6392
updateSession: (taskRunId: string, updates: Partial<AgentSession>) => {
93+
// Like the real store: produce a NEW session object per update, so a
94+
// reference captured before the update keeps its pre-update values
95+
// (handleSessionEvent's stop-reason check depends on that).
6496
const session = sessions[taskRunId];
65-
if (session) Object.assign(session, updates);
97+
if (session) sessions[taskRunId] = { ...session, ...updates };
6698
},
6799
appendEvents,
68100
replaceOptimisticWithEvent: vi.fn(),
@@ -80,10 +112,11 @@ function createHarness() {
80112
debug: vi.fn(),
81113
};
82114

115+
const notifyPromptComplete = vi.fn();
83116
const deps = {
84117
store,
85118
log: noopLog,
86-
notifyPromptComplete: vi.fn(),
119+
notifyPromptComplete,
87120
notifyPermissionRequest: vi.fn(),
88121
taskViewedApi: { markActivity: vi.fn() },
89122
getPersistedConfigOptions: () => undefined,
@@ -120,6 +153,7 @@ function createHarness() {
120153
return {
121154
service,
122155
appendEvents,
156+
notifyPromptComplete,
123157
emit: (event: AcpMessage) => onEvent?.(event),
124158
events: () => sessions[RUN_ID].events,
125159
};
@@ -149,6 +183,60 @@ describe("streamed event batching", () => {
149183
expect(h.events().map(chunkText)).toEqual(["a", "b", "c"]);
150184
});
151185

186+
it("coalesces a run of plain notifications into one appendEvents call", () => {
  const harness = createHarness();
  const texts = ["a", "b", "c"];

  for (const text of texts) {
    harness.emit(chunk(text));
  }
  vi.advanceTimersByTime(FLUSH_MS);

  // The whole run must land in a single store write rather than one write
  // per event: each write costs O(transcript) (map copy + subscriber
  // fan-out), and this batching is what keeps big transcripts smooth while
  // streaming.
  expect(harness.appendEvents).toHaveBeenCalledTimes(1);
  const appended = harness.appendEvents.mock.calls[0][1];
  expect(appended.map(chunkText)).toEqual(["a", "b", "c"]);
});
204+
205+
it("an id-carrying event splits the run but preserves global order", () => {
  const harness = createHarness();

  harness.emit(chunk("a"));
  harness.emit(chunk("b"));
  harness.emit(promptEcho(7, "user steer-less prompt"));
  harness.emit(chunk("c"));
  vi.advanceTimersByTime(FLUSH_MS);

  // The echo is handled on its own, so the chunks around it arrive as two
  // separate coalesced appends.
  expect(harness.appendEvents).toHaveBeenCalledTimes(2);
  const beforeEcho = harness.appendEvents.mock.calls[0][1];
  const afterEcho = harness.appendEvents.mock.calls[1][1];
  expect(beforeEcho.map(chunkText)).toEqual(["a", "b"]);
  expect(afterEcho.map(chunkText)).toEqual(["c"]);

  // The transcript still holds the chunks in their original order.
  const transcriptTexts = harness.events().map(chunkText);
  expect(transcriptTexts).toEqual(["a", "b", "c"]);
});
220+
221+
it("a stop-reason response batched with chunks still completes the turn", () => {
  const harness = createHarness();
  const promptId = 9;

  // A complete turn inside one flush window: the echo claims
  // currentPromptId, two chunks stream in, then the stop response arrives.
  harness.emit(promptEcho(promptId, "do the thing"));
  harness.emit(chunk("a"));
  harness.emit(chunk("b"));
  harness.emit(stopResponse(promptId));
  vi.advanceTimersByTime(FLUSH_MS);

  // Everything appendable is in the transcript: both chunks plus the
  // response itself. The echo is not counted because it goes through
  // replaceOptimisticWithEvent.
  const transcript = harness.events();
  expect(transcript).toHaveLength(3);
  expect(transcript.slice(0, 2).map(chunkText)).toEqual(["a", "b"]);
  expect(harness.notifyPromptComplete).toHaveBeenCalledTimes(1);
});
239+
152240
it("flushes buffered events synchronously on teardown", () => {
153241
const h = createHarness();
154242

packages/core/src/sessions/sessionService.ts

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,9 +1462,7 @@ export class SessionService {
14621462
const batches = this.pendingSessionEvents;
14631463
this.pendingSessionEvents = new Map();
14641464
for (const [taskRunId, events] of batches) {
1465-
for (const acpMsg of events) {
1466-
this.handleSessionEvent(taskRunId, acpMsg);
1467-
}
1465+
this.handleSessionEvents(taskRunId, events);
14681466
}
14691467
}
14701468

@@ -1474,9 +1472,7 @@ export class SessionService {
14741472
const events = this.pendingSessionEvents.get(taskRunId);
14751473
if (!events) return;
14761474
this.pendingSessionEvents.delete(taskRunId);
1477-
for (const acpMsg of events) {
1478-
this.handleSessionEvent(taskRunId, acpMsg);
1479-
}
1475+
this.handleSessionEvents(taskRunId, events);
14801476
}
14811477

14821478
// --- Transcript residency (memory eviction) ---
@@ -1886,6 +1882,50 @@ export class SessionService {
18861882
}
18871883
}
18881884

1885+
/**
 * Apply one flush batch for a task.
 *
 * Runs of plain notifications (message chunks, tool-call updates — the bulk
 * of a streamed turn) are appended to the store in a single `appendEvents`
 * call: every store write costs O(transcript) (session-map copy plus
 * subscriber notification fan-out), so writing per event made a 15-event
 * flush walk the whole transcript 15 times.
 *
 * Events carrying a JSON-RPC `id` (user-prompt echoes, stop-reason responses)
 * keep going through `handleSessionEvent` one at a time, so turn-lifecycle
 * semantics are untouched. Global event order is preserved: the pending run
 * is flushed before any id-carrying event is handled, and once more at the
 * end of the batch.
 *
 * @param taskRunId - Run whose session receives the events.
 * @param acpMsgs - Buffered events for that run, in arrival order.
 */
private handleSessionEvents(taskRunId: string, acpMsgs: AcpMessage[]): void {
  let run: AcpMessage[] = [];

  const flushRun = (): void => {
    if (run.length === 0) return;
    const batch = run;
    run = [];
    const session = this.d.store.getSessions()[taskRunId];
    if (!session) return;
    // Once the agent starts responding, clear initialPrompt so that retry
    // reconnects to this session instead of creating a new one.
    if (session.initialPrompt?.length) {
      this.d.store.updateSession(taskRunId, { initialPrompt: undefined });
    }
    this.d.store.appendEvents(taskRunId, batch);
    this.updatePromptStateFromEvents(taskRunId, batch, { isLive: true });
    for (const acpMsg of batch) {
      // Re-read the session for every message instead of reusing the
      // snapshot taken before the run. The per-event path looks the session
      // up once per event, so each event's side effects observe the ones
      // applied by the events before it; a single pre-run snapshot would
      // hide those from later messages in the same run.
      // NOTE(review): assumes applyNotificationEffects reads session fields
      // that its own earlier calls may have written — confirm against its
      // body.
      const current = this.d.store.getSessions()[taskRunId];
      if (!current) return;
      this.applyNotificationEffects(taskRunId, acpMsg, current);
    }
  };

  for (const acpMsg of acpMsgs) {
    // Notifications carry no JSON-RPC id; requests and responses do. Only
    // id-less events are safe to coalesce: they are pure appends plus
    // field-level side effects, with no turn-lifecycle branching.
    if ("id" in acpMsg.message) {
      flushRun();
      this.handleSessionEvent(taskRunId, acpMsg);
    } else {
      run.push(acpMsg);
    }
  }
  flushRun();
}
1928+
18891929
private handleSessionEvent(taskRunId: string, acpMsg: AcpMessage): void {
18901930
const session = this.d.store.getSessions()[taskRunId];
18911931
if (!session) return;
@@ -1947,6 +1987,20 @@ export class SessionService {
19471987
this.d.taskViewedApi.markActivity(session.taskId);
19481988
}
19491989

1990+
this.applyNotificationEffects(taskRunId, acpMsg, session);
1991+
}
1992+
1993+
/** Side effects carried by session notifications (config, usage, adapter,
1994+
* compaction status). Split out of `handleSessionEvent` so a coalesced run
1995+
* of notifications (see `handleSessionEvents`) shares it; no-op for
1996+
* requests/responses, which carry no `method`. */
1997+
private applyNotificationEffects(
1998+
taskRunId: string,
1999+
acpMsg: AcpMessage,
2000+
session: AgentSession,
2001+
): void {
2002+
const msg = acpMsg.message;
2003+
19502004
if ("method" in msg && msg.method === "session/update" && "params" in msg) {
19512005
const params = msg.params as {
19522006
update?: {

scripts/measure-append-hotpath.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Measures the real appendEvents hot path (real module, no replicas):
2+
// 1 second of heavy streaming = 60 flushes x 15 events over a transcript
3+
// of N events. Run on main and on the branch to compare.
4+
// bun scripts/measure-append-hotpath.ts
5+
import type { AcpMessage, AgentSession } from "@posthog/shared";
6+
import { sessionStoreSetters } from "../packages/core/src/sessions/sessionStore";
7+
8+
const RUN_ID = "run-bench";
9+
let seq = 0;
10+
/**
 * Produces the next synthetic `session/update` notification for the
 * benchmark, advancing the module-level `seq` counter.
 *
 * Every 12th event is a `tool_call_update` carrying a 2048-character
 * `rawInput.content`; all others are `agent_message_chunk` text chunks.
 *
 * @returns The event, timestamped `1700000000000 + seq`.
 */
function makeEvent(): AcpMessage {
  seq += 1;
  const isToolCallUpdate = seq % 12 === 0;
  const update = isToolCallUpdate
    ? {
        sessionUpdate: "tool_call_update",
        toolCallId: `tool-${seq % 40}`,
        status: "in_progress",
        rawInput: { file_path: "/src/app.ts", content: "x".repeat(2048) },
      }
    : {
        sessionUpdate: "agent_message_chunk",
        content: { type: "text", text: "palabra ".repeat(10) },
      };
  return {
    ts: 1700000000000 + seq,
    message: {
      jsonrpc: "2.0",
      method: "session/update",
      params: { sessionId: RUN_ID, update },
    },
  } as unknown as AcpMessage;
}
40+
41+
/**
 * Installs a benchmark session under `RUN_ID` whose transcript already holds
 * `n` synthetic events, via `sessionStoreSetters.setSession`.
 *
 * @param n - Number of (shallowly frozen) events to pre-populate.
 */
function seedSession(n: number): void {
  const events: AcpMessage[] = Array.from({ length: n }, () =>
    Object.freeze(makeEvent()),
  );
  const seeded = {
    taskRunId: RUN_ID,
    taskId: "task-bench",
    events,
    messageQueue: [],
    pendingPermissions: new Map(),
    status: "connected",
  };
  sessionStoreSetters.setSession(seeded as unknown as AgentSession);
}
53+
54+
const FRAMES = 60;
55+
const PER_FRAME = 15;
56+
57+
/**
 * Times one simulated second of streaming: `FRAMES` flushes of `PER_FRAME`
 * events each, appended to the `RUN_ID` session.
 *
 * All events are built before the clock starts, so only the store appends
 * are measured.
 *
 * @param perEvent - When true, append each event with its own `appendEvents`
 *   call; when false, append each flush's events in a single call.
 * @returns Elapsed milliseconds spent appending.
 */
function run(perEvent: boolean): number {
  const batches: AcpMessage[][] = Array.from({ length: FRAMES }, () =>
    Array.from({ length: PER_FRAME }, () => makeEvent()),
  );

  const startedAt = performance.now();
  for (const batch of batches) {
    if (!perEvent) {
      sessionStoreSetters.appendEvents(RUN_ID, batch);
      continue;
    }
    for (const ev of batch) {
      sessionStoreSetters.appendEvents(RUN_ID, [ev]);
    }
  }
  return performance.now() - startedAt;
}
74+
75+
// Benchmark driver: for each transcript size, time both append modes over 7
// repetitions (re-seeding the session before each one) and print the median,
// minimum and maximum.
for (const n of [10_000, 30_000]) {
  for (const perEvent of [true, false]) {
    const samples: number[] = [];
    while (samples.length < 7) {
      seedSession(n);
      samples.push(run(perEvent));
    }
    samples.sort((a, b) => a - b);

    // With 7 sorted samples: index 0 is the minimum, 3 the median, 6 the
    // maximum.
    const fastest = samples[0];
    const median = samples[3];
    const slowest = samples[6];
    const label = perEvent ? "per-evento x15" : "batcheado x1 ";
    console.log(
      `transcript ${n.toLocaleString()} | ${label} | mediana ${median.toFixed(1)} ms/seg-streaming (min ${fastest.toFixed(1)}, max ${slowest.toFixed(1)})`,
    );
  }
}

0 commit comments

Comments
 (0)