Commit da27ca4

perf(server): bound the provider runtime-event bus
ProviderService exposes a unified runtime-event stream via a PubSub that internal consumers (ProviderRuntimeIngestion, CheckpointReactor) each subscribe to. The bus was `PubSub.unbounded`, so if any consumer lagged during an event burst, its per-subscriber backlog could grow without limit and eventually OOM the process under load.

Switch to `PubSub.bounded` with a generous capacity (4096 events). Because runtime events feed event-sourced orchestration ingestion and must never be dropped, this uses the suspending (backpressure) strategy: a slow consumer now applies backpressure up to the adapter stdio reader instead of silently accumulating memory. This caps memory use under load while ensuring that no published event is dropped for a subscribed consumer.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
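The suspending behaviour described in the commit message can be illustrated with a small, dependency-free sketch. This is not Effect's `PubSub` implementation, and `BoundedBuffer`, `publish`, `take`, and `pendingPublishers` are hypothetical names chosen for illustration. It only shows the idea: when the buffer is full, the publisher waits instead of the buffer growing.

```typescript
// Hypothetical illustration of a suspending (backpressure) bounded buffer.
// Not Effect's PubSub; single buffer, no fan-out to multiple subscribers.
class BoundedBuffer<T> {
  private readonly capacity: number;
  private readonly items: T[] = [];
  // Publishers parked because the buffer was full.
  private readonly waitingPublishers: Array<() => void> = [];
  // Consumers parked because the buffer was empty.
  private readonly waitingConsumers: Array<(item: T) => void> = [];

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  get size(): number {
    return this.items.length;
  }

  get pendingPublishers(): number {
    return this.waitingPublishers.length;
  }

  // Resolves once the item is accepted; suspends while the buffer is full.
  async publish(item: T): Promise<void> {
    const consumer = this.waitingConsumers.shift();
    if (consumer) {
      consumer(item); // hand off directly to a parked consumer
      return;
    }
    while (this.items.length >= this.capacity) {
      await new Promise<void>((resume) => this.waitingPublishers.push(resume));
    }
    this.items.push(item);
  }

  // Resolves with the oldest item; frees one slot and wakes one publisher.
  async take(): Promise<T> {
    if (this.items.length > 0) {
      const item = this.items.shift() as T;
      this.waitingPublishers.shift()?.();
      return item;
    }
    return new Promise<T>((resolve) => this.waitingConsumers.push(resolve));
  }
}
```

With a capacity of 2, a third `publish` call stays pending until a `take` frees a slot. The buffer never holds more than two items, and the third item is delayed rather than discarded.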
1 parent a0b5d39 commit da27ca4

1 file changed

Lines changed: 14 additions & 1 deletion

apps/server/src/provider/Layers/ProviderService.ts

@@ -166,6 +166,17 @@ const dieOnMissingBindingInstanceId = (
   );
 };
 
+// Capacity of the unified runtime-event bus. A bounded PubSub caps the
+// per-subscriber backlog so a lagging consumer (e.g. ProviderRuntimeIngestion
+// or CheckpointReactor falling behind during a burst) applies backpressure all
+// the way up to the adapter stdio reader instead of letting the buffer grow
+// without bound and eventually OOM the process. The window is generous enough
+// to absorb normal event bursts (streaming deltas, tool calls) while keeping a
+// hard ceiling on retained events. Runtime events must never be dropped — they
+// feed event-sourced orchestration ingestion — so we use the suspending
+// (backpressure) strategy rather than sliding/dropping.
+const RUNTIME_EVENT_BUS_CAPACITY = 4096;
+
 const correlateRuntimeEventWithInstance = (
   source: {
     readonly instanceId: ProviderInstanceId;
@@ -199,7 +210,9 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
 
   const registry = yield* ProviderAdapterRegistry;
   const directory = yield* ProviderSessionDirectory;
-  const runtimeEventPubSub = yield* PubSub.unbounded<ProviderRuntimeEvent>();
+  const runtimeEventPubSub = yield* PubSub.bounded<ProviderRuntimeEvent>(
+    RUNTIME_EVENT_BUS_CAPACITY,
+  );
 
   const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect<void> =>
     Effect.succeed(event).pipe(
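The code comment in the diff rules out the sliding and dropping strategies because runtime events must never be lost. The sketch below is a simplified model of that trade-off, not Effect's implementation. `simulateBurst`, `Strategy`, and `BurstResult` are hypothetical names, and the model assumes a burst arriving while no consumer drains the buffer.

```typescript
// Hypothetical model of three overflow strategies for a full bounded buffer.
type Strategy = "suspending" | "sliding" | "dropping";

interface BurstResult {
  readonly retained: number[]; // events still buffered for the consumer
  readonly lost: number[]; // events the consumer will never see
  readonly parked: number[]; // events held back at the publisher (not lost)
}

// Feed `events` into a buffer of `capacity` while nothing drains it.
function simulateBurst(
  events: number[],
  capacity: number,
  strategy: Strategy,
): BurstResult {
  const retained: number[] = [];
  const lost: number[] = [];
  const parked: number[] = [];
  for (const event of events) {
    if (retained.length < capacity) {
      retained.push(event);
    } else if (strategy === "sliding") {
      lost.push(retained.shift() as number); // evict the oldest event
      retained.push(event);
    } else if (strategy === "dropping") {
      lost.push(event); // discard the incoming event
    } else {
      parked.push(event); // publisher waits; the event is delivered later
    }
  }
  return { retained, lost, parked };
}

const burst = [1, 2, 3, 4, 5, 6];
console.log(simulateBurst(burst, 4, "sliding")); // lost: [1, 2]
console.log(simulateBurst(burst, 4, "dropping")); // lost: [5, 6]
console.log(simulateBurst(burst, 4, "suspending")); // lost: [], parked: [5, 6]
```

Only the suspending variant reports an empty `lost` list. The cost is that the publisher stalls until the consumer catches up, which is the backpressure the commit relies on.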
