diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java index daaca5be55..8e3fe9acd6 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java @@ -15,7 +15,7 @@ */ package io.aklivity.zilla.runtime.binding.tcp.internal; -import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY; +import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY_LIMIT; import java.util.function.LongConsumer; @@ -32,7 +32,7 @@ public TcpCapacityTracker( TcpConfiguration config, EngineContext context) { - this.capacity = ENGINE_WORKER_CAPACITY.getAsInt(config); + this.capacity = ENGINE_WORKER_CAPACITY_LIMIT.getAsInt(config); this.recordUsage = context.supplyUtilizationMetric(); } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java index 6fefe09c8b..9e7737cc30 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java @@ -60,6 +60,8 @@ public class EngineConfiguration extends Configuration public static final PropertyDef ENGINE_CACHE_DIRECTORY; public static final PropertyDef ENGINE_HOST_RESOLVER; public static final IntPropertyDef ENGINE_WORKER_CAPACITY; + public static final IntPropertyDef ENGINE_WORKER_CAPACITY_LIMIT; + public static final BooleanPropertyDef ENGINE_WORKER_CAPACITY_UNBOUNDED; public static final DoublePropertyDef ENGINE_MEMORY_PERCENTAGE; public static final DoublePropertyDef ENGINE_DISK_PERCENTAGE; public static final IntPropertyDef ENGINE_BUFFER_POOL_CAPACITY; @@ -111,6 +113,8 @@ public class EngineConfiguration extends Configuration ENGINE_MEMORY_PERCENTAGE = config.property("memory.percentage", 0.25); ENGINE_DISK_PERCENTAGE = config.property("disk.percentage", 0.75); ENGINE_WORKER_CAPACITY = config.property("worker.capacity", EngineConfiguration::defaultWorkerCapacity); + ENGINE_WORKER_CAPACITY_UNBOUNDED = config.property("worker.capacity.unbounded", false); + ENGINE_WORKER_CAPACITY_LIMIT = config.property("worker.capacity.limit", EngineConfiguration::defaultWorkerCapacityLimit); ENGINE_BUFFER_POOL_CAPACITY = config.property("buffer.pool.capacity", EngineConfiguration::defaultBufferPoolCapacity); ENGINE_BUFFER_SLOT_CAPACITY = config.property("buffer.slot.capacity", 32 * 1024); ENGINE_STREAMS_BUFFER_CAPACITY = config.property("streams.buffer.capacity", @@ -426,6 +430,14 @@ private static int defaultWorkerCapacity( return newWorkersCapacity; } + private static int defaultWorkerCapacityLimit( + Configuration config) + { + return ENGINE_WORKER_CAPACITY_UNBOUNDED.get(config) + ? Integer.MAX_VALUE + : ENGINE_WORKER_CAPACITY.get(config); + } + private static URL configURL( Configuration config, String url) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 63ab53aaf5..b79419a8eb 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -15,7 +15,7 @@ */ package io.aklivity.zilla.runtime.engine.internal.registry; -import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY; +import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY_LIMIT; import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID; import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID; import static io.aklivity.zilla.runtime.engine.internal.registry.MetricHandlerKind.ORIGIN; @@ -895,7 +895,7 @@ public void onStart() LongConsumer recordUtilization = supplyGaugeWriter(utilizationMetricId); recordCount.accept(1); - recordCapacity.accept(ENGINE_WORKER_CAPACITY.getAsInt(config)); + recordCapacity.accept(ENGINE_WORKER_CAPACITY_LIMIT.getAsInt(config)); recordUtilization.accept(0); } }