Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion apps/server/src/provider/Layers/ProviderService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ const dieOnMissingBindingInstanceId = (
);
};

// Capacity of the unified runtime-event bus. A bounded PubSub caps the
// per-subscriber backlog so a lagging consumer (e.g. ProviderRuntimeIngestion
// or CheckpointReactor falling behind during a burst) applies backpressure all
// the way up to the adapter stdio reader instead of letting the buffer grow
// without bound and eventually OOM the process. The window is generous enough
// to absorb normal event bursts (streaming deltas, tool calls) while keeping a
// hard ceiling on retained events. Runtime events must never be dropped — they
// feed event-sourced orchestration ingestion — so we use the suspending
// (backpressure) strategy rather than sliding/dropping.
const RUNTIME_EVENT_BUS_CAPACITY = 4096;

const correlateRuntimeEventWithInstance = (
source: {
readonly instanceId: ProviderInstanceId;
Expand Down Expand Up @@ -199,7 +210,9 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (

const registry = yield* ProviderAdapterRegistry;
const directory = yield* ProviderSessionDirectory;
const runtimeEventPubSub = yield* PubSub.unbounded<ProviderRuntimeEvent>();
const runtimeEventPubSub = yield* PubSub.bounded<ProviderRuntimeEvent>(
RUNTIME_EVENT_BUS_CAPACITY,
);

const publishRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect<void> =>
Effect.succeed(event).pipe(
Expand Down
Loading