Skip to content

Commit a23de3a

Browse files
claudeericallam
authored andcommitted
refactor: connect input stream tail lazily from listener side
Instead of relying on an INPUT_STREAM_CREATED IPC message from the platform (which was never wired up), the worker now connects the SSE tail automatically when task code first calls .on() or .once() on an input stream. The worker receives the run ID via setRunId() at execution start, and #ensureTailConnected() opens the tail on demand. This eliminates the need for platform→coordinator→executor→worker IPC signaling and ensures no race conditions since the SSE tail reads from the beginning of the __input stream. https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
1 parent 1075894 commit a23de3a

File tree

7 files changed

+32
-12
lines changed

7 files changed

+32
-12
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ const zodIpc = new ZodIpcConnection({
391391
}
392392

393393
resetExecutionEnvironment();
394+
standardInputStreamManager.setRunId(execution.run.id);
394395

395396
standardTraceContextManager.traceContext = traceContext;
396397
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart);
@@ -645,9 +646,6 @@ const zodIpc = new ZodIpcConnection({
645646
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
646647
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
647648
},
648-
INPUT_STREAM_CREATED: async ({ runId }) => {
649-
standardInputStreamManager.connectTail(runId);
650-
},
651649
},
652650
});
653651

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ const zodIpc = new ZodIpcConnection({
375375
}
376376

377377
resetExecutionEnvironment();
378+
standardInputStreamManager.setRunId(execution.run.id);
378379

379380
standardTraceContextManager.traceContext = traceContext;
380381

@@ -638,9 +639,6 @@ const zodIpc = new ZodIpcConnection({
638639
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
639640
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
640641
},
641-
INPUT_STREAM_CREATED: async ({ runId }) => {
642-
standardInputStreamManager.connectTail(runId);
643-
},
644642
},
645643
});
646644

packages/core/src/v3/inputStreams/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ export class InputStreamsAPI implements InputStreamManager {
2828
return getGlobal(API_NAME) ?? NOOP_MANAGER;
2929
}
3030

31+
public setRunId(runId: string): void {
32+
this.#getManager().setRunId(runId);
33+
}
34+
3135
public on(
3236
streamId: string,
3337
handler: (data: unknown) => void | Promise<void>

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,18 @@ export class StandardInputStreamManager implements InputStreamManager {
2626
private buffer = new Map<string, unknown[]>();
2727
private tailAbortController: AbortController | null = null;
2828
private tailPromise: Promise<void> | null = null;
29+
private currentRunId: string | null = null;
2930

3031
constructor(
3132
private apiClient: ApiClient,
3233
private baseUrl: string,
3334
private debug: boolean = false
3435
) {}
3536

37+
setRunId(runId: string): void {
38+
this.currentRunId = runId;
39+
}
40+
3641
on(streamId: string, handler: InputStreamHandler): { off: () => void } {
3742
let handlerSet = this.handlers.get(streamId);
3843
if (!handlerSet) {
@@ -41,6 +46,9 @@ export class StandardInputStreamManager implements InputStreamManager {
4146
}
4247
handlerSet.add(handler);
4348

49+
// Lazily connect the tail on first listener registration
50+
this.#ensureTailConnected();
51+
4452
// Flush any buffered data for this stream
4553
const buffered = this.buffer.get(streamId);
4654
if (buffered && buffered.length > 0) {
@@ -61,6 +69,9 @@ export class StandardInputStreamManager implements InputStreamManager {
6169
}
6270

6371
once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> {
72+
// Lazily connect the tail on first listener registration
73+
this.#ensureTailConnected();
74+
6475
// Check buffer first
6576
const buffered = this.buffer.get(streamId);
6677
if (buffered && buffered.length > 0) {
@@ -140,6 +151,7 @@ export class StandardInputStreamManager implements InputStreamManager {
140151

141152
reset(): void {
142153
this.disconnect();
154+
this.currentRunId = null;
143155
this.handlers.clear();
144156

145157
// Reject all pending once waiters
@@ -155,6 +167,12 @@ export class StandardInputStreamManager implements InputStreamManager {
155167
this.buffer.clear();
156168
}
157169

170+
#ensureTailConnected(): void {
171+
if (!this.tailAbortController && this.currentRunId) {
172+
this.connectTail(this.currentRunId);
173+
}
174+
}
175+
158176
async #runTail(runId: string, signal: AbortSignal): Promise<void> {
159177
try {
160178
const stream = await this.apiClient.fetchStream<InputStreamRecord>(

packages/core/src/v3/inputStreams/noopManager.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import { InputStreamManager } from "./types.js";
22
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
33

44
export class NoopInputStreamManager implements InputStreamManager {
5+
setRunId(_runId: string): void {}
6+
57
on(_streamId: string, _handler: (data: unknown) => void | Promise<void>): { off: () => void } {
68
return { off: () => {} };
79
}

packages/core/src/v3/inputStreams/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
22

33
export interface InputStreamManager {
4+
/**
5+
* Set the current run ID. The tail connection will be established lazily
6+
* when `on()` or `once()` is first called.
7+
*/
8+
setRunId(runId: string): void;
9+
410
/**
511
* Register a handler that fires every time data arrives on the given input stream.
612
*/

packages/core/src/v3/schemas/messages.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,6 @@ export const WorkerToExecutorMessageCatalog = {
229229
waitpoint: CompletedWaitpoint,
230230
}),
231231
},
232-
INPUT_STREAM_CREATED: {
233-
message: z.object({
234-
version: z.literal("v1").default("v1"),
235-
runId: z.string(),
236-
}),
237-
},
238232
};
239233

240234
export const ProviderToPlatformMessages = {

0 commit comments

Comments
 (0)