Skip to content

Commit 07e5cf5

Browse files
committed
little improvments
1 parent db064fc commit 07e5cf5

29 files changed

Lines changed: 821 additions & 496 deletions

src/Servus.Akka.Tests/IO/ClientByteMoverSpec.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ public async Task ClientByteMover_should_write_large_buffers_directly()
107107
var state = new ClientState(stream, inbound, outbound);
108108

109109
// Write a large buffer (> 16KB) followed by a small buffer
110-
var largeBuf = NetworkBuffer.Rent(17 * 1024);
110+
var largeBuf = NetworkBuffer.Rent(33 * 1024);
111111
largeBuf.Memory.Span.Fill(0xAA);
112-
largeBuf.Length = 17 * 1024;
112+
largeBuf.Length = 33 * 1024;
113113

114114
var smallBuf = NetworkBuffer.Rent(100);
115115
smallBuf.Memory.Span.Fill(0xBB);
@@ -259,17 +259,17 @@ public async Task ClientByteMover_should_handle_alternating_large_small_buffers(
259259
var stream = new CapturingStream(capturedWrites);
260260
var state = new ClientState(stream, inbound, outbound);
261261

262-
var largeBuf = NetworkBuffer.Rent(17 * 1024);
262+
var largeBuf = NetworkBuffer.Rent(33 * 1024);
263263
largeBuf.Memory.Span.Fill(0xAA);
264-
largeBuf.Length = 17 * 1024;
264+
largeBuf.Length = 33 * 1024;
265265

266266
var smallBuf = NetworkBuffer.Rent(100);
267267
smallBuf.Memory.Span.Fill(0xBB);
268268
smallBuf.Length = 100;
269269

270-
var largeBuf2 = NetworkBuffer.Rent(17 * 1024);
270+
var largeBuf2 = NetworkBuffer.Rent(33 * 1024);
271271
largeBuf2.Memory.Span.Fill(0xCC);
272-
largeBuf2.Length = 17 * 1024;
272+
largeBuf2.Length = 33 * 1024;
273273

274274
var smallBuf2 = NetworkBuffer.Rent(100);
275275
smallBuf2.Memory.Span.Fill(0xDD);
@@ -327,9 +327,9 @@ public async Task ClientByteMover_should_flush_coalesce_before_large_buffer()
327327
smallBuf.Memory.Span.Fill(0x11);
328328
smallBuf.Length = 100;
329329

330-
var largeBuf = NetworkBuffer.Rent(17 * 1024);
330+
var largeBuf = NetworkBuffer.Rent(33 * 1024);
331331
largeBuf.Memory.Span.Fill(0xAA);
332-
largeBuf.Length = 17 * 1024;
332+
largeBuf.Length = 33 * 1024;
333333

334334
outbound.Writer.TryWrite(smallBuf);
335335
outbound.Writer.TryWrite(largeBuf);
@@ -341,7 +341,7 @@ public async Task ClientByteMover_should_flush_coalesce_before_large_buffer()
341341

342342
Assert.True(capturedWrites.Count >= 2);
343343
Assert.Equal(100, capturedWrites[0].Length);
344-
Assert.Equal(17 * 1024, capturedWrites[1].Length);
344+
Assert.Equal(33 * 1024, capturedWrites[1].Length);
345345
}
346346

347347
[Fact(Timeout = 5000)]

src/Servus.Akka.Tests/IO/MessagesSpec.cs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System.Net;
22
using Servus.Akka.IO;
33
using Servus.Akka.IO.Tcp;
4-
using Servus.Akka.Tests.Utils;
54

65
namespace Servus.Akka.Tests.IO;
76

@@ -26,17 +25,6 @@ public void NetworkBuffer_Rent_should_return_buffer_with_capacity()
2625
buf.Dispose();
2726
}
2827

29-
[Fact(Timeout = 5000)]
30-
public void NetworkBuffer_Rent_should_have_key()
31-
{
32-
var buf = NetworkBuffer.Rent(64);
33-
34-
Assert.Equal(string.Empty, buf.Key.Host);
35-
Assert.Equal(string.Empty, buf.Key.Scheme);
36-
37-
buf.Dispose();
38-
}
39-
4028
[Fact(Timeout = 5000)]
4129
public void NetworkBuffer_should_expose_memory_up_to_length()
4230
{
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using System.Net;
2+
using Servus.Akka.IO.Tcp;
3+
4+
namespace Servus.Akka.Tests.IO.Tcp;
5+
6+
public sealed class DnsCacheSpec : IDisposable
7+
{
8+
public DnsCacheSpec()
9+
{
10+
DnsCache.Clear();
11+
}
12+
13+
public void Dispose()
14+
{
15+
DnsCache.Clear();
16+
DnsCache.Ttl = TimeSpan.FromSeconds(120);
17+
}
18+
19+
[Fact(Timeout = 5000)]
20+
public async Task ResolveAsync_should_return_literal_ip_without_dns_lookup()
21+
{
22+
var addresses = await DnsCache.ResolveAsync("127.0.0.1", CancellationToken.None);
23+
24+
Assert.Single(addresses);
25+
Assert.Equal(IPAddress.Loopback, addresses[0]);
26+
}
27+
28+
[Fact(Timeout = 5000)]
29+
public async Task ResolveAsync_should_return_ipv6_literal()
30+
{
31+
var addresses = await DnsCache.ResolveAsync("::1", CancellationToken.None);
32+
33+
Assert.Single(addresses);
34+
Assert.Equal(IPAddress.IPv6Loopback, addresses[0]);
35+
}
36+
37+
[Fact(Timeout = 10000)]
38+
public async Task ResolveAsync_should_resolve_localhost()
39+
{
40+
var addresses = await DnsCache.ResolveAsync("localhost", CancellationToken.None);
41+
42+
Assert.NotEmpty(addresses);
43+
}
44+
45+
[Fact(Timeout = 10000)]
46+
public async Task ResolveAsync_should_cache_results()
47+
{
48+
var first = await DnsCache.ResolveAsync("localhost", CancellationToken.None);
49+
var second = await DnsCache.ResolveAsync("localhost", CancellationToken.None);
50+
51+
Assert.Same(first, second);
52+
}
53+
54+
[Fact(Timeout = 10000)]
55+
public async Task ResolveAsync_should_expire_after_ttl()
56+
{
57+
DnsCache.Ttl = TimeSpan.FromMilliseconds(1);
58+
59+
var first = await DnsCache.ResolveAsync("localhost", CancellationToken.None);
60+
await Task.Delay(10);
61+
var second = await DnsCache.ResolveAsync("localhost", CancellationToken.None);
62+
63+
Assert.NotSame(first, second);
64+
}
65+
66+
[Fact(Timeout = 5000)]
67+
public async Task Clear_should_remove_all_entries()
68+
{
69+
await DnsCache.ResolveAsync("127.0.0.1", CancellationToken.None);
70+
DnsCache.Clear();
71+
72+
var addresses = await DnsCache.ResolveAsync("127.0.0.1", CancellationToken.None);
73+
Assert.NotNull(addresses);
74+
}
75+
}

src/Servus.Akka.Tests/IO/Tcp/TcpClientProviderSpec.cs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ await Assert.ThrowsAsync<NotSupportedException>(() =>
7474
provider.AcceptInboundStreamAsync(CancellationToken.None));
7575
}
7676

77-
[Fact(Timeout = 5000)]
77+
[Fact(Timeout = 10_000)]
7878
public async Task TcpClientProvider_should_resolve_proxy_when_configured()
7979
{
8080
var proxyUri = new Uri("http://proxy.local:8080");
@@ -90,15 +90,15 @@ public async Task TcpClientProvider_should_resolve_proxy_when_configured()
9090

9191
var provider = new TcpClientProvider(options);
9292

93-
// Verify GetStreamAsync uses proxy for connection (DNS lookup will fail, which is ok for this test)
94-
// This test verifies the proxy resolution path works without requiring actual network
93+
// proxy.local is a .local mDNS domain — resolution may be slow on Linux.
94+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
9595
try
9696
{
97-
await provider.GetStreamAsync(CancellationToken.None);
97+
await provider.GetStreamAsync(cts.Token);
9898
}
99-
catch (SocketException)
99+
catch (Exception ex) when (ex is SocketException or OperationCanceledException)
100100
{
101-
// Expected: DNS resolution fails for "proxy.local"
101+
// Expected: DNS resolution fails or times out for "proxy.local"
102102
}
103103

104104
await provider.DisposeAsync();
@@ -196,7 +196,7 @@ public async Task TcpClientProvider_should_apply_default_proxy_credentials()
196196
await provider.DisposeAsync();
197197
}
198198

199-
[Fact(Timeout = 5000)]
199+
[Fact(Timeout = 10_000)]
200200
public async Task TcpClientProvider_should_not_override_existing_proxy_credentials()
201201
{
202202
var existingCredentials = new NetworkCredential("existing", "existing");
@@ -214,13 +214,15 @@ public async Task TcpClientProvider_should_not_override_existing_proxy_credentia
214214

215215
var provider = new TcpClientProvider(options);
216216

217+
// proxy.local is a .local mDNS domain — resolution may be slow on Linux.
218+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
217219
try
218220
{
219-
await provider.GetStreamAsync(CancellationToken.None);
221+
await provider.GetStreamAsync(cts.Token);
220222
}
221-
catch (SocketException)
223+
catch (Exception ex) when (ex is SocketException or OperationCanceledException)
222224
{
223-
// Expected
225+
// Expected: DNS resolution fails or times out for "proxy.local"
224226
}
225227

226228
// Verify existing credentials were not replaced

src/Servus.Akka.Tests/IO/Tcp/TcpPumpManagerSpec.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ public async Task PumpAsync_should_flush_and_grow_batch_when_full()
9494
var pump = new TcpPumpManager(probe.Ref);
9595
var (inbound, handle) = CreateTestHandle();
9696

97-
// Detect the actual ArrayPool bucket size for Rent(8) at runtime (may be 8, 16, etc.)
98-
var sampleBatch = ArrayPool<IInputItem>.Shared.Rent(8);
97+
var sampleBatch = ArrayPool<IInputItem>.Shared.Rent(32);
9998
var initialBatchSize = sampleBatch.Length;
10099
ArrayPool<IInputItem>.Shared.Return(sampleBatch);
101100

src/Servus.Akka/IO/ClientByteMover.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public static class ClientByteMover
66
{
77
// Threshold below which consecutive small buffers are coalesced into a single write.
88
// Reduces syscall overhead for HTTP/2 frame headers (9 bytes) and small DATA frames.
9-
private const int CoalesceThreshold = 16 * 1024;
9+
private const int CoalesceThreshold = 32 * 1024;
1010

1111
// Cached delegates — created once at class init, reused for every connection.
1212
// Avoids a delegate heap allocation on each MoveStreamToChannel call.

src/Servus.Akka/IO/Messages.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class NetworkBuffer : IInputItem, IOutputItem
6767
{
6868
private static readonly ConcurrentStack<NetworkBuffer> WrapperPool = new();
6969

70-
protected static int MaxPoolSize { get; private set; } = Environment.ProcessorCount * 2;
70+
protected static int MaxPoolSize { get; private set; } = Environment.ProcessorCount * 4;
7171

7272
protected IMemoryOwner<byte>? Owner;
7373

src/Servus.Akka/IO/Tcp/DnsCache.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System.Collections.Concurrent;
2+
using System.Net;
3+
4+
namespace Servus.Akka.IO.Tcp;
5+
6+
internal static class DnsCache
7+
{
8+
private static readonly ConcurrentDictionary<string, DnsEntry> Cache = new(StringComparer.OrdinalIgnoreCase);
9+
10+
private static TimeSpan _ttl = TimeSpan.FromSeconds(120);
11+
12+
public static TimeSpan Ttl
13+
{
14+
get => _ttl;
15+
set => _ttl = value;
16+
}
17+
18+
public static async Task<IPAddress[]> ResolveAsync(string host, CancellationToken ct)
19+
{
20+
if (IPAddress.TryParse(host, out var literal))
21+
{
22+
return [literal];
23+
}
24+
25+
if (Cache.TryGetValue(host, out var entry) && !entry.IsExpired(_ttl))
26+
{
27+
return entry.Addresses;
28+
}
29+
30+
var addresses = await Dns.GetHostAddressesAsync(host, ct).ConfigureAwait(false);
31+
32+
if (addresses.Length > 0)
33+
{
34+
Cache[host] = new DnsEntry(addresses, Environment.TickCount64);
35+
}
36+
37+
return addresses;
38+
}
39+
40+
internal static void Clear() => Cache.Clear();
41+
42+
private readonly record struct DnsEntry(IPAddress[] Addresses, long TimestampMs)
43+
{
44+
public bool IsExpired(TimeSpan ttl)
45+
=> Environment.TickCount64 - TimestampMs > (long)ttl.TotalMilliseconds;
46+
}
47+
}

src/Servus.Akka/IO/Tcp/TcpClientProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public async Task<Stream> GetStreamAsync(CancellationToken ct = default)
2929
try
3030
{
3131
var dnsStart = Stopwatch.GetTimestamp();
32-
addresses = await Dns.GetHostAddressesAsync(connectHost, ct).ConfigureAwait(false);
32+
addresses = await DnsCache.ResolveAsync(connectHost, ct).ConfigureAwait(false);
3333
var dnsDuration = Stopwatch.GetElapsedTime(dnsStart).TotalSeconds;
3434

3535
if (addresses.Length == 0)

src/Servus.Akka/IO/Tcp/TcpPumpManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private static async Task PumpAsync(
6969
}
7070

7171
chunk.Key = key;
72-
batch ??= ArrayPool<IInputItem>.Shared.Rent(8);
72+
batch ??= ArrayPool<IInputItem>.Shared.Rent(32);
7373

7474
if (count == batch.Length)
7575
{

0 commit comments

Comments
 (0)