Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ public ConverterHandler supplyWriteConverter(
return null;
}

@Override
public LongConsumer supplyUtilizationMetric()
{
return null;
}

@Override
public Path resolvePath(
String location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package io.aklivity.zilla.runtime.binding.tcp.internal;

import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY;
import static io.aklivity.zilla.runtime.engine.config.KindConfig.CLIENT;
import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;

import java.util.EnumMap;
import java.util.Map;

import org.agrona.collections.MutableInteger;

import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory;
import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpServerFactory;
import io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpStreamFactory;
Expand All @@ -41,7 +38,7 @@ final class TcpBindingContext implements BindingContext
TcpConfiguration config,
EngineContext context)
{
MutableInteger capacity = new MutableInteger(ENGINE_WORKER_CAPACITY.getAsInt(config));
TcpCapacityTracker capacity = new TcpCapacityTracker(config, context);
Map<KindConfig, TcpStreamFactory> factories = new EnumMap<>(KindConfig.class);
factories.put(SERVER, new TcpServerFactory(config, context, capacity));
factories.put(CLIENT, new TcpClientFactory(config, context, capacity));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.tcp.internal;

import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_WORKER_CAPACITY;

import java.util.function.LongConsumer;

import org.agrona.collections.MutableInteger;

import io.aklivity.zilla.runtime.engine.EngineContext;

public final class TcpCapacityTracker
{
private final MutableInteger capacity;
private final LongConsumer capacityUsage;
private final int initialCapacity;

private int capacityPercentage;

public TcpCapacityTracker(
TcpConfiguration config,
EngineContext context)
{
this.initialCapacity = ENGINE_WORKER_CAPACITY.getAsInt(config);
this.capacity = new MutableInteger(initialCapacity);
this.capacityUsage = context.supplyUtilizationMetric();
}

public int capacity()
{
return capacity.get();
}

public int incrementAndGet()
{
int newCapacity = capacity.incrementAndGet();
capacityChanged(newCapacity);

return newCapacity;
}

public int decrementAndGet()
{
int newCapacity = capacity.decrementAndGet();
capacityChanged(newCapacity);

return newCapacity;
}

public int get()
{
return capacity.get();
}

private void capacityChanged(
int newCapacity)
{
final int newCapacityPercentage = 100 - (newCapacity * 100 / initialCapacity);
final int percentageDiff = newCapacityPercentage - capacityPercentage;

if (Math.abs(percentageDiff) >= 1)
{
capacityUsage.accept(percentageDiff);
}

capacityPercentage = newCapacityPercentage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ public String format(
switch (extension.kind())
{
case DNS_FAILED:
{
final TcpDnsFailedExFW ex = extension.dnsFailed();
result = String.format("Unable to resolve host dns for address (%s).", asString(ex.address()));
final TcpDnsFailedExFW dnsFailed = extension.dnsFailed();
result = String.format("Unable to resolve host dns for address (%s).", asString(dnsFailed.address()));
break;
}
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.tcp.internal.types.OctetsFW;
Expand Down Expand Up @@ -100,9 +101,10 @@ public class TcpClientFactory implements TcpStreamFactory
public TcpClientFactory(
TcpConfiguration config,
EngineContext context,
MutableInteger capacity)
TcpCapacityTracker capacity)
{
this.router = new TcpClientRouter(context, capacity);
TcpEventContext event = new TcpEventContext(context);
this.router = new TcpClientRouter(context, event, capacity);
this.writeBuffer = context.writeBuffer();
this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder());
this.bufferPool = context.bufferPool();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.util.function.Predicate;

import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableInteger;

import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpEventContext;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig;
Expand All @@ -49,18 +49,19 @@ public final class TcpClientRouter
private final byte[] ipv6ros = new byte[16];

private final Function<String, InetAddress[]> resolveHost;
private final MutableInteger capacity;
private final TcpCapacityTracker capacity;
private final Long2ObjectHashMap<TcpBindingConfig> bindings;
private final TcpEventContext event;

public TcpClientRouter(
EngineContext context,
MutableInteger capacity)
TcpEventContext event,
TcpCapacityTracker capacity)
{
this.resolveHost = context::resolveHost;
this.event = event;
this.capacity = capacity;
this.bindings = new Long2ObjectHashMap<>();
this.event = new TcpEventContext(context);
}

public void attach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.tcp.config.TcpOptionsConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker;
import io.aklivity.zilla.runtime.binding.tcp.internal.TcpConfiguration;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpRouteConfig;
Expand Down Expand Up @@ -103,7 +103,7 @@ public class TcpServerFactory implements TcpStreamFactory
public TcpServerFactory(
TcpConfiguration config,
EngineContext context,
MutableInteger capacity)
TcpCapacityTracker capacity)
{
this.router = new TcpServerRouter(context, this::handleAccept, capacity);
this.writeBuffer = context.writeBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

import org.agrona.CloseHelper;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableInteger;

import io.aklivity.zilla.runtime.binding.tcp.internal.TcpCapacityTracker;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpBindingConfig;
import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
Expand All @@ -40,14 +40,14 @@ public final class TcpServerRouter
private final ToIntFunction<PollerKey> acceptHandler;
private final Function<SelectableChannel, PollerKey> supplyPollerKey;
private final Long2ObjectHashMap<TcpServerBindingConfig> serversById;
private final MutableInteger capacity;
private final TcpCapacityTracker capacity;

private boolean unbound;

public TcpServerRouter(
EngineContext context,
ToIntFunction<PollerKey> acceptHandler,
MutableInteger capacity)
TcpCapacityTracker capacity)
{
this.bindings = new Long2ObjectHashMap<>();
this.supplyPollerKey = context::supplyPollerKey;
Expand Down Expand Up @@ -115,7 +115,8 @@ public void close(
{
CloseHelper.quietClose(channel);

if (unbound && capacity.incrementAndGet() > 0)
int newCapacity = capacity.incrementAndGet();
if (unbound && newCapacity > 0)
{
bindings.values().stream()
.filter(b -> b.kind == SERVER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ public ConverterHandler supplyWriteConverter(
return null;
}

@Override
public LongConsumer supplyUtilizationMetric()
{
return null;
}

@Override
public Path resolvePath(
String location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ private void calculateColumnWidths()

for (MetricRecord metric : records)
{
namespaceWidth = Math.max(namespaceWidth, metric.namespace().length());
bindingWidth = Math.max(bindingWidth, metric.binding().length());
namespaceWidth = Math.max(namespaceWidth, metric.namespace() != null ? metric.namespace().length() : 0);
bindingWidth = Math.max(bindingWidth, metric.binding() != null ? metric.binding().length() : 0);
metricWidth = Math.max(metricWidth, metric.metric().length());
valueWidth = Math.max(valueWidth, metricValues.get(metric).length());
}
Expand All @@ -84,7 +84,10 @@ private void printRecords(
out.format(format, NAMESPACE_HEADER, BINDING_HEADER, METRIC_HEADER, VALUE_HEADER);
for (MetricRecord metric : records)
{
out.format(format, metric.namespace(), metric.binding(), metric.metric(), metricValues.get(metric));
String namespace = metric.namespace() != null ? metric.namespace() : "";
String binding = metric.binding() != null ? metric.binding() : "";

out.format(format, namespace, binding, metric.metric(), metricValues.get(metric));
}
out.println();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ ConverterHandler supplyReadConverter(
ConverterHandler supplyWriteConverter(
ModelConfig config);

LongConsumer supplyUtilizationMetric();

Path resolvePath(
String location);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.internal.metrics;

import java.net.URL;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;

import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricGroup;

public class EngineMetricGroup implements MetricGroup
{
public static final String NAME = "engine";

private final Map<String, Supplier<Metric>> engineMetrics = Map.of(
"engine.worker.utilization", EngineWorkerUtilizationMetric::new,
"engine.worker.count", EngineWorkerCountMetric::new
);

public EngineMetricGroup(
Configuration config)
{
}

@Override
public String name()
{
return NAME;
}

@Override
public URL type()
{
return getClass().getResource("schema/engine.schema.patch.json");
}

@Override
public Metric supply(
String name)
{
return engineMetrics.getOrDefault(name, () -> null).get();
}

@Override
public Collection<String> metricNames()
{
return engineMetrics.keySet();
}
}
Loading
Loading