Skip to content

Commit eb2f0a2

Browse files
claudeericallam
authored andcommitted
fix: gate input stream tail connection on v2 realtime streams
Input streams are backed by S2 which is only available with v2 realtime streams. The lazy tail connection now checks that streamsVersion is "v2" before connecting, so runs using v1 (Redis-backed) streams won't attempt to open an S2 tail. https://claude.ai/code/session_01SJHJts7r2yAxmoKLLz8vpc
1 parent a23de3a commit eb2f0a2

File tree

6 files changed

+14
-10
lines changed

6 files changed

+14
-10
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ const zodIpc = new ZodIpcConnection({
391391
}
392392

393393
resetExecutionEnvironment();
394-
standardInputStreamManager.setRunId(execution.run.id);
394+
standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion);
395395

396396
standardTraceContextManager.traceContext = traceContext;
397397
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart);

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ const zodIpc = new ZodIpcConnection({
375375
}
376376

377377
resetExecutionEnvironment();
378-
standardInputStreamManager.setRunId(execution.run.id);
378+
standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion);
379379

380380
standardTraceContextManager.traceContext = traceContext;
381381

packages/core/src/v3/inputStreams/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ export class InputStreamsAPI implements InputStreamManager {
2828
return getGlobal(API_NAME) ?? NOOP_MANAGER;
2929
}
3030

31-
public setRunId(runId: string): void {
32-
this.#getManager().setRunId(runId);
31+
public setRunId(runId: string, streamsVersion?: string): void {
32+
this.#getManager().setRunId(runId, streamsVersion);
3333
}
3434

3535
public on(

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ export class StandardInputStreamManager implements InputStreamManager {
2727
private tailAbortController: AbortController | null = null;
2828
private tailPromise: Promise<void> | null = null;
2929
private currentRunId: string | null = null;
30+
private streamsVersion: string | undefined;
3031

3132
constructor(
3233
private apiClient: ApiClient,
3334
private baseUrl: string,
3435
private debug: boolean = false
3536
) {}
3637

37-
setRunId(runId: string): void {
38+
setRunId(runId: string, streamsVersion?: string): void {
3839
this.currentRunId = runId;
40+
this.streamsVersion = streamsVersion;
3941
}
4042

4143
on(streamId: string, handler: InputStreamHandler): { off: () => void } {
@@ -152,6 +154,7 @@ export class StandardInputStreamManager implements InputStreamManager {
152154
reset(): void {
153155
this.disconnect();
154156
this.currentRunId = null;
157+
this.streamsVersion = undefined;
155158
this.handlers.clear();
156159

157160
// Reject all pending once waiters
@@ -168,7 +171,7 @@ export class StandardInputStreamManager implements InputStreamManager {
168171
}
169172

170173
#ensureTailConnected(): void {
171-
if (!this.tailAbortController && this.currentRunId) {
174+
if (!this.tailAbortController && this.currentRunId && this.streamsVersion === "v2") {
172175
this.connectTail(this.currentRunId);
173176
}
174177
}

packages/core/src/v3/inputStreams/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { InputStreamManager } from "./types.js";
22
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
33

44
export class NoopInputStreamManager implements InputStreamManager {
5-
setRunId(_runId: string): void {}
5+
setRunId(_runId: string, _streamsVersion?: string): void {}
66

77
on(_streamId: string, _handler: (data: unknown) => void | Promise<void>): { off: () => void } {
88
return { off: () => {} };

packages/core/src/v3/inputStreams/types.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
22

33
export interface InputStreamManager {
44
/**
5-
* Set the current run ID. The tail connection will be established lazily
6-
* when `on()` or `once()` is first called.
5+
* Set the current run ID and streams version. The tail connection will be
6+
* established lazily when `on()` or `once()` is first called, but only
7+
* for v2 (S2-backed) realtime streams.
78
*/
8-
setRunId(runId: string): void;
9+
setRunId(runId: string, streamsVersion?: string): void;
910

1011
/**
1112
* Register a handler that fires every time data arrives on the given input stream.

0 commit comments

Comments
 (0)