Skip to content

Commit 3fb3bd6

Browse files
committed
Simplify tcp capacity exceeded client IT
1 parent 08f4beb commit 3fb3bd6

7 files changed

Lines changed: 78 additions & 80 deletions

File tree

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpCapacityTracker.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,45 @@
1919

2020
import java.util.function.LongConsumer;
2121

22-
import org.agrona.collections.MutableInteger;
23-
2422
import io.aklivity.zilla.runtime.engine.EngineContext;
2523

2624
public final class TcpCapacityTracker
2725
{
2826
private final int capacity;
29-
private final MutableInteger usage;
3027
private final LongConsumer recordUsage;
3128

29+
private int usage;
30+
3231
public TcpCapacityTracker(
3332
TcpConfiguration config,
3433
EngineContext context)
3534
{
3635
this.capacity = ENGINE_WORKER_CAPACITY.getAsInt(config);
37-
this.usage = new MutableInteger(0);
3836
this.recordUsage = context.supplyUtilizationMetric();
3937
}
4038

4139
public int available()
4240
{
43-
return capacity - usage.get();
41+
return capacity - usage;
4442
}
4543

46-
public void onConnected()
44+
public void claim()
4745
{
48-
int newUsage = usage.incrementAndGet();
49-
assert newUsage <= capacity;
46+
int newUsage = ++usage;
47+
assert newUsage <= capacity : "newUsage = %d, capacity = %d".formatted(newUsage, capacity);
5048

51-
onUsageChanged(newUsage);
49+
record(newUsage);
5250
}
5351

54-
public void onClosed()
52+
public void released()
5553
{
56-
int newUsage = usage.decrementAndGet();
57-
assert newUsage >= 0;
54+
int newUsage = --usage;
55+
assert newUsage >= 0 : "newUsage = %d".formatted(newUsage);
5856

59-
onUsageChanged(newUsage);
57+
record(newUsage);
6058
}
6159

62-
private void onUsageChanged(
60+
private void record(
6361
int newUsage)
6462
{
6563
final int newUsageAsPercentage = newUsage * 100 / capacity;

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES)
180180

181181
if (resolved != null)
182182
{
183-
capacity.onConnected();
183+
capacity.claim();
184184
}
185185

186186
return resolved;
@@ -194,7 +194,7 @@ public void detach(
194194

195195
public void close()
196196
{
197-
capacity.onClosed();
197+
capacity.released();
198198
}
199199

200200
@Override

runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public SocketChannel accept(
9595

9696
if (channel != null)
9797
{
98-
capacity.onConnected();
98+
capacity.claim();
9999
}
100100
}
101101

@@ -114,7 +114,7 @@ public void close(
114114
SocketChannel channel)
115115
{
116116
CloseHelper.quietClose(channel);
117-
capacity.onClosed();
117+
capacity.released();
118118

119119
if (unbound && capacity.available() > 0)
120120
{

runtime/binding-tcp/src/test/java/io/aklivity/zilla/runtime/binding/tcp/internal/streams/ClientIT.java

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import static org.junit.Assert.assertEquals;
2222
import static org.junit.rules.RuleChain.outerRule;
2323

24-
import java.io.IOException;
2524
import java.net.InetAddress;
2625
import java.net.InetSocketAddress;
2726
import java.net.UnknownHostException;
2827
import java.nio.ByteBuffer;
29-
import java.nio.channels.SelectionKey;
30-
import java.nio.channels.Selector;
3128
import java.nio.channels.ServerSocketChannel;
3229
import java.nio.channels.SocketChannel;
30+
import java.util.function.LongSupplier;
3331

3432
import org.junit.Rule;
3533
import org.junit.Test;
@@ -355,62 +353,53 @@ public void shouldWriteDataAfterReceiveEnd() throws Exception
355353
value = "2")
356354
public void shouldResetWhenConnectionsExceeded() throws Exception
357355
{
358-
try (ServerSocketChannel server = ServerSocketChannel.open();
359-
Selector selector = Selector.open())
356+
final LongSupplier utilization = engine.context().gauge(null, null, "engine.worker.utilization");
357+
358+
try (ServerSocketChannel server = ServerSocketChannel.open())
360359
{
361-
server.configureBlocking(false);
362360
server.setOption(SO_REUSEADDR, true);
363361
server.bind(new InetSocketAddress("127.0.0.1", 12345));
364-
server.register(selector, SelectionKey.OP_ACCEPT);
365362

366363
k3po.start();
367364

368-
AcceptHandler handler = channel ->
365+
ByteBuffer buf = ByteBuffer.allocate(0);
366+
367+
while (utilization.getAsLong() != 100L)
369368
{
370-
channel.configureBlocking(true);
371-
channel.close();
372-
};
369+
Thread.onSpinWait();
370+
}
371+
372+
SocketChannel client1 = server.accept();
373+
k3po.notifyBarrier("CONNECTION_ACCEPTED_1");
374+
client1.read(buf);
375+
client1.close();
373376

374-
int accepted = 0;
375-
while (accepted < 2)
377+
while (utilization.getAsLong() != 50L)
376378
{
377-
selector.select();
378-
for (SelectionKey key : selector.selectedKeys())
379-
{
380-
if (key.isAcceptable())
381-
{
382-
SocketChannel client = server.accept();
383-
handler.handle(client);
384-
accepted++;
385-
}
386-
}
387-
388-
if (accepted == 2)
389-
{
390-
k3po.notifyBarrier("CONNECTION_ACCEPTED_1");
391-
k3po.notifyBarrier("CONNECTION_ACCEPTED_2");
392-
}
379+
Thread.onSpinWait();
393380
}
394381

395-
accepted = 0;
396-
while (accepted < 2)
382+
SocketChannel client2 = server.accept();
383+
k3po.notifyBarrier("CONNECTION_ACCEPTED_2");
384+
client2.read(buf);
385+
client2.close();
386+
387+
while (utilization.getAsLong() != 100L)
397388
{
398-
selector.select();
399-
for (SelectionKey key : selector.selectedKeys())
400-
{
401-
if (key.isAcceptable())
402-
{
403-
SocketChannel client = server.accept();
404-
handler.handle(client);
405-
accepted++;
406-
}
407-
}
389+
Thread.onSpinWait();
408390
}
409391

410-
assert accepted == 2;
392+
SocketChannel client3 = server.accept();
393+
client3.read(buf);
394+
client3.close();
395+
396+
SocketChannel client4 = server.accept();
397+
client4.read(buf);
398+
client4.close();
411399

412-
selector.selectedKeys().clear();
413400
k3po.finish();
401+
402+
assert utilization.getAsLong() == 0L;
414403
}
415404
}
416405

@@ -419,10 +408,4 @@ public static InetAddress[] resolveHost(
419408
{
420409
throw new UnknownHostException();
421410
}
422-
423-
@FunctionalInterface
424-
interface AcceptHandler
425-
{
426-
void handle(SocketChannel channel) throws IOException;
427-
}
428411
}

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package io.aklivity.zilla.runtime.engine;
1717

1818
import static io.aklivity.zilla.runtime.engine.internal.layouts.metrics.HistogramsLayout.BUCKETS;
19+
import static io.aklivity.zilla.runtime.engine.namespace.NamespacedId.NO_LOCAL_ID;
20+
import static io.aklivity.zilla.runtime.engine.namespace.NamespacedId.NO_NAMESPACE_ID;
1921
import static java.util.concurrent.Executors.newFixedThreadPool;
2022
import static java.util.stream.Collectors.toList;
2123
import static org.agrona.LangUtil.rethrowUnchecked;
@@ -568,12 +570,24 @@ public LongSupplier counter(
568570
String binding,
569571
String metric)
570572
{
571-
int namespaceId = supplyLabelId.applyAsInt(namespace);
572-
int bindingId = supplyLabelId.applyAsInt(binding);
573+
int namespaceId = namespace != null ? supplyLabelId.applyAsInt(namespace) : NO_NAMESPACE_ID;
574+
int bindingId = binding != null ? supplyLabelId.applyAsInt(binding) : NO_LOCAL_ID;
573575
int metricId = supplyLabelId.applyAsInt(metric);
574-
long namespacedBindingId = NamespacedId.id(namespaceId, bindingId);
575-
long namespacedMetricId = NamespacedId.id(namespaceId, metricId);
576-
return Engine.this.counter(namespacedBindingId, namespacedMetricId);
576+
long namespacedId = NamespacedId.id(namespaceId, bindingId);
577+
return Engine.this.counter(namespacedId, metricId);
578+
}
579+
580+
@Override
581+
public LongSupplier gauge(
582+
String namespace,
583+
String binding,
584+
String metric)
585+
{
586+
int namespaceId = namespace != null ? supplyLabelId.applyAsInt(namespace) : NO_NAMESPACE_ID;
587+
int bindingId = binding != null ? supplyLabelId.applyAsInt(binding) : NO_LOCAL_ID;
588+
int metricId = supplyLabelId.applyAsInt(metric);
589+
long namespacedId = NamespacedId.id(namespaceId, bindingId);
590+
return Engine.this.gauge(namespacedId, metricId);
577591
}
578592

579593
// required for testing
@@ -583,12 +597,11 @@ public LongConsumer counterWriter(
583597
String metric,
584598
int core)
585599
{
586-
int namespaceId = supplyLabelId.applyAsInt(namespace);
587-
int bindingId = supplyLabelId.applyAsInt(binding);
600+
int namespaceId = namespace != null ? supplyLabelId.applyAsInt(namespace) : NO_NAMESPACE_ID;
601+
int bindingId = binding != null ? supplyLabelId.applyAsInt(binding) : NO_LOCAL_ID;
588602
int metricId = supplyLabelId.applyAsInt(metric);
589-
long namespacedBindingId = NamespacedId.id(namespaceId, bindingId);
590-
long namespacedMetricId = NamespacedId.id(namespaceId, metricId);
591-
return Engine.this.counterWriter(namespacedBindingId, namespacedMetricId, core);
603+
long namespacedId = NamespacedId.id(namespaceId, bindingId);
604+
return Engine.this.counterWriter(namespacedId, metricId, core);
592605
}
593606
}
594607
}

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/ext/EngineExtContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,9 @@ LongSupplier counter(
3030
String namespace,
3131
String binding,
3232
String metric);
33+
34+
LongSupplier gauge(
35+
String namespace,
36+
String binding,
37+
String metric);
3338
}

runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@
126126
import io.aklivity.zilla.runtime.engine.internal.layouts.metrics.CountersLayout;
127127
import io.aklivity.zilla.runtime.engine.internal.layouts.metrics.GaugesLayout;
128128
import io.aklivity.zilla.runtime.engine.internal.layouts.metrics.HistogramsLayout;
129-
import io.aklivity.zilla.runtime.engine.internal.layouts.metrics.ScalarsLayout;
130129
import io.aklivity.zilla.runtime.engine.internal.poller.Poller;
131130
import io.aklivity.zilla.runtime.engine.internal.stream.StreamId;
132131
import io.aklivity.zilla.runtime.engine.internal.stream.Target;
@@ -225,8 +224,8 @@ public class EngineWorker implements EngineContext, Agent
225224
private final Supplier<IdleStrategy> supplyIdleStrategy;
226225
private final Consumer<Throwable> reporter;
227226
private final ErrorHandler errorHandler;
228-
private final ScalarsLayout countersLayout;
229-
private final ScalarsLayout gaugesLayout;
227+
private final CountersLayout countersLayout;
228+
private final GaugesLayout gaugesLayout;
230229
private final HistogramsLayout histogramsLayout;
231230
private final EventsLayout eventsLayout;
232231
private final Int2ObjectHashMap<String> eventNames;

0 commit comments

Comments
 (0)