diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index c4d1dc153f4..e7534127ddd 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -166,6 +166,17 @@ const dieOnMissingBindingInstanceId = ( ); }; +// Capacity of the unified runtime-event bus. A bounded PubSub caps the +// per-subscriber backlog so a lagging consumer (e.g. ProviderRuntimeIngestion +// or CheckpointReactor falling behind during a burst) suspends publishers — +// intended to push back as far as the adapter stdio reader — instead of +// letting the buffer grow until the process OOMs. The window absorbs normal +// bursts (streaming deltas, tool calls) under a hard ceiling on retained +// events. Runtime events must never be dropped — they feed event-sourced +// orchestration ingestion — so use PubSub.bounded (backpressure), never +// PubSub.sliding/dropping. Keep the value a power of two (Effect perf note). +const RUNTIME_EVENT_BUS_CAPACITY = 4096; + const correlateRuntimeEventWithInstance = ( source: { readonly instanceId: ProviderInstanceId; @@ -199,7 +210,9 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( const registry = yield* ProviderAdapterRegistry; const directory = yield* ProviderSessionDirectory; - const runtimeEventPubSub = yield* PubSub.unbounded(); + const runtimeEventPubSub = yield* PubSub.bounded( + RUNTIME_EVENT_BUS_CAPACITY, + ); const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => Effect.succeed(event).pipe(