1515 */
1616package io .aklivity .zilla .runtime .engine .internal .registry ;
1717
18+ import static io .aklivity .zilla .runtime .engine .EngineConfiguration .ENGINE_WORKER_CAPACITY ;
1819import static io .aklivity .zilla .runtime .engine .budget .BudgetCreditor .NO_BUDGET_ID ;
1920import static io .aklivity .zilla .runtime .engine .concurrent .Signaler .NO_CANCEL_ID ;
2021import static io .aklivity .zilla .runtime .engine .internal .registry .MetricHandlerKind .ORIGIN ;
126127import io .aklivity .zilla .runtime .engine .internal .layouts .metrics .CountersLayout ;
127128import io .aklivity .zilla .runtime .engine .internal .layouts .metrics .GaugesLayout ;
128129import io .aklivity .zilla .runtime .engine .internal .layouts .metrics .HistogramsLayout ;
129- import io .aklivity .zilla .runtime .engine .internal .metrics .EngineWorkerUtilizationMetric ;
130+ import io .aklivity .zilla .runtime .engine .internal .metrics .EngineWorkersCapacityMetric ;
131+ import io .aklivity .zilla .runtime .engine .internal .metrics .EngineWorkersCountMetric ;
132+ import io .aklivity .zilla .runtime .engine .internal .metrics .EngineWorkersUtilizationMetric ;
130133import io .aklivity .zilla .runtime .engine .internal .poller .Poller ;
131134import io .aklivity .zilla .runtime .engine .internal .stream .StreamId ;
132135import io .aklivity .zilla .runtime .engine .internal .stream .Target ;
@@ -233,6 +236,7 @@ public class EngineWorker implements EngineContext, Agent
233236 private final Supplier <MessageReader > supplyEventReader ;
234237 private final EventFormatterFactory eventFormatterFactory ;
235238 private final LongSupplier utilizationMetric ;
239+ private final boolean readonly ;
236240
237241 private long initialId ;
238242 private long promiseId ;
@@ -244,6 +248,7 @@ public class EngineWorker implements EngineContext, Agent
244248
245249 private volatile Thread thread ;
246250
251+
247252 public EngineWorker (
248253 EngineConfiguration config ,
249254 ExecutorService executor ,
@@ -269,6 +274,7 @@ public EngineWorker(
269274 this .configPath = Path .of (config .configURI ());
270275 this .labels = labels ;
271276 this .affinityMask = affinityMask ;
277+ this .readonly = readonly ;
272278
273279 this .supplyIdleStrategy = () -> new BackoffIdleStrategy (
274280 config .maxSpins (),
@@ -301,12 +307,6 @@ public EngineWorker(
301307 metricWriterSuppliers .put (GAUGE , gaugesLayout ::supplyWriter );
302308 metricWriterSuppliers .put (HISTOGRAM , histogramsLayout ::supplyWriter );
303309
304- if (!readonly )
305- {
306- final int metricId = labels .supplyLabelId ("engine.worker.count" );
307- supplyMetricWriter (GAUGE , NO_NAMESPACED_ID , metricId ).accept (1 );
308- }
309-
310310 final StreamsLayout streamsLayout = new StreamsLayout .Builder ()
311311 .path (config .directory ().resolve (String .format ("data%d" , index )))
312312 .streamsCapacity (config .streamsBufferCapacity ())
@@ -467,7 +467,7 @@ public EngineWorker(
467467 this .exportersById = new Long2ObjectHashMap <>();
468468 this .supplyEventReader = supplyEventReader ;
469469 this .eventFormatterFactory = eventFormatterFactory ;
470- this .utilizationMetric = supplyGauge (NO_NAMESPACED_ID , labels .supplyLabelId (EngineWorkerUtilizationMetric .NAME ));
470+ this .utilizationMetric = supplyGauge (NO_NAMESPACED_ID , labels .supplyLabelId (EngineWorkersUtilizationMetric .NAME ));
471471 }
472472
473473 public static int indexOfId (
@@ -780,7 +780,7 @@ public ConverterHandler supplyWriteConverter(
780780 @ Override
781781 public LongConsumer supplyUtilizationMetric ()
782782 {
783- final int metricId = labels .supplyLabelId (EngineWorkerUtilizationMetric .NAME );
783+ final int metricId = labels .supplyLabelId (EngineWorkersUtilizationMetric .NAME );
784784
785785 return supplyMetricWriter (GAUGE , NO_NAMESPACED_ID , metricId );
786786 }
@@ -880,6 +880,26 @@ public int doWork()
880880 return workDone ;
881881 }
882882
883+ @ Override
884+ public void onStart ()
885+ {
886+ if (!readonly )
887+ {
888+ int workersMetricId = labels .supplyLabelId (EngineWorkersCountMetric .NAME );
889+ LongConsumer recordCount = supplyMetricWriter (GAUGE , NO_NAMESPACED_ID , workersMetricId );
890+
891+ int capacityMetricId = labels .supplyLabelId (EngineWorkersCapacityMetric .NAME );
892+ LongConsumer recordCapacity = supplyGaugeWriter (capacityMetricId );
893+
894+ int utilizationMetricId = labels .supplyLabelId (EngineWorkersUtilizationMetric .NAME );
895+ LongConsumer recordUtilization = supplyGaugeWriter (utilizationMetricId );
896+
897+ recordCount .accept (1 );
898+ recordCapacity .accept (ENGINE_WORKER_CAPACITY .getAsInt (config ));
899+ recordUtilization .accept (0 );
900+ }
901+ }
902+
883903 @ Override
884904 public void onClose ()
885905 {
@@ -927,10 +947,13 @@ public void onClose()
927947 acquiredBuffers , acquiredCreditors , acquiredDebitors ));
928948 }
929949
930- long utilization = utilizationMetric .getAsLong ();
931- if (utilization != 0L )
950+ if (!readonly )
932951 {
933- throw new IllegalStateException ("Engine worker utilization is non-zero: %d" .formatted (utilization ));
952+ long utilization = utilizationMetric .getAsLong ();
953+ if (utilization != 0L )
954+ {
955+ throw new IllegalStateException ("Engine worker utilization is non-zero: %d" .formatted (utilization ));
956+ }
934957 }
935958 }
936959
@@ -1031,6 +1054,12 @@ public Metric resolveMetric(
10311054 return metricGroupsByName .get (metricGroupName ).supply (metricName );
10321055 }
10331056
1057+ public LongConsumer supplyGaugeWriter (
1058+ long metricId )
1059+ {
1060+ return gaugesLayout .supplyWriter (NO_NAMESPACED_ID , metricId );
1061+ }
1062+
10341063 // required for testing
10351064 public LongConsumer supplyCounterWriter (
10361065 long bindingId ,
0 commit comments