From 0c281f487adf2aeea658ea95755520f95b0a9396 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 5 May 2025 23:55:24 +0500 Subject: [PATCH 01/20] Tcp capacity usage report --- .../binding/tcp/internal/CapacityTracker.java | 78 +++++++++++++++++++ .../tcp/internal/TcpBindingContext.java | 7 +- .../binding/tcp/internal/TcpEventContext.java | 25 ++++++ .../tcp/internal/TcpEventFormatter.java | 11 ++- .../tcp/internal/stream/TcpClientFactory.java | 24 +++--- .../tcp/internal/stream/TcpClientRouter.java | 16 ++-- .../tcp/internal/stream/TcpServerFactory.java | 18 +++-- .../tcp/internal/stream/TcpServerRouter.java | 13 ++-- .../src/main/resources/META-INF/zilla/tcp.idl | 9 ++- 9 files changed, 163 insertions(+), 38 deletions(-) create mode 100644 runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java new file mode 100644 index 0000000000..72543c0c37 --- /dev/null +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java @@ -0,0 +1,78 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.tcp.internal; + +import org.agrona.collections.MutableInteger; + +public final class CapacityTracker +{ + private final MutableInteger capacity; + private final TcpEventContext eventContext; + private final int initialCapacity; + + private int capacityPercentage; + + public CapacityTracker( + int initialCapacity, + TcpEventContext eventContext) + { + this.eventContext = eventContext; + this.initialCapacity = initialCapacity; + this.capacity = new MutableInteger(initialCapacity); + } + + public int capacity() + { + return capacity.get(); + } + + public int incrementAndGet( + long bindingId) + { + int newCapacity = capacity.incrementAndGet(); + capacityChanged(bindingId, newCapacity); + + return newCapacity; + } + + public int decrementAndGet( + long bindingId) + { + int newCapacity = capacity.decrementAndGet(); + capacityChanged(bindingId, newCapacity); + + return newCapacity; + } + + public int get() + { + return capacity.get(); + } + + private void capacityChanged( + long bindingId, + int newCapacity) + { + int newCapacityPercentage = 100 - (newCapacity * 100 / initialCapacity); + + if (Math.abs(capacityPercentage - newCapacityPercentage) >= 1) + { + eventContext.usageChanged(bindingId, newCapacityPercentage); + } + + capacityPercentage = newCapacityPercentage; + } +} diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java index 3f596751c5..61aaa11bbc 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java @@ -22,8 +22,6 @@ import java.util.EnumMap; import java.util.Map; -import org.agrona.collections.MutableInteger; - import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpServerFactory; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpStreamFactory; @@ -41,10 +39,11 @@ final class TcpBindingContext implements BindingContext TcpConfiguration config, EngineContext context) { - MutableInteger capacity = new MutableInteger(ENGINE_WORKER_CAPACITY.getAsInt(config)); + TcpEventContext event = new TcpEventContext(context); + CapacityTracker capacity = new CapacityTracker(ENGINE_WORKER_CAPACITY.getAsInt(config), event); Map factories = new EnumMap<>(KindConfig.class); factories.put(SERVER, new TcpServerFactory(config, context, capacity)); - factories.put(CLIENT, new TcpClientFactory(config, context, capacity)); + factories.put(CLIENT, new TcpClientFactory(config, context, event, capacity)); this.factories = factories; } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java index 4997edbed8..553ba49bdc 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java @@ -16,6 +16,7 @@ package io.aklivity.zilla.runtime.binding.tcp.internal; import static io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventType.DNS_FAILED; +import static io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventType.USAGE_CAPACITY_PERCENTAGE; import java.nio.ByteBuffer; import java.time.Clock; @@ -39,6 +40,7 @@ public class TcpEventContext private final int tcpTypeId; private final int dnsFailedEventId; + private final int capacityPercantageEventId; private final MessageConsumer eventWriter; private final Clock clock; @@ -47,6 +49,7 @@ public TcpEventContext( { this.tcpTypeId = context.supplyTypeId(TcpBinding.NAME); this.dnsFailedEventId = context.supplyEventId("binding.tcp.dns.failed"); + this.capacityPercantageEventId = context.supplyEventId("binding.tcp.usage.capacity.percentage"); this.eventWriter = context.supplyEventWriter(); this.clock = context.clock(); } @@ -73,4 +76,26 @@ public void dnsFailed( .build(); eventWriter.accept(tcpTypeId, event.buffer(), event.offset(), event.limit()); } + + public void usageChanged( + long bindingId, + int percentage) + { + TcpEventExFW extension = tcpEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .usageCapacity(e -> e + .typeId(USAGE_CAPACITY_PERCENTAGE.value()) + .percentage(percentage) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(capacityPercantageEventId) + .timestamp(clock.millis()) + .traceId(0L) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(tcpTypeId, event.buffer(), event.offset(), event.limit()); + } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java index 66fbad7cf0..6d2db868ef 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java @@ -21,6 +21,7 @@ import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.EventFW; import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpDnsFailedExFW; import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventExFW; +import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpUsageCapacityPercentageExFW; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; @@ -46,11 +47,13 @@ public String format( switch (extension.kind()) { case DNS_FAILED: - { - final TcpDnsFailedExFW ex = extension.dnsFailed(); - result = String.format("Unable to resolve host dns for address (%s).", asString(ex.address())); + final TcpDnsFailedExFW dnsFailed = extension.dnsFailed(); + result = String.format("Unable to resolve host dns for address (%s).", asString(dnsFailed.address())); + break; + case USAGE_CAPACITY_PERCENTAGE: + final TcpUsageCapacityPercentageExFW capacityUsage = extension.usageCapacity(); + result = String.format("%d", capacityUsage.percentage()); break; - } } return result; } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java index 61c069320c..2b263f869f 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java @@ -40,11 +40,12 @@ import org.agrona.CloseHelper; import org.agrona.DirectBuffer; import org.agrona.MutableDirectBuffer; -import org.agrona.collections.MutableInteger; import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; +import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration; +import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.tcp.internal.types.OctetsFW; @@ -100,9 +101,10 @@ public class TcpClientFactory implements TcpStreamFactory public TcpClientFactory( TcpConfiguration config, EngineContext context, - MutableInteger capacity) + TcpEventContext event, + CapacityTracker capacity) { - this.router = new TcpClientRouter(context, capacity); + this.router = new TcpClientRouter(context, event, capacity); this.writeBuffer = context.writeBuffer(); this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder()); this.bufferPool = context.bufferPool(); @@ -164,7 +166,7 @@ public MessageConsumer newStream( final long initialId = begin.streamId(); final SocketChannel channel = newSocketChannel(); - final TcpClient client = new TcpClient(application, originId, routedId, initialId, channel); + final TcpClient client = new TcpClient(application, originId, routedId, initialId, binding.id, channel); client.doNetConnect(route, binding.options); newStream = client::onAppMessage; } @@ -206,10 +208,11 @@ private SocketChannel newSocketChannel() } private void closeNet( + long bindingId, SocketChannel network) { CloseHelper.quietClose(network); - router.close(); + router.close(bindingId); } private final class TcpClient @@ -219,6 +222,7 @@ private final class TcpClient private final long routedId; private final long initialId; private final long replyId; + private final long bindingId; private final SocketChannel net; private PollerKey networkKey; @@ -242,6 +246,7 @@ private TcpClient( long originId, long routedId, long initialId, + long bindingId, SocketChannel net) { this.app = app; @@ -249,6 +254,7 @@ private TcpClient( this.routedId = routedId; this.initialId = initialId; this.replyId = supplyReplyId.applyAsLong(initialId); + this.bindingId = bindingId; this.net = net; } @@ -355,7 +361,7 @@ private int onNetReadable( if (net.socket().isOutputShutdown()) { - closeNet(net); + closeNet(bindingId, net); } } else if (bytesRead != 0) @@ -468,7 +474,7 @@ private void doNetShutdownOutput( if (net.isConnectionPending()) { networkKey.clear(OP_CONNECT); - closeNet(net); + closeNet(bindingId, net); } else { @@ -477,7 +483,7 @@ private void doNetShutdownOutput( if (net.socket().isInputShutdown()) { - closeNet(net); + closeNet(bindingId, net); } } } @@ -773,7 +779,7 @@ private void cleanup( doAppAbort(traceId); doAppReset(traceId); - closeNet(net); + closeNet(bindingId, net); cleanupWriteSlot(); } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java index 9b20b5a979..65111bc054 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java @@ -27,9 +27,9 @@ import java.util.function.Predicate; import org.agrona.collections.Long2ObjectHashMap; -import org.agrona.collections.MutableInteger; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; +import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig; @@ -49,18 +49,19 @@ public final class TcpClientRouter private final byte[] ipv6ros = new byte[16]; private final Function resolveHost; - private final MutableInteger capacity; + private final CapacityTracker capacity; private final Long2ObjectHashMap bindings; private final TcpEventContext event; public TcpClientRouter( EngineContext context, - MutableInteger capacity) + TcpEventContext event, + CapacityTracker capacity) { this.resolveHost = context::resolveHost; + this.event = event; this.capacity = capacity; this.bindings = new Long2ObjectHashMap<>(); - this.event = new TcpEventContext(context); } public void attach( @@ -179,7 +180,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES) if (resolved != null) { - capacity.decrementAndGet(); + capacity.decrementAndGet(binding.id); } return resolved; @@ -191,9 +192,10 @@ public void detach( bindings.remove(bindingId); } - public void close() + public void close( + long bindingId) { - capacity.decrementAndGet(); + capacity.decrementAndGet(bindingId); } @Override diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java index cb4c0d6344..67aa226f11 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java @@ -40,10 +40,10 @@ import org.agrona.DirectBuffer; import org.agrona.LangUtil; import org.agrona.MutableDirectBuffer; -import org.agrona.collections.MutableInteger; import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; +import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig; @@ -103,7 +103,7 @@ public class TcpServerFactory implements TcpStreamFactory public TcpServerFactory( TcpConfiguration config, EngineContext context, - MutableInteger capacity) + CapacityTracker capacity) { this.router = new TcpServerRouter(context, this::handleAccept, capacity); this.writeBuffer = context.writeBuffer(); @@ -171,7 +171,8 @@ private int handleAccept( ServerSocketChannel server = (ServerSocketChannel) acceptKey.channel(); - for (SocketChannel channel = router.accept(server); channel != null; channel = router.accept(server)) + for (SocketChannel channel = router.accept(binding.id, server); channel != null; + channel = router.accept(binding.id, server)) { channel.configureBlocking(false); channel.setOption(TCP_NODELAY, options.nodelay); @@ -204,14 +205,15 @@ private void onAccepted( } else { - closeNet(network); + closeNet(binding.id, network); } } private void closeNet( + long bindingId, SocketChannel network) { - router.close(network); + router.close(bindingId, network); } private final class TcpServer @@ -293,7 +295,7 @@ private int onNetReadable( if (net.socket().isOutputShutdown()) { - closeNet(net); + closeNet(originId, net); } } else if (bytesRead != 0) @@ -407,7 +409,7 @@ private void doNetShutdownOutput( if (net.socket().isInputShutdown()) { - closeNet(net); + closeNet(originId, net); } } catch (IOException ex) @@ -725,7 +727,7 @@ private void cleanup( cleanupWriteSlot(); - closeNet(net); + closeNet(originId, net); } private void cleanupWriteSlot() diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java index 4a30e4c158..21bdcf6bbd 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java @@ -27,8 +27,8 @@ import org.agrona.CloseHelper; import org.agrona.collections.Long2ObjectHashMap; -import org.agrona.collections.MutableInteger; +import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig; import io.aklivity.zilla.runtime.engine.EngineContext; @@ -40,14 +40,14 @@ public final class TcpServerRouter private final ToIntFunction acceptHandler; private final Function supplyPollerKey; private final Long2ObjectHashMap serversById; - private final MutableInteger capacity; + private final CapacityTracker capacity; private boolean unbound; public TcpServerRouter( EngineContext context, ToIntFunction acceptHandler, - MutableInteger capacity) + CapacityTracker capacity) { this.bindings = new Long2ObjectHashMap<>(); this.supplyPollerKey = context::supplyPollerKey; @@ -85,6 +85,7 @@ public String toString() } public SocketChannel accept( + long bindingId, ServerSocketChannel server) throws IOException { SocketChannel channel = null; @@ -95,7 +96,7 @@ public SocketChannel accept( if (channel != null) { - capacity.decrementAndGet(); + capacity.decrementAndGet(bindingId); } } @@ -111,11 +112,13 @@ public SocketChannel accept( } public void close( + long bindingId, SocketChannel channel) { CloseHelper.quietClose(channel); - if (unbound && capacity.incrementAndGet() > 0) + int newCapacity = capacity.incrementAndGet(bindingId); + if (unbound && newCapacity > 0) { bindings.values().stream() .filter(b -> b.kind == SERVER) diff --git a/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl b/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl index 3b4fb2b53d..0931ffa215 100644 --- a/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl +++ b/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl @@ -19,7 +19,8 @@ scope tcp { enum TcpEventType (uint8) { - DNS_FAILED (1) + DNS_FAILED (1), + USAGE_CAPACITY_PERCENTAGE (2) } struct TcpDnsFailedEx extends core::stream::Extension @@ -27,9 +28,15 @@ scope tcp string16 address; } + struct TcpUsageCapacityPercentageEx extends core::stream::Extension + { + int32 percentage; + } + union TcpEventEx switch (TcpEventType) { case DNS_FAILED: TcpDnsFailedEx dnsFailed; + case USAGE_CAPACITY_PERCENTAGE: TcpUsageCapacityPercentageEx usageCapacity; } } } From b6f9124def2861efd29b151cf39f5c5fd1d7b3cc Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 14 May 2025 10:07:01 +0500 Subject: [PATCH 02/20] Add utilization metrics support --- .../echo/internal/bench/EchoWorker.java | 6 +++++ .../binding/tcp/internal/CapacityTracker.java | 26 +++++++++---------- .../tcp/internal/TcpBindingContext.java | 7 ++--- .../binding/tcp/internal/TcpEventContext.java | 25 ------------------ .../tcp/internal/TcpEventFormatter.java | 5 ---- .../tcp/internal/stream/TcpClientFactory.java | 18 +++++-------- .../tcp/internal/stream/TcpClientRouter.java | 7 +++-- .../tcp/internal/stream/TcpServerFactory.java | 15 +++++------ .../tcp/internal/stream/TcpServerRouter.java | 6 ++--- .../binding/tls/internal/bench/TlsWorker.java | 6 +++++ .../zilla/runtime/engine/EngineContext.java | 2 ++ .../internal/registry/EngineWorker.java | 11 ++++++++ .../src/main/resources/META-INF/zilla/tcp.idl | 9 +------ 13 files changed, 62 insertions(+), 81 deletions(-) diff --git a/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java b/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java index 0ef4b86779..5ca0a81270 100644 --- a/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java +++ b/runtime/binding-echo/src/test/java/io/aklivity/zilla/runtime/binding/echo/internal/bench/EchoWorker.java @@ -325,6 +325,12 @@ public ConverterHandler supplyWriteConverter( return null; } + @Override + public LongConsumer supplyUtilizationMetric() + { + return null; + } + @Override public Path resolvePath( String location) diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java index 72543c0c37..4260c7a84c 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java @@ -15,23 +15,25 @@ */ package io.aklivity.zilla.runtime.binding.tcp.internal; +import java.util.function.LongConsumer; + import org.agrona.collections.MutableInteger; public final class CapacityTracker { private final MutableInteger capacity; - private final TcpEventContext eventContext; + private final LongConsumer capacityUsage; private final int initialCapacity; private int capacityPercentage; public CapacityTracker( int initialCapacity, - TcpEventContext eventContext) + LongConsumer capacityUsage) { - this.eventContext = eventContext; this.initialCapacity = initialCapacity; this.capacity = new MutableInteger(initialCapacity); + this.capacityUsage = capacityUsage; } public int capacity() @@ -39,20 +41,18 @@ public int capacity() return capacity.get(); } - public int incrementAndGet( - long bindingId) + public int incrementAndGet() { int newCapacity = capacity.incrementAndGet(); - capacityChanged(bindingId, newCapacity); + capacityChanged(newCapacity); return newCapacity; } - public int decrementAndGet( - long bindingId) + public int decrementAndGet() { int newCapacity = capacity.decrementAndGet(); - capacityChanged(bindingId, newCapacity); + capacityChanged(newCapacity); return newCapacity; } @@ -63,14 +63,14 @@ public int get() } private void capacityChanged( - long bindingId, int newCapacity) { - int newCapacityPercentage = 100 - (newCapacity * 100 / initialCapacity); + final int newCapacityPercentage = 100 - (newCapacity * 100 / initialCapacity); + final int percentageDiff = newCapacityPercentage - capacityPercentage; - if (Math.abs(capacityPercentage - newCapacityPercentage) >= 1) + if (Math.abs(percentageDiff) >= 1) { - eventContext.usageChanged(bindingId, newCapacityPercentage); + capacityUsage.accept(percentageDiff); } capacityPercentage = newCapacityPercentage; diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java index 61aaa11bbc..a7465a9c2b 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java @@ -21,6 +21,7 @@ import java.util.EnumMap; import java.util.Map; +import java.util.function.LongConsumer; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpServerFactory; @@ -39,11 +40,11 @@ final class TcpBindingContext implements BindingContext TcpConfiguration config, EngineContext context) { - TcpEventContext event = new TcpEventContext(context); - CapacityTracker capacity = new CapacityTracker(ENGINE_WORKER_CAPACITY.getAsInt(config), event); + LongConsumer capacityUsage = context.supplyUtilizationMetric(); + CapacityTracker capacity = new CapacityTracker(ENGINE_WORKER_CAPACITY.getAsInt(config), capacityUsage); Map factories = new EnumMap<>(KindConfig.class); factories.put(SERVER, new TcpServerFactory(config, context, capacity)); - factories.put(CLIENT, new TcpClientFactory(config, context, event, capacity)); + factories.put(CLIENT, new TcpClientFactory(config, context, capacity)); this.factories = factories; } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java index 553ba49bdc..4997edbed8 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventContext.java @@ -16,7 +16,6 @@ package io.aklivity.zilla.runtime.binding.tcp.internal; import static io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventType.DNS_FAILED; -import static io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventType.USAGE_CAPACITY_PERCENTAGE; import java.nio.ByteBuffer; import java.time.Clock; @@ -40,7 +39,6 @@ public class TcpEventContext private final int tcpTypeId; private final int dnsFailedEventId; - private final int capacityPercantageEventId; private final MessageConsumer eventWriter; private final Clock clock; @@ -49,7 +47,6 @@ public TcpEventContext( { this.tcpTypeId = context.supplyTypeId(TcpBinding.NAME); this.dnsFailedEventId = context.supplyEventId("binding.tcp.dns.failed"); - this.capacityPercantageEventId = context.supplyEventId("binding.tcp.usage.capacity.percentage"); this.eventWriter = context.supplyEventWriter(); this.clock = context.clock(); } @@ -76,26 +73,4 @@ public void dnsFailed( .build(); eventWriter.accept(tcpTypeId, event.buffer(), event.offset(), event.limit()); } - - public void usageChanged( - long bindingId, - int percentage) - { - TcpEventExFW extension = tcpEventExRW - .wrap(extensionBuffer, 0, extensionBuffer.capacity()) - .usageCapacity(e -> e - .typeId(USAGE_CAPACITY_PERCENTAGE.value()) - .percentage(percentage) - ) - .build(); - EventFW event = eventRW - .wrap(eventBuffer, 0, eventBuffer.capacity()) - .id(capacityPercantageEventId) - .timestamp(clock.millis()) - .traceId(0L) - .namespacedId(bindingId) - .extension(extension.buffer(), extension.offset(), extension.limit()) - .build(); - eventWriter.accept(tcpTypeId, event.buffer(), event.offset(), event.limit()); - } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java index 6d2db868ef..3598a9ff5b 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpEventFormatter.java @@ -21,7 +21,6 @@ import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.EventFW; import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpDnsFailedExFW; import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpEventExFW; -import io.aklivity.zilla.runtime.binding.tcp.internal.types.event.TcpUsageCapacityPercentageExFW; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; @@ -50,10 +49,6 @@ public String format( final TcpDnsFailedExFW dnsFailed = extension.dnsFailed(); result = String.format("Unable to resolve host dns for address (%s).", asString(dnsFailed.address())); break; - case USAGE_CAPACITY_PERCENTAGE: - final TcpUsageCapacityPercentageExFW capacityUsage = extension.usageCapacity(); - result = String.format("%d", capacityUsage.percentage()); - break; } return result; } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java index 2b263f869f..e478f6954e 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java @@ -101,9 +101,9 @@ public class TcpClientFactory implements TcpStreamFactory public TcpClientFactory( TcpConfiguration config, EngineContext context, - TcpEventContext event, CapacityTracker capacity) { + TcpEventContext event = new TcpEventContext(context); this.router = new TcpClientRouter(context, event, capacity); this.writeBuffer = context.writeBuffer(); this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder()); @@ -166,7 +166,7 @@ public MessageConsumer newStream( final long initialId = begin.streamId(); final SocketChannel channel = newSocketChannel(); - final TcpClient client = new TcpClient(application, originId, routedId, initialId, binding.id, channel); + final TcpClient client = new TcpClient(application, originId, routedId, initialId, channel); client.doNetConnect(route, binding.options); newStream = client::onAppMessage; } @@ -208,11 +208,10 @@ private SocketChannel newSocketChannel() } private void closeNet( - long bindingId, SocketChannel network) { CloseHelper.quietClose(network); - router.close(bindingId); + router.close(); } private final class TcpClient @@ -222,7 +221,6 @@ private final class TcpClient private final long routedId; private final long initialId; private final long replyId; - private final long bindingId; private final SocketChannel net; private PollerKey networkKey; @@ -246,7 +244,6 @@ private TcpClient( long originId, long routedId, long initialId, - long bindingId, SocketChannel net) { this.app = app; @@ -254,7 +251,6 @@ private TcpClient( this.routedId = routedId; this.initialId = initialId; this.replyId = supplyReplyId.applyAsLong(initialId); - this.bindingId = bindingId; this.net = net; } @@ -361,7 +357,7 @@ private int onNetReadable( if (net.socket().isOutputShutdown()) { - closeNet(bindingId, net); + closeNet(net); } } else if (bytesRead != 0) @@ -474,7 +470,7 @@ private void doNetShutdownOutput( if (net.isConnectionPending()) { networkKey.clear(OP_CONNECT); - closeNet(bindingId, net); + closeNet(net); } else { @@ -483,7 +479,7 @@ private void doNetShutdownOutput( if (net.socket().isInputShutdown()) { - closeNet(bindingId, net); + closeNet(net); } } } @@ -779,7 +775,7 @@ private void cleanup( doAppAbort(traceId); doAppReset(traceId); - closeNet(bindingId, net); + closeNet(net); cleanupWriteSlot(); } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java index 65111bc054..3fbb62ab33 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java @@ -180,7 +180,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES) if (resolved != null) { - capacity.decrementAndGet(binding.id); + capacity.decrementAndGet(); } return resolved; @@ -192,10 +192,9 @@ public void detach( bindings.remove(bindingId); } - public void close( - long bindingId) + public void close() { - capacity.decrementAndGet(bindingId); + capacity.decrementAndGet(); } @Override diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java index 67aa226f11..fe64005bde 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java @@ -171,8 +171,8 @@ private int handleAccept( ServerSocketChannel server = (ServerSocketChannel) acceptKey.channel(); - for (SocketChannel channel = router.accept(binding.id, server); channel != null; - channel = router.accept(binding.id, server)) + for (SocketChannel channel = router.accept(server); channel != null; + channel = router.accept(server)) { channel.configureBlocking(false); channel.setOption(TCP_NODELAY, options.nodelay); @@ -205,15 +205,14 @@ private void onAccepted( } else { - closeNet(binding.id, network); + closeNet(network); } } private void closeNet( - long bindingId, SocketChannel network) { - router.close(bindingId, network); + router.close(network); } private final class TcpServer @@ -295,7 +294,7 @@ private int onNetReadable( if (net.socket().isOutputShutdown()) { - closeNet(originId, net); + closeNet(net); } } else if (bytesRead != 0) @@ -409,7 +408,7 @@ private void doNetShutdownOutput( if (net.socket().isInputShutdown()) { - closeNet(originId, net); + closeNet(net); } } catch (IOException ex) @@ -727,7 +726,7 @@ private void cleanup( cleanupWriteSlot(); - closeNet(originId, net); + closeNet(net); } private void cleanupWriteSlot() diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java index 21bdcf6bbd..b903b6a5c6 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java @@ -85,7 +85,6 @@ public String toString() } public SocketChannel accept( - long bindingId, ServerSocketChannel server) throws IOException { SocketChannel channel = null; @@ -96,7 +95,7 @@ public SocketChannel accept( if (channel != null) { - capacity.decrementAndGet(bindingId); + capacity.decrementAndGet(); } } @@ -112,12 +111,11 @@ public SocketChannel accept( } public void close( - long bindingId, SocketChannel channel) { CloseHelper.quietClose(channel); - int newCapacity = capacity.incrementAndGet(bindingId); + int newCapacity = capacity.incrementAndGet(); if (unbound && newCapacity > 0) { bindings.values().stream() diff --git a/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java b/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java index 1c59591591..58da38f435 100644 --- a/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java +++ b/runtime/binding-tls/src/test/java/io/aklivity/zilla/runtime/binding/tls/internal/bench/TlsWorker.java @@ -391,6 +391,12 @@ public ConverterHandler supplyWriteConverter( return null; } + @Override + public LongConsumer supplyUtilizationMetric() + { + return null; + } + @Override public Path resolvePath( String location) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java index 1fac875102..f117863f39 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineContext.java @@ -161,6 +161,8 @@ ConverterHandler supplyReadConverter( ConverterHandler supplyWriteConverter( ModelConfig config); + LongConsumer supplyUtilizationMetric(); + Path resolvePath( String location); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 20f8f14b40..330439e165 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -766,6 +766,17 @@ public ConverterHandler supplyWriteConverter( return model != null ? model.supplyWriteConverterHandler(config) : null; } + @Override + public LongConsumer supplyUtilizationMetric() + { + final int metricId = labels.supplyLabelId("worker.utilization"); + //final int metricId = labels.supplyLabelId("worker.count"); + final int engineId = labels.supplyLabelId("engine"); + final long bindingId = NamespacedId.id(engineId, engineId); + + return supplyMetricWriter(GAUGE, bindingId, metricId); + } + @Override public Path resolvePath( String location) diff --git a/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl b/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl index 0931ffa215..3b4fb2b53d 100644 --- a/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl +++ b/specs/binding-tcp.spec/src/main/resources/META-INF/zilla/tcp.idl @@ -19,8 +19,7 @@ scope tcp { enum TcpEventType (uint8) { - DNS_FAILED (1), - USAGE_CAPACITY_PERCENTAGE (2) + DNS_FAILED (1) } struct TcpDnsFailedEx extends core::stream::Extension @@ -28,15 +27,9 @@ scope tcp string16 address; } - struct TcpUsageCapacityPercentageEx extends core::stream::Extension - { - int32 percentage; - } - union TcpEventEx switch (TcpEventType) { case DNS_FAILED: TcpDnsFailedEx dnsFailed; - case USAGE_CAPACITY_PERCENTAGE: TcpUsageCapacityPercentageEx usageCapacity; } } } From 94982d6e923d85b43ff821dd89da52b1fd4bf884 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Wed, 14 May 2025 11:26:04 +0500 Subject: [PATCH 03/20] Testing --- .../zilla/runtime/engine/internal/registry/EngineWorker.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 330439e165..2a15cc6c7d 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -1013,7 +1013,9 @@ public void onExporterDetached( public Metric resolveMetric( String metricName) { + System.out.println("resolveMetric: " + metricName); String metricGroupName = metricName.split("\\.")[0]; + System.out.println(metricGroupName); return metricGroupsByName.get(metricGroupName).supply(metricName); } From 2af7f953e0757478ae2db12871765e449b71bc35 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 09:38:31 +0500 Subject: [PATCH 04/20] Add engine metric module --- .../internal/metrics/EngineMetricGroup.java | 64 +++++++++++++++++++ .../metrics/EngineMetricGroupFactorySpi.java | 36 +++++++++++ .../EngineWorkerUtilizationMetric.java | 59 +++++++++++++++++ .../internal/registry/EngineWorker.java | 2 +- .../engine/src/main/moditect/module-info.java | 2 + ...ntime.engine.metrics.MetricGroupFactorySpi | 1 + .../schema/metrics/engine.schema.patch.json | 7 ++ 7 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroupFactorySpi.java create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java create mode 100644 runtime/engine/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi create mode 100644 specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java new file mode 100644 index 0000000000..d3d9640836 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.metrics; + +import java.net.URL; +import java.util.Collection; +import java.util.Map; +import java.util.function.Supplier; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.metrics.Metric; +import io.aklivity.zilla.runtime.engine.metrics.MetricGroup; + +public class EngineMetricGroup implements MetricGroup +{ + public static final String NAME = "engine"; + + private final Map> engineMetrics = Map.of( + "engine.worker.utilization", EngineWorkerUtilizationMetric::new + ); + + public EngineMetricGroup( + Configuration config) + { + } + + @Override + public String name() + { + return NAME; + } + + @Override + public URL type() + { + return getClass().getResource("schema/engine.schema.patch.json"); + } + + @Override + public Metric supply( + String name) + { + return engineMetrics.getOrDefault(name, () -> null).get(); + } + + @Override + public Collection metricNames() + { + return engineMetrics.keySet(); + } +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroupFactorySpi.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroupFactorySpi.java new file mode 100644 index 0000000000..de9325f3b1 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroupFactorySpi.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.metrics; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.metrics.MetricGroup; +import io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi; + +public class EngineMetricGroupFactorySpi implements MetricGroupFactorySpi +{ + @Override + public String type() + { + return EngineMetricGroup.NAME; + } + + @Override + public MetricGroup create( + Configuration config) + { + return new EngineMetricGroup(config); + } +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java new file mode 100644 index 0000000000..186f0eb182 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.metrics; + +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.metrics.Metric; +import io.aklivity.zilla.runtime.engine.metrics.MetricContext; + +public class EngineWorkerUtilizationMetric implements Metric +{ + private static final String GROUP = EngineMetricGroup.NAME; + private static final String NAME = String.format("%s.%s", GROUP, "worker.utilization"); + private static final String DESCRIPTION = "Engine worker utilization"; + + @Override + public String name() + { + return NAME; + } + + @Override + public Kind kind() + { + return Kind.GAUGE; + } + + @Override + public Unit unit() + { + return Unit.COUNT; + } + + @Override + public String description() + { + return DESCRIPTION; + } + + @Override + public MetricContext supply( + EngineContext context) + { + //Unsupported metric context + return null; + } +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 2a15cc6c7d..0f8ad5f1bb 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -769,7 +769,7 @@ public ConverterHandler supplyWriteConverter( @Override public LongConsumer supplyUtilizationMetric() { - final int metricId = labels.supplyLabelId("worker.utilization"); + final int metricId = labels.supplyLabelId("engine.worker.utilization"); //final int metricId = labels.supplyLabelId("worker.count"); final int engineId = labels.supplyLabelId("engine"); final long bindingId = NamespacedId.id(engineId, engineId); diff --git a/runtime/engine/src/main/moditect/module-info.java b/runtime/engine/src/main/moditect/module-info.java index e9d7927615..eb866bdcfe 100644 --- a/runtime/engine/src/main/moditect/module-info.java +++ b/runtime/engine/src/main/moditect/module-info.java @@ -58,6 +58,8 @@ provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi with io.aklivity.zilla.runtime.engine.internal.event.EngineEventFormatterFactory; + provides io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi + with io.aklivity.zilla.runtime.engine.internal.metrics.EngineMetricGroupFactorySpi; uses io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi; uses io.aklivity.zilla.runtime.engine.config.CompositeBindingAdapterSpi; diff --git a/runtime/engine/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi b/runtime/engine/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi new file mode 100644 index 0000000000..ddc7212652 --- /dev/null +++ b/runtime/engine/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.metrics.MetricGroupFactorySpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.engine.internal.metrics.EngineMetricGroupFactorySpi diff --git a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json new file mode 100644 index 0000000000..4da6bb14ac --- /dev/null +++ b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json @@ -0,0 +1,7 @@ +[ + { + "op": "add", + "path": "/$defs/telemetry/metrics/items/enum/-", + "value": "engine.worker.utilization" + } +] From 37f08d3fe5c2f633993a1c3020e5f4aa002d3911 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 10:44:48 +0500 Subject: [PATCH 05/20] Remove print statements --- .../zilla/runtime/engine/internal/registry/EngineWorker.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 0f8ad5f1bb..623d55d592 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -1013,9 +1013,7 @@ public void onExporterDetached( public Metric resolveMetric( String metricName) { - System.out.println("resolveMetric: " + metricName); String metricGroupName = metricName.split("\\.")[0]; - System.out.println(metricGroupName); return metricGroupsByName.get(metricGroupName).supply(metricName); } From 7a75db51d7178b371296f53981ea54ce50c56ed2 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 12:38:45 +0500 Subject: [PATCH 06/20] Add worker count metric --- .../engine/internal/registry/EngineWorker.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 623d55d592..0163ee5572 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -297,6 +297,16 @@ public EngineWorker( metricWriterSuppliers.put(GAUGE, gaugesLayout::supplyWriter); metricWriterSuppliers.put(HISTOGRAM, histogramsLayout::supplyWriter); + if (!readonly) + { + final int privateId = labels.supplyLabelId("private"); + final int engineId = labels.supplyLabelId("engine"); + final long bindingId = NamespacedId.id(privateId, engineId); + + final int metricId = labels.supplyLabelId("engine.worker.count"); + supplyMetricWriter(GAUGE, bindingId, metricId).accept(1); + } + final StreamsLayout streamsLayout = new StreamsLayout.Builder() .path(config.directory().resolve(String.format("data%d", index))) .streamsCapacity(config.streamsBufferCapacity()) @@ -769,10 +779,11 @@ public ConverterHandler supplyWriteConverter( @Override public LongConsumer supplyUtilizationMetric() { - final int metricId = labels.supplyLabelId("engine.worker.utilization"); - //final int metricId = labels.supplyLabelId("worker.count"); + final int privateId = labels.supplyLabelId("private"); final int engineId = labels.supplyLabelId("engine"); - final long bindingId = NamespacedId.id(engineId, engineId); + final long bindingId = NamespacedId.id(privateId, engineId); + + final int metricId = labels.supplyLabelId("engine.worker.utilization"); return supplyMetricWriter(GAUGE, bindingId, metricId); } From be679e8e0daddc270e47483fa6a97412567e51d2 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 13:55:42 +0500 Subject: [PATCH 07/20] Add missing metric --- .../internal/metrics/EngineMetricGroup.java | 3 +- .../metrics/EngineWorkerCountMetric.java | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java index d3d9640836..a4f81b9244 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineMetricGroup.java @@ -29,7 +29,8 @@ public class EngineMetricGroup implements MetricGroup public static final String NAME = "engine"; private final Map> engineMetrics = Map.of( - "engine.worker.utilization", EngineWorkerUtilizationMetric::new + "engine.worker.utilization", EngineWorkerUtilizationMetric::new, + "engine.worker.count", EngineWorkerCountMetric::new ); public EngineMetricGroup( diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java new file mode 100644 index 0000000000..7aeefe269d --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.metrics; + +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.metrics.Metric; +import io.aklivity.zilla.runtime.engine.metrics.MetricContext; + +public class EngineWorkerCountMetric implements Metric +{ + private static final String GROUP = EngineMetricGroup.NAME; + private static final String NAME = String.format("%s.%s", GROUP, "worker.count"); + private static final String DESCRIPTION = "Engine worker count"; + + @Override + public String name() + { + return NAME; + } + + @Override + public Kind kind() + { + return Kind.COUNTER; + } + + @Override + public Unit unit() + { + return Unit.COUNT; + } + + @Override + public String description() + { + return DESCRIPTION; + } + + @Override + public MetricContext supply( + EngineContext context) + { + //Unsupported metric context + return null; + } +} From 554dea3e1abdcfbd53dccd3c07d9206c8b7ed21a Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 15:11:23 +0500 Subject: [PATCH 08/20] Testing --- .../runtime/engine/internal/registry/EngineWorker.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 0163ee5572..16bca8d223 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -299,9 +299,8 @@ public EngineWorker( if (!readonly) { - final int privateId = labels.supplyLabelId("private"); final int engineId = labels.supplyLabelId("engine"); - final long bindingId = NamespacedId.id(privateId, engineId); + final long bindingId = NamespacedId.id(engineId, engineId); final int metricId = labels.supplyLabelId("engine.worker.count"); supplyMetricWriter(GAUGE, bindingId, metricId).accept(1); @@ -779,9 +778,8 @@ public ConverterHandler supplyWriteConverter( @Override public LongConsumer supplyUtilizationMetric() { - final int privateId = labels.supplyLabelId("private"); final int engineId = labels.supplyLabelId("engine"); - final long bindingId = NamespacedId.id(privateId, engineId); + final long bindingId = NamespacedId.id(engineId, engineId); final int metricId = labels.supplyLabelId("engine.worker.utilization"); From 3015bc0bc04cb0119d11ecdd2122b0a72fc42797 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 15:42:23 +0500 Subject: [PATCH 09/20] Add percent unit --- .../engine/internal/metrics/EngineWorkerCountMetric.java | 2 +- .../aklivity/zilla/runtime/engine/internal/EngineMetricsIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java index 7aeefe269d..01a8181992 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java @@ -40,7 +40,7 @@ public Kind kind() @Override public Unit unit() { - return Unit.COUNT; + return Unit.PERCENT; } @Override diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineMetricsIT.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineMetricsIT.java index e1782cd668..3182ba67ff 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineMetricsIT.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/internal/EngineMetricsIT.java @@ -105,7 +105,7 @@ public void shouldFetchGaugeIds() // THEN // gaugeIds[0] is coming from test.gauge in server.yaml - assertThat(gaugeIds[1], equalTo(new long[]{3L, 7L})); + assertThat(gaugeIds[2], equalTo(new long[]{3L, 7L})); } @Test From f4222fbfcff417e27f12a5a3106a7e639c60e3e8 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 15:47:22 +0500 Subject: [PATCH 10/20] Fix typo --- .../java/io/aklivity/zilla/runtime/engine/metrics/Metric.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java index 07164f806a..8a107a9945 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java @@ -30,7 +30,8 @@ enum Unit { BYTES, NANOSECONDS, - COUNT + COUNT, + PERCENT } String name(); From 351cffbdc5a2a6078c557dd8e7938f9aee7fa9ab Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 16:05:52 +0500 Subject: [PATCH 11/20] WIP --- .../engine/internal/metrics/EngineWorkerCountMetric.java | 2 +- .../java/io/aklivity/zilla/runtime/engine/metrics/Metric.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java index 01a8181992..7aeefe269d 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerCountMetric.java @@ -40,7 +40,7 @@ public Kind kind() @Override public Unit unit() { - return Unit.PERCENT; + return Unit.COUNT; } @Override diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java index 8a107a9945..07164f806a 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java @@ -30,8 +30,7 @@ enum Unit { BYTES, NANOSECONDS, - COUNT, - PERCENT + COUNT } String name(); From b43665aa05167e06e4be2589ce8edc782205326b Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Thu, 15 May 2025 16:24:09 +0500 Subject: [PATCH 12/20] Testing --- .../engine/internal/metrics/EngineWorkerUtilizationMetric.java | 2 +- .../java/io/aklivity/zilla/runtime/engine/metrics/Metric.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java index 186f0eb182..15b382099b 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java @@ -40,7 +40,7 @@ public Kind kind() @Override public Unit unit() { - return Unit.COUNT; + return Unit.PERCENT; } @Override diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java index 07164f806a..8a107a9945 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java @@ -30,7 +30,8 @@ enum Unit { BYTES, NANOSECONDS, - COUNT + COUNT, + PERCENT } String name(); From bcd0bb6874d5f66d62dbcc68791930daabe8d081 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 16 May 2025 09:56:17 +0500 Subject: [PATCH 13/20] Ignore namspace and binding ids --- .../metrics/internal/printer/MetricsPrinter.java | 9 ++++++--- .../engine/internal/registry/EngineWorker.java | 12 ++++-------- .../runtime/engine/metrics/reader/ScalarRecord.java | 4 ++-- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/runtime/command-metrics/src/main/java/io/aklivity/zilla/runtime/command/metrics/internal/printer/MetricsPrinter.java b/runtime/command-metrics/src/main/java/io/aklivity/zilla/runtime/command/metrics/internal/printer/MetricsPrinter.java index c9a2babe5a..e514b606cd 100644 --- a/runtime/command-metrics/src/main/java/io/aklivity/zilla/runtime/command/metrics/internal/printer/MetricsPrinter.java +++ b/runtime/command-metrics/src/main/java/io/aklivity/zilla/runtime/command/metrics/internal/printer/MetricsPrinter.java @@ -69,8 +69,8 @@ private void calculateColumnWidths() for (MetricRecord metric : records) { - namespaceWidth = Math.max(namespaceWidth, metric.namespace().length()); - bindingWidth = Math.max(bindingWidth, metric.binding().length()); + namespaceWidth = Math.max(namespaceWidth, metric.namespace() != null ? metric.namespace().length() : 0); + bindingWidth = Math.max(bindingWidth, metric.binding() != null ? metric.binding().length() : 0); metricWidth = Math.max(metricWidth, metric.metric().length()); valueWidth = Math.max(valueWidth, metricValues.get(metric).length()); } @@ -84,7 +84,10 @@ private void printRecords( out.format(format, NAMESPACE_HEADER, BINDING_HEADER, METRIC_HEADER, VALUE_HEADER); for (MetricRecord metric : records) { - out.format(format, metric.namespace(), metric.binding(), metric.metric(), metricValues.get(metric)); + String namespace = metric.namespace() != null ? metric.namespace() : ""; + String binding = metric.binding() != null ? metric.binding() : ""; + + out.format(format, namespace, binding, metric.metric(), metricValues.get(metric)); } out.println(); } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 16bca8d223..0cab2f63ee 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -154,6 +154,8 @@ public class EngineWorker implements EngineContext, Agent { + private static final long NO_NAMESPACE_BINDING_ID = -1L; + private static final int RESERVED_SIZE = 33; private static final int SHIFT_SIZE = 56; @@ -299,11 +301,8 @@ public EngineWorker( if (!readonly) { - final int engineId = labels.supplyLabelId("engine"); - final long bindingId = NamespacedId.id(engineId, engineId); - final int metricId = labels.supplyLabelId("engine.worker.count"); - supplyMetricWriter(GAUGE, bindingId, metricId).accept(1); + supplyMetricWriter(GAUGE, NO_NAMESPACE_BINDING_ID, metricId).accept(1); } final StreamsLayout streamsLayout = new StreamsLayout.Builder() @@ -778,12 +777,9 @@ public ConverterHandler supplyWriteConverter( @Override public LongConsumer supplyUtilizationMetric() { - final int engineId = labels.supplyLabelId("engine"); - final long bindingId = NamespacedId.id(engineId, engineId); - final int metricId = labels.supplyLabelId("engine.worker.utilization"); - return supplyMetricWriter(GAUGE, bindingId, metricId); + return supplyMetricWriter(GAUGE, NO_NAMESPACE_BINDING_ID, metricId); } @Override diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java index d54c8d66bc..a0501dcd2c 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java @@ -53,13 +53,13 @@ public long bindingId() public String namespace() { // implicit int -> long conversion, it's OK - return labelResolver.apply(namespaceId); + return namespaceId != -1 ? labelResolver.apply(namespaceId) : null; } @Override public String binding() { - return labelResolver.apply(bindingId); + return bindingId != -1 ? labelResolver.apply(bindingId) : null; } @Override From d64769c29d2d70df6ca67b81c5b7df11b5da79cb Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 20 May 2025 09:13:39 +0500 Subject: [PATCH 14/20] Refactor naming --- .../runtime/engine/internal/registry/EngineWorker.java | 7 +++---- .../zilla/runtime/engine/namespace/NamespacedId.java | 4 ++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index 0cab2f63ee..a3bdaa39fe 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -33,6 +33,7 @@ import static io.aklivity.zilla.runtime.engine.metrics.Metric.Kind.HISTOGRAM; import static io.aklivity.zilla.runtime.engine.metrics.MetricContext.Direction.RECEIVED; import static io.aklivity.zilla.runtime.engine.metrics.MetricContext.Direction.SENT; +import static io.aklivity.zilla.runtime.engine.namespace.NamespacedId.NO_NAMESPACED_ID; import static java.lang.System.currentTimeMillis; import static java.lang.ThreadLocal.withInitial; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -154,8 +155,6 @@ public class EngineWorker implements EngineContext, Agent { - private static final long NO_NAMESPACE_BINDING_ID = -1L; - private static final int RESERVED_SIZE = 33; private static final int SHIFT_SIZE = 56; @@ -302,7 +301,7 @@ public EngineWorker( if (!readonly) { final int metricId = labels.supplyLabelId("engine.worker.count"); - supplyMetricWriter(GAUGE, NO_NAMESPACE_BINDING_ID, metricId).accept(1); + supplyMetricWriter(GAUGE, NO_NAMESPACED_ID, metricId).accept(1); } final StreamsLayout streamsLayout = new StreamsLayout.Builder() @@ -779,7 +778,7 @@ public LongConsumer supplyUtilizationMetric() { final int metricId = labels.supplyLabelId("engine.worker.utilization"); - return supplyMetricWriter(GAUGE, NO_NAMESPACE_BINDING_ID, metricId); + return supplyMetricWriter(GAUGE, NO_NAMESPACED_ID, metricId); } @Override diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/namespace/NamespacedId.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/namespace/NamespacedId.java index effc9d8836..766e79e127 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/namespace/NamespacedId.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/namespace/NamespacedId.java @@ -17,6 +17,10 @@ public final class NamespacedId { + public static final int NO_NAMESPACE_ID = -1; + public static final int NO_LOCAL_ID = -1; + public static final long NO_NAMESPACED_ID = id(NO_NAMESPACE_ID, NO_LOCAL_ID); + public static int namespaceId( long namespacedId) { From 38c77379d5f4bec95cf9a824e09c6c6208aa095b Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 20 May 2025 16:17:26 +0500 Subject: [PATCH 15/20] Update scripts --- .../serializer/OtlpMetricsSerializer.java | 40 ++- .../metrics.with.service.name/client.rpt | 174 ++++------ .../metrics.with.service.name/server.rpt | 174 ++++------ .../otlp/application/metrics/client.rpt | 327 +++++------------- .../otlp/application/metrics/server.rpt | 327 +++++------------- 5 files changed, 352 insertions(+), 690 deletions(-) diff --git a/runtime/exporter-otlp/src/main/java/io/aklivity/zilla/runtime/exporter/otlp/internal/serializer/OtlpMetricsSerializer.java b/runtime/exporter-otlp/src/main/java/io/aklivity/zilla/runtime/exporter/otlp/internal/serializer/OtlpMetricsSerializer.java index b20419d5c5..124e91edd7 100644 --- a/runtime/exporter-otlp/src/main/java/io/aklivity/zilla/runtime/exporter/otlp/internal/serializer/OtlpMetricsSerializer.java +++ b/runtime/exporter-otlp/src/main/java/io/aklivity/zilla/runtime/exporter/otlp/internal/serializer/OtlpMetricsSerializer.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.exporter.otlp.internal.serializer; import static io.aklivity.zilla.runtime.engine.metrics.Metric.Kind.COUNTER; +import static io.aklivity.zilla.runtime.engine.namespace.NamespacedId.NO_NAMESPACED_ID; import java.util.Arrays; import java.util.LinkedList; @@ -174,16 +175,24 @@ private JsonArrayBuilder attributes( MetricRecord record) { List attributes = new LinkedList<>(); - attributes.add(AttributeConfig.builder() - .name("namespace") - .value(record.namespace()) - .build() - ); - attributes.add(AttributeConfig.builder() - .name("binding") - .value(record.binding()) - .build() - ); + + if (record.namespace() != null) + { + attributes.add(AttributeConfig.builder() + .name("namespace") + .value(record.namespace()) + .build() + ); + } + + if (record.binding() != null) + { + attributes.add(AttributeConfig.builder() + .name("binding") + .value(record.binding()) + .build() + ); + } if (serviceNameAttribute != null) { attributes.add(serviceNameAttribute); @@ -310,11 +319,14 @@ public String nameByBinding( long bindingId) { String result = null; - KindConfig kind = resolveKind.apply(bindingId); - Map externalNames = KIND_METRIC_NAMES.get(kind); - if (externalNames != null) + if (bindingId != NO_NAMESPACED_ID) { - result = externalNames.get(internalMetricName); + KindConfig kind = resolveKind.apply(bindingId); + Map externalNames = KIND_METRIC_NAMES.get(kind); + if (externalNames != null) + { + result = externalNames.get(internalMetricName); + } } return result != null ? result : internalMetricName; } diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt index 928e3225d2..28ad1582e3 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt @@ -19,107 +19,85 @@ connected write http:method "POST" write http:version "HTTP/1.1" write http:header "Host" "localhost:4318" -write http:header "Content-Length" "942" +write http:header "Content-Length" "1205" write '{' - '"resourceMetrics":[' - '{' - '"resource":{' - '"attributes":[' - '{' - '"key":"service.namespace",' - '"value":{' - '"stringValue":"example"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '},' - '"scopeMetrics":[' - '{' - '"scope":{' - '"name":"OtlpMetricsSerializer",' - '"version":"1.0.0"' - '},' - '"metrics":[' - '{' - '"name":"test.counter",' - '"unit":"",' - '"description":"Description for test.counter",' - '"sum":{' - '"dataPoints":[' - '{' - '"asInt":77,' - '"timeUnixNano":0123456789123456789,' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '}' - '],' - '"aggregationTemporality":2,' - '"isMonotonic":true' - '}' - '},' - '{' - '"name":"test.gauge",' - '"unit":"",' - '"description":"Description for test.gauge",' - '"gauge":{' - '"dataPoints":[' - '{' - '"asInt":66,' - '"timeUnixNano":0123456789123456789,' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '}' - ']' - '}' - '}' - ']' - '}' - ']' + '"resourceMetrics":[{' + '"resource":{' + '"attributes":[{' + '"key":"service.namespace",' + '"value":{"stringValue":"example"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '},' + '"scopeMetrics":[{' + '"scope":{' + '"name":"OtlpMetricsSerializer",' + '"version":"1.0.0"' + '},' + '"metrics":[{' + '"name":"test.counter",' + '"unit":"",' + '"description":"Description for test.counter",' + '"sum":{' + '"dataPoints":[{' + '"asInt":77,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' '}' - ']' + '},{' + '"name":"engine.worker.count",' + '"unit":"",' + '"description":"Engine worker count",' + '"sum":{' + '"dataPoints":[{' + '"asInt":3,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' + '}' + '},{' + '"name":"test.gauge",' + '"unit":"",' + '"description":"Description for test.gauge",' + '"gauge":{' + '"dataPoints":[{' + '"asInt":66,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}]' + '}' + '}]' + '}]' + '}]' '}' write close diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/server.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/server.rpt index c703e2f51b..ec4d08ea8b 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/server.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/server.rpt @@ -20,107 +20,85 @@ connected read http:method "POST" read http:version "HTTP/1.1" read http:header "Host" "localhost:4318" -read http:header "Content-Length" "942" +read http:header "Content-Length" "1205" read '{' - '"resourceMetrics":[' - '{' - '"resource":{' - '"attributes":[' - '{' - '"key":"service.namespace",' - '"value":{' - '"stringValue":"example"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '},' - '"scopeMetrics":[' - '{' - '"scope":{' - '"name":"OtlpMetricsSerializer",' - '"version":"1.0.0"' - '},' - '"metrics":[' - '{' - '"name":"test.counter",' - '"unit":"",' - '"description":"Description for test.counter",' - '"sum":{' - '"dataPoints":[' - '{' - '"asInt":77,' - '"timeUnixNano":' [0..19] ',' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '}' - '],' - '"aggregationTemporality":2,' - '"isMonotonic":true' - '}' - '},' - '{' - '"name":"test.gauge",' - '"unit":"",' - '"description":"Description for test.gauge",' - '"gauge":{' - '"dataPoints":[' - '{' - '"asInt":66,' - '"timeUnixNano":' [0..19] ',' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '},' - '{' - '"key":"service.name",' - '"value":{' - '"stringValue":"zilla"' - '}' - '}' - ']' - '}' - ']' - '}' - '}' - ']' - '}' - ']' + '"resourceMetrics":[{' + '"resource":{' + '"attributes":[{' + '"key":"service.namespace",' + '"value":{"stringValue":"example"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '},' + '"scopeMetrics":[{' + '"scope":{' + '"name":"OtlpMetricsSerializer",' + '"version":"1.0.0"' + '},' + '"metrics":[{' + '"name":"test.counter",' + '"unit":"",' + '"description":"Description for test.counter",' + '"sum":{' + '"dataPoints":[{' + '"asInt":77,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' '}' - ']' + '},{' + '"name":"engine.worker.count",' + '"unit":"",' + '"description":"Engine worker count",' + '"sum":{' + '"dataPoints":[{' + '"asInt":3,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' + '}' + '},{' + '"name":"test.gauge",' + '"unit":"",' + '"description":"Description for test.gauge",' + '"gauge":{' + '"dataPoints":[{' + '"asInt":66,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '},{' + '"key":"service.name",' + '"value":{"stringValue":"zilla"}' + '}]' + '}]' + '}' + '}]' + '}]' + '}]' '}' read closed diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt index 049a21def6..e221726990 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt @@ -19,249 +19,96 @@ connected write http:method "POST" write http:version "HTTP/1.1" write http:header "Host" "localhost:4318" -write http:header "Content-Length" "1953" +write http:header "Content-Length" "2162" write '{' - '"resourceMetrics":[' - '{' - '"resource":{' - '"attributes":[' - '{' - '"key":"service.namespace",' - '"value":{' - '"stringValue":"example"' - '}' - '}' - ']' - '},' - '"scopeMetrics":[' - '{' - '"scope":{' - '"name":"OtlpMetricsSerializer",' - '"version":"1.0.0"' - '},' - '"metrics":[' - '{' - '"name":"test.counter",' - '"unit":"",' - '"description":"Description for test.counter",' - '"sum":{' - '"dataPoints":[' - '{' - '"asInt":77,' - '"timeUnixNano":0123456789123456789,' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - ']' - '}' - '],' - '"aggregationTemporality":2,' - '"isMonotonic":true' - '}' - '},' - '{' - '"name":"test.gauge",' - '"unit":"",' - '"description":"Description for test.gauge",' - '"gauge":{' - '"dataPoints":[' - '{' - '"asInt":66,' - '"timeUnixNano":0123456789123456789,' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - ']' - '}' - ']' - '}' - '},' - '{' - '"name":"test.histogram",' - '"description":"Description for test.histogram",' - '"unit":"bytes",' - '"histogram":{' - '"aggregationTemporality":2,' - '"dataPoints":[' - '{' - '"timeUnixNano":0123456789123456789,' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - '],' - '"min":1,' - '"max":31,' - '"sum":63,' - '"count":3,' - '"explicitBounds":[' - '2,' - '4,' - '8,' - '16,' - '32,' - '64,' - '128,' - '256,' - '512,' - '1024,' - '2048,' - '4096,' - '8192,' - '16384,' - '32768,' - '65536,' - '131072,' - '262144,' - '524288,' - '1048576,' - '2097152,' - '4194304,' - '8388608,' - '16777216,' - '33554432,' - '67108864,' - '134217728,' - '268435456,' - '536870912,' - '1073741824,' - '2147483648,' - '4294967296,' - '8589934592,' - '17179869184,' - '34359738368,' - '68719476736,' - '137438953472,' - '274877906944,' - '549755813888,' - '1099511627776,' - '2199023255552,' - '4398046511104,' - '8796093022208,' - '17592186044416,' - '35184372088832,' - '70368744177664,' - '140737488355328,' - '281474976710656,' - '562949953421312,' - '1125899906842624,' - '2251799813685248,' - '4503599627370496,' - '9007199254740992,' - '18014398509481984,' - '36028797018963968,' - '72057594037927936,' - '144115188075855872,' - '288230376151711744,' - '576460752303423488,' - '1152921504606846976,' - '2305843009213693952,' - '4611686018427387904' - '],' - '"bucketCounts":[' - '1,' - '0,' - '0,' - '0,' - '2,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0' - ']' - '}' - ']' - '}' - '}' - ']' - '}' - ']' + '"resourceMetrics":[{' + '"resource":{' + '"attributes":[{' + '"key":"service.namespace",' + '"value":{"stringValue":"example"}' + '}]' + '},' + '"scopeMetrics":[{' + '"scope":{' + '"name":"OtlpMetricsSerializer",' + '"version":"1.0.0"' + '},' + '"metrics":[{' + '"name":"test.counter",' + '"unit":"",' + '"description":"Description for test.counter",' + '"sum":{' + '"dataPoints":[{' + '"asInt":77,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' '}' - ']' + '},{' + '"name":"engine.worker.count",' + '"unit":"",' + '"description":"Engine worker count",' + '"sum":{' + '"dataPoints":[{' + '"asInt":3,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' + '}' + '},{' + '"name":"test.gauge",' + '"unit":"",' + '"description":"Description for test.gauge",' + '"gauge":{' + '"dataPoints":[{' + '"asInt":66,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}]' + '}]' + '}' + '},{' + '"name":"test.histogram",' + '"description":"Description for test.histogram",' + '"unit":"bytes",' + '"histogram":{' + '"aggregationTemporality":2,' + '"dataPoints":[{' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}],' + '"min":1,' + '"max":31,' + '"sum":63,' + '"count":3,' + '"explicitBounds":[2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824,2147483648,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944,549755813888,1099511627776,2199023255552,4398046511104,8796093022208,17592186044416,35184372088832,70368744177664,140737488355328,281474976710656,562949953421312,1125899906842624,2251799813685248,4503599627370496,9007199254740992,18014398509481984,36028797018963968,72057594037927936,144115188075855872,288230376151711744,576460752303423488,1152921504606846976,2305843009213693952,4611686018427387904],' + '"bucketCounts":[1,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]' + '}]' + '}' + '}]' + '}]' + '}]' '}' write close diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt index 0b9e9dfeb4..9f98c010be 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt @@ -20,249 +20,96 @@ connected read http:method "POST" read http:version "HTTP/1.1" read http:header "Host" "localhost:4318" -read http:header "Content-Length" "1953" +read http:header "Content-Length" "2162" read '{' - '"resourceMetrics":[' - '{' - '"resource":{' - '"attributes":[' - '{' - '"key":"service.namespace",' - '"value":{' - '"stringValue":"example"' - '}' - '}' - ']' - '},' - '"scopeMetrics":[' - '{' - '"scope":{' - '"name":"OtlpMetricsSerializer",' - '"version":"1.0.0"' - '},' - '"metrics":[' - '{' - '"name":"test.counter",' - '"unit":"",' - '"description":"Description for test.counter",' - '"sum":{' - '"dataPoints":[' - '{' - '"asInt":77,' - '"timeUnixNano":' [0..19] ',' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - ']' - '}' - '],' - '"aggregationTemporality":2,' - '"isMonotonic":true' - '}' - '},' - '{' - '"name":"test.gauge",' - '"unit":"",' - '"description":"Description for test.gauge",' - '"gauge":{' - '"dataPoints":[' - '{' - '"asInt":66,' - '"timeUnixNano":' [0..19] ',' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - ']' - '}' - ']' - '}' - '},' - '{' - '"name":"test.histogram",' - '"description":"Description for test.histogram",' - '"unit":"bytes",' - '"histogram":{' - '"aggregationTemporality":2,' - '"dataPoints":[' - '{' - '"timeUnixNano":' [0..19] ',' - '"attributes":[' - '{' - '"key":"namespace",' - '"value":{' - '"stringValue":"test"' - '}' - '},' - '{' - '"key":"binding",' - '"value":{' - '"stringValue":"net0"' - '}' - '}' - '],' - '"min":1,' - '"max":31,' - '"sum":63,' - '"count":3,' - '"explicitBounds":[' - '2,' - '4,' - '8,' - '16,' - '32,' - '64,' - '128,' - '256,' - '512,' - '1024,' - '2048,' - '4096,' - '8192,' - '16384,' - '32768,' - '65536,' - '131072,' - '262144,' - '524288,' - '1048576,' - '2097152,' - '4194304,' - '8388608,' - '16777216,' - '33554432,' - '67108864,' - '134217728,' - '268435456,' - '536870912,' - '1073741824,' - '2147483648,' - '4294967296,' - '8589934592,' - '17179869184,' - '34359738368,' - '68719476736,' - '137438953472,' - '274877906944,' - '549755813888,' - '1099511627776,' - '2199023255552,' - '4398046511104,' - '8796093022208,' - '17592186044416,' - '35184372088832,' - '70368744177664,' - '140737488355328,' - '281474976710656,' - '562949953421312,' - '1125899906842624,' - '2251799813685248,' - '4503599627370496,' - '9007199254740992,' - '18014398509481984,' - '36028797018963968,' - '72057594037927936,' - '144115188075855872,' - '288230376151711744,' - '576460752303423488,' - '1152921504606846976,' - '2305843009213693952,' - '4611686018427387904' - '],' - '"bucketCounts":[' - '1,' - '0,' - '0,' - '0,' - '2,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0,' - '0' - ']' - '}' - ']' - '}' - '}' - ']' - '}' - ']' + '"resourceMetrics":[{' + '"resource":{' + '"attributes":[{' + '"key":"service.namespace",' + '"value":{"stringValue":"example"}' + '}]' + '},' + '"scopeMetrics":[{' + '"scope":{' + '"name":"OtlpMetricsSerializer",' + '"version":"1.0.0"' + '},' + '"metrics":[{' + '"name":"test.counter",' + '"unit":"",' + '"description":"Description for test.counter",' + '"sum":{' + '"dataPoints":[{' + '"asInt":77,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' '}' - ']' + '},{' + '"name":"engine.worker.count",' + '"unit":"",' + '"description":"Engine worker count",' + '"sum":{' + '"dataPoints":[{' + '"asInt":3,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[]' + '}],' + '"aggregationTemporality":2,' + '"isMonotonic":true' + '}' + '},{' + '"name":"test.gauge",' + '"unit":"",' + '"description":"Description for test.gauge",' + '"gauge":{' + '"dataPoints":[{' + '"asInt":66,' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}]' + '}]' + '}' + '},{' + '"name":"test.histogram",' + '"description":"Description for test.histogram",' + '"unit":"bytes",' + '"histogram":{' + '"aggregationTemporality":2,' + '"dataPoints":[{' + '"timeUnixNano":' [0..19] ',' + '"attributes":[{' + '"key":"namespace",' + '"value":{"stringValue":"test"}' + '},{' + '"key":"binding",' + '"value":{"stringValue":"net0"}' + '}],' + '"min":1,' + '"max":31,' + '"sum":63,' + '"count":3,' + '"explicitBounds":[2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824,2147483648,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944,549755813888,1099511627776,2199023255552,4398046511104,8796093022208,17592186044416,35184372088832,70368744177664,140737488355328,281474976710656,562949953421312,1125899906842624,2251799813685248,4503599627370496,9007199254740992,18014398509481984,36028797018963968,72057594037927936,144115188075855872,288230376151711744,576460752303423488,1152921504606846976,2305843009213693952,4611686018427387904],' + '"bucketCounts":[1,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]' + '}]' + '}' + '}]' + '}]' + '}]' '}' read closed From eecaa512129469f4f2e0747c4c123bf69e3ee1d5 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 20 May 2025 16:35:03 +0500 Subject: [PATCH 16/20] Fix typo --- .../otlp/application/metrics.with.service.name/client.rpt | 6 +++--- .../specs/exporter/otlp/application/metrics/client.rpt | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt index 28ad1582e3..3e20588e28 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics.with.service.name/client.rpt @@ -44,7 +44,7 @@ write '"sum":{' '"dataPoints":[{' '"asInt":77,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"namespace",' '"value":{"stringValue":"test"}' @@ -66,7 +66,7 @@ write '"sum":{' '"dataPoints":[{' '"asInt":3,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"service.name",' '"value":{"stringValue":"zilla"}' @@ -82,7 +82,7 @@ write '"gauge":{' '"dataPoints":[{' '"asInt":66,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"namespace",' '"value":{"stringValue":"test"}' diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt index e221726990..15a492d797 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt @@ -41,7 +41,7 @@ write '"sum":{' '"dataPoints":[{' '"asInt":77,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"namespace",' '"value":{"stringValue":"test"}' @@ -60,7 +60,7 @@ write '"sum":{' '"dataPoints":[{' '"asInt":3,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[]' '}],' '"aggregationTemporality":2,' @@ -73,7 +73,7 @@ write '"gauge":{' '"dataPoints":[{' '"asInt":66,' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"namespace",' '"value":{"stringValue":"test"}' @@ -90,7 +90,7 @@ write '"histogram":{' '"aggregationTemporality":2,' '"dataPoints":[{' - '"timeUnixNano":' [0..19] ',' + '"timeUnixNano":0123456789123456789,' '"attributes":[{' '"key":"namespace",' '"value":{"stringValue":"test"}' From d6e8cd4252cc672db14274b16c9a3123156e6540 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 May 2025 11:18:44 +0500 Subject: [PATCH 17/20] Apply feedback from PR --- .../tcp/internal/TcpBindingContext.java | 5 +- ...tyTracker.java => TcpCapacityTracker.java} | 16 ++- .../tcp/internal/stream/TcpClientFactory.java | 4 +- .../tcp/internal/stream/TcpClientRouter.java | 6 +- .../tcp/internal/stream/TcpServerFactory.java | 7 +- .../tcp/internal/stream/TcpServerRouter.java | 6 +- .../EngineWorkerUtilizationMetric.java | 2 +- .../zilla/runtime/engine/metrics/Metric.java | 3 +- .../engine/metrics/reader/ScalarRecord.java | 4 +- .../schema/metrics/engine.schema.patch.json | 7 - .../otlp/application/metrics/client.rpt | 130 +++++++++++++++++- .../otlp/application/metrics/server.rpt | 130 +++++++++++++++++- 12 files changed, 283 insertions(+), 37 deletions(-) rename runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/{CapacityTracker.java => TcpCapacityTracker.java} (81%) delete mode 100644 specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java index a7465a9c2b..9d8edd1981 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBindingContext.java @@ -15,13 +15,11 @@ */ package io.aklivity.zilla.runtime.binding.tcp.internal; -import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY; import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT; import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; import java.util.EnumMap; import java.util.Map; -import java.util.function.LongConsumer; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory; import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpServerFactory; @@ -40,8 +38,7 @@ final class TcpBindingContext implements BindingContext TcpConfiguration config, EngineContext context) { - LongConsumer capacityUsage = context.supplyUtilizationMetric(); - CapacityTracker capacity = new CapacityTracker(ENGINE_WORKER_CAPACITY.getAsInt(config), capacityUsage); + TcpCapacityTracker capacity = new TcpCapacityTracker(config, context); Map factories = new EnumMap<>(KindConfig.class); factories.put(SERVER, new TcpServerFactory(config, context, capacity)); factories.put(CLIENT, new TcpClientFactory(config, context, capacity)); diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java similarity index 81% rename from runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java rename to runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java index 4260c7a84c..37fb4ce85e 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/CapacityTracker.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java @@ -15,11 +15,15 @@ */ package io.aklivity.zilla.runtime.binding.tcp.internal; +import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY; + import java.util.function.LongConsumer; import org.agrona.collections.MutableInteger; -public final class CapacityTracker +import io.aklivity.zilla.runtime.engine.EngineContext; + +public final class TcpCapacityTracker { private final MutableInteger capacity; private final LongConsumer capacityUsage; @@ -27,13 +31,13 @@ public final class CapacityTracker private int capacityPercentage; - public CapacityTracker( - int initialCapacity, - LongConsumer capacityUsage) + public TcpCapacityTracker( + TcpConfiguration config, + EngineContext context) { - this.initialCapacity = initialCapacity; + this.initialCapacity = ENGINE_WORKER_CAPACITY.getAsInt(config); this.capacity = new MutableInteger(initialCapacity); - this.capacityUsage = capacityUsage; + this.capacityUsage = context.supplyUtilizationMetric(); } public int capacity() diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java index e478f6954e..8bac1363b4 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientFactory.java @@ -43,7 +43,7 @@ import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; -import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; +import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; @@ -101,7 +101,7 @@ public class TcpClientFactory implements TcpStreamFactory public TcpClientFactory( TcpConfiguration config, EngineContext context, - CapacityTracker capacity) + TcpCapacityTracker capacity) { TcpEventContext event = new TcpEventContext(context); this.router = new TcpClientRouter(context, event, capacity); diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java index 3fbb62ab33..456098c65f 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java @@ -29,7 +29,7 @@ import org.agrona.collections.Long2ObjectHashMap; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; -import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; +import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig; @@ -49,14 +49,14 @@ public final class TcpClientRouter private final byte[] ipv6ros = new byte[16]; private final Function resolveHost; - private final CapacityTracker capacity; + private final TcpCapacityTracker capacity; private final Long2ObjectHashMap bindings; private final TcpEventContext event; public TcpClientRouter( EngineContext context, TcpEventContext event, - CapacityTracker capacity) + TcpCapacityTracker capacity) { this.resolveHost = context::resolveHost; this.event = event; diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java index fe64005bde..c0cb1dca90 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java @@ -43,7 +43,7 @@ import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig; -import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; +import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig; @@ -103,7 +103,7 @@ public class TcpServerFactory implements TcpStreamFactory public TcpServerFactory( TcpConfiguration config, EngineContext context, - CapacityTracker capacity) + TcpCapacityTracker capacity) { this.router = new TcpServerRouter(context, this::handleAccept, capacity); this.writeBuffer = context.writeBuffer(); @@ -171,8 +171,7 @@ private int handleAccept( ServerSocketChannel server = (ServerSocketChannel) acceptKey.channel(); - for (SocketChannel channel = router.accept(server); channel != null; - channel = router.accept(server)) + for (SocketChannel channel = router.accept(server); channel != null; channel = router.accept(server)) { channel.configureBlocking(false); channel.setOption(TCP_NODELAY, options.nodelay); diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java index b903b6a5c6..4c8d631cbe 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java @@ -28,7 +28,7 @@ import org.agrona.CloseHelper; import org.agrona.collections.Long2ObjectHashMap; -import io.aklivity.zilla.runtime.binding.tcp.internal.CapacityTracker; +import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig; import io.aklivity.zilla.runtime.engine.EngineContext; @@ -40,14 +40,14 @@ public final class TcpServerRouter private final ToIntFunction acceptHandler; private final Function supplyPollerKey; private final Long2ObjectHashMap serversById; - private final CapacityTracker capacity; + private final TcpCapacityTracker capacity; private boolean unbound; public TcpServerRouter( EngineContext context, ToIntFunction acceptHandler, - CapacityTracker capacity) + TcpCapacityTracker capacity) { this.bindings = new Long2ObjectHashMap<>(); this.supplyPollerKey = context::supplyPollerKey; diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java index 15b382099b..186f0eb182 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/metrics/EngineWorkerUtilizationMetric.java @@ -40,7 +40,7 @@ public Kind kind() @Override public Unit unit() { - return Unit.PERCENT; + return Unit.COUNT; } @Override diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java index 8a107a9945..07164f806a 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/Metric.java @@ -30,8 +30,7 @@ enum Unit { BYTES, NANOSECONDS, - COUNT, - PERCENT + COUNT } String name(); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java index a0501dcd2c..53c9319838 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java @@ -22,6 +22,8 @@ import java.util.function.LongFunction; import java.util.function.LongSupplier; +import io.aklivity.zilla.runtime.engine.namespace.NamespacedId; + public class ScalarRecord implements MetricRecord { private final long bindingId; @@ -53,7 +55,7 @@ public long bindingId() public String namespace() { // implicit int -> long conversion, it's OK - return namespaceId != -1 ? labelResolver.apply(namespaceId) : null; + return namespaceId != NamespacedId.NO_NAMESPACE_ID ? labelResolver.apply(namespaceId) : null; } @Override diff --git a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json b/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json deleted file mode 100644 index 4da6bb14ac..0000000000 --- a/specs/engine.spec/src/main/scripts/io/aklivity/zilla/specs/engine/schema/metrics/engine.schema.patch.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "op": "add", - "path": "/$defs/telemetry/metrics/items/enum/-", - "value": "engine.worker.utilization" - } -] diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt index 15a492d797..9fdffb093b 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt @@ -102,8 +102,134 @@ write '"max":31,' '"sum":63,' '"count":3,' - '"explicitBounds":[2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824,2147483648,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944,549755813888,1099511627776,2199023255552,4398046511104,8796093022208,17592186044416,35184372088832,70368744177664,140737488355328,281474976710656,562949953421312,1125899906842624,2251799813685248,4503599627370496,9007199254740992,18014398509481984,36028797018963968,72057594037927936,144115188075855872,288230376151711744,576460752303423488,1152921504606846976,2305843009213693952,4611686018427387904],' - '"bucketCounts":[1,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]' + '"explicitBounds":[' + '2,' + '4,' + '8,' + '16,' + '32,' + '64,' + '128,' + '256,' + '512,' + '1024,' + '2048,' + '4096,' + '8192,' + '16384,' + '32768,' + '65536,' + '131072,' + '262144,' + '524288,' + '1048576,' + '2097152,' + '4194304,' + '8388608,' + '16777216,' + '33554432,' + '67108864,' + '134217728,' + '268435456,' + '536870912,' + '1073741824,' + '2147483648,' + '4294967296,' + '8589934592,' + '17179869184,' + '34359738368,' + '68719476736,' + '137438953472,' + '274877906944,' + '549755813888,' + '1099511627776,' + '2199023255552,' + '4398046511104,' + '8796093022208,' + '17592186044416,' + '35184372088832,' + '70368744177664,' + '140737488355328,' + '281474976710656,' + '562949953421312,' + '1125899906842624,' + '2251799813685248,' + '4503599627370496,' + '9007199254740992,' + '18014398509481984,' + '36028797018963968,' + '72057594037927936,' + '144115188075855872,' + '288230376151711744,' + '576460752303423488,' + '1152921504606846976,' + '2305843009213693952,' + '4611686018427387904' + '],' + '"bucketCounts":[' + '1,' + '0,' + '0,' + '0,' + '2,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0' + ']' '}]' '}' '}]' diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt index 9f98c010be..ae2086d51b 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt @@ -103,8 +103,134 @@ read '"max":31,' '"sum":63,' '"count":3,' - '"explicitBounds":[2,4,8,16,32,64,128,256,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,16777216,33554432,67108864,134217728,268435456,536870912,1073741824,2147483648,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944,549755813888,1099511627776,2199023255552,4398046511104,8796093022208,17592186044416,35184372088832,70368744177664,140737488355328,281474976710656,562949953421312,1125899906842624,2251799813685248,4503599627370496,9007199254740992,18014398509481984,36028797018963968,72057594037927936,144115188075855872,288230376151711744,576460752303423488,1152921504606846976,2305843009213693952,4611686018427387904],' - '"bucketCounts":[1,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]' + '"explicitBounds":[' + '2,' + '4,' + '8,' + '16,' + '32,' + '64,' + '128,' + '256,' + '512,' + '1024,' + '2048,' + '4096,' + '8192,' + '16384,' + '32768,' + '65536,' + '131072,' + '262144,' + '524288,' + '1048576,' + '2097152,' + '4194304,' + '8388608,' + '16777216,' + '33554432,' + '67108864,' + '134217728,' + '268435456,' + '536870912,' + '1073741824,' + '2147483648,' + '4294967296,' + '8589934592,' + '17179869184,' + '34359738368,' + '68719476736,' + '137438953472,' + '274877906944,' + '549755813888,' + '1099511627776,' + '2199023255552,' + '4398046511104,' + '8796093022208,' + '17592186044416,' + '35184372088832,' + '70368744177664,' + '140737488355328,' + '281474976710656,' + '562949953421312,' + '1125899906842624,' + '2251799813685248,' + '4503599627370496,' + '9007199254740992,' + '18014398509481984,' + '36028797018963968,' + '72057594037927936,' + '144115188075855872,' + '288230376151711744,' + '576460752303423488,' + '1152921504606846976,' + '2305843009213693952,' + '4611686018427387904' + '],' + '"bucketCounts":[' + '1,' + '0,' + '0,' + '0,' + '2,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0,' + '0' + ']' '}]' '}' '}]' From e6d6221133f6d26350823880517ed19228f401dc Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 May 2025 11:19:37 +0500 Subject: [PATCH 18/20] Fix remaining issues --- .../zilla/runtime/engine/metrics/reader/ScalarRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java index 53c9319838..aba2a8b1da 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java @@ -61,7 +61,7 @@ public String namespace() @Override public String binding() { - return bindingId != -1 ? labelResolver.apply(bindingId) : null; + return bindingId != NamespacedId.NO_NAMESPACE_ID ? labelResolver.apply(bindingId) : null; } @Override From 661ef9b2b5f4739a54eb4ac62b301f50832f38ef Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 May 2025 11:32:32 +0500 Subject: [PATCH 19/20] Feedback applied --- .../zilla/runtime/engine/metrics/reader/ScalarRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java index aba2a8b1da..b591a721fe 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/metrics/reader/ScalarRecord.java @@ -61,7 +61,7 @@ public String namespace() @Override public String binding() { - return bindingId != NamespacedId.NO_NAMESPACE_ID ? labelResolver.apply(bindingId) : null; + return bindingId != NamespacedId.NO_LOCAL_ID ? labelResolver.apply(bindingId) : null; } @Override From b2c358b867481f73c1ea793fb8f5feb7c460e8b1 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 23 May 2025 18:35:36 +0500 Subject: [PATCH 20/20] Fix typo --- .../zilla/specs/exporter/otlp/application/metrics/client.rpt | 1 + .../zilla/specs/exporter/otlp/application/metrics/server.rpt | 1 + 2 files changed, 2 insertions(+) diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt index 9fdffb093b..7cd102e81d 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/client.rpt @@ -228,6 +228,7 @@ write '0,' '0,' '0,' + '0,' '0' ']' '}]' diff --git a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt index ae2086d51b..9d7c1ba5a1 100644 --- a/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt +++ b/specs/exporter-otlp.spec/src/main/scripts/io/aklivity/zilla/specs/exporter/otlp/application/metrics/server.rpt @@ -229,6 +229,7 @@ read '0,' '0,' '0,' + '0,' '0' ']' '}]'