Skip to content

Commit b38b7f6

Browse files
authored
fix(agent): Flush proxy ingest uploads live (#3042)
1 parent d73302a commit b38b7f6

2 files changed

Lines changed: 47 additions & 1 deletion

File tree

packages/agent/src/server/event-stream-sender.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,49 @@ describe("TaskRunEventStreamSender", () => {
208208
expect(lastCall[0]).not.toContain("/api/projects/");
209209
});
210210

211+
it("uses a short stream window for agent-proxy ingest so buffered uploads commit live events", async () => {
212+
vi.useFakeTimers();
213+
try {
214+
const requestBodies: string[] = [];
215+
let activeStreamClosed = false;
216+
const fetchMock = vi.fn(
217+
async (_url: string | URL | Request, init?: RequestInit) => {
218+
if (!init?.body || typeof init.body === "string") {
219+
return responseForBody(await readRequestBody(init));
220+
}
221+
222+
const body = await readRequestBody(init);
223+
activeStreamClosed = true;
224+
requestBodies.push(body);
225+
return responseForBody(body);
226+
},
227+
);
228+
vi.stubGlobal("fetch", fetchMock);
229+
230+
const sender = createSender({
231+
eventIngestBaseUrl: "http://agent-proxy:8003/",
232+
});
233+
234+
sender.enqueue({
235+
type: "notification",
236+
notification: { method: "first" },
237+
});
238+
await vi.advanceTimersByTimeAsync(0);
239+
240+
expect(fetchMock).toHaveBeenCalledTimes(2);
241+
expect(activeStreamClosed).toBe(false);
242+
243+
await vi.advanceTimersByTimeAsync(1_000);
244+
245+
expect(activeStreamClosed).toBe(true);
246+
expect(eventSequences(requestBodies[0])).toEqual([1]);
247+
248+
await sender.stop();
249+
} finally {
250+
vi.useRealTimers();
251+
}
252+
});
253+
211254
it("keeps the active ingest request open across scheduled flushes", async () => {
212255
const requestBodies: string[] = [];
213256
let activeStreamClosed = false;

packages/agent/src/server/event-stream-sender.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const DEFAULT_RETRY_DELAY_MS = 1_000;
5656
const DEFAULT_REQUEST_TIMEOUT_MS = 10_000;
5757
const DEFAULT_STOP_TIMEOUT_MS = 30_000;
5858
const DEFAULT_STREAM_WINDOW_MS = 5 * 60 * 1_000;
59+
const DEFAULT_PROXY_STREAM_WINDOW_MS = 1_000;
5960
const STREAM_COMPLETE_CONTROL_TYPE = "_posthog/stream_complete";
6061

6162
export class TaskRunEventStreamSender {
@@ -111,7 +112,9 @@ export class TaskRunEventStreamSender {
111112
this.requestTimeoutMs =
112113
config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS;
113114
this.stopTimeoutMs = config.stopTimeoutMs ?? DEFAULT_STOP_TIMEOUT_MS;
114-
this.streamWindowMs = config.streamWindowMs ?? DEFAULT_STREAM_WINDOW_MS;
115+
this.streamWindowMs =
116+
config.streamWindowMs ??
117+
(usingProxy ? DEFAULT_PROXY_STREAM_WINDOW_MS : DEFAULT_STREAM_WINDOW_MS);
115118
this.createStreamingUpload =
116119
config.createStreamingUpload ?? createNodeStreamingUpload;
117120
}

0 commit comments

Comments
 (0)