Skip to content

Commit 25d9c28

Browse files
committed
Many fixes
1 parent 56b3b59 commit 25d9c28

54 files changed

Lines changed: 1603 additions & 1578 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/TurboHttp.Benchmarks/StreamingThroughputBenchmarks.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
using Akka.Actor;
22
using Akka.Configuration;
33
using BenchmarkDotNet.Attributes;
4-
using Google.Protobuf.WellKnownTypes;
54
using Microsoft.Extensions.DependencyInjection;
65
using Microsoft.Extensions.DependencyInjection.Extensions;
76
using Microsoft.Extensions.Options;
87
using TurboHttp.Benchmarks.Internal;
8+
using TurboHttp.Internal;
99

1010
namespace TurboHttp.Benchmarks;
1111

@@ -33,7 +33,8 @@ public class StreamingThroughputBenchmarks
3333
private static readonly Lock ServerLock = new();
3434
private static int _serverRefCount;
3535

36-
private static readonly Config BenchHocon = ConfigurationFactory.Empty;
36+
private static readonly Config BenchHocon =
37+
TurboHttpDispatchers.CreateConfig(TurboClientOptions.DefaultMaxEndpointSubstreams);
3738

3839
private ServiceProvider? _turboProvider;
3940
private ITurboHttpClient? _turboClient;
@@ -62,7 +63,7 @@ public class StreamingThroughputBenchmarks
6263
: _server!.Http11Port;
6364

6465
[GlobalSetup]
65-
public void GlobalSetup()
66+
public async Task GlobalSetup()
6667
{
6768
lock (ServerLock)
6869
{
@@ -81,8 +82,8 @@ public void GlobalSetup()
8182

8283
SetupHttpClient();
8384

84-
WarmupTurbo().GetAwaiter().GetResult();
85-
WarmupHttpClient().GetAwaiter().GetResult();
85+
await WarmupTurbo();
86+
await WarmupHttpClient();
8687
}
8788

8889
[GlobalCleanup]
@@ -143,14 +144,12 @@ public async Task TurboHttp_StreamRequests()
143144

144145
// Fire all requests as fast as the channel accepts them
145146

146-
var tasks = new Task[count];
147147
for (var i = 0; i < count; i++)
148148
{
149149
var request = new HttpRequestMessage(HttpMethod.Get, "/benchmark/simple");
150-
tasks[i] = writer.WriteAsync(request).AsTask();
150+
await writer.WriteAsync(request);
151151
}
152152

153-
await Task.WhenAll(tasks);
154153

155154
// Drain all responses — use manual loop instead of ReadAllAsync to avoid
156155
// IAsyncEnumerator disposal issues when breaking out early
@@ -260,4 +259,4 @@ private sealed class FixedOptionsFactory(TurboClientOptions options) : IOptionsF
260259
{
261260
public TurboClientOptions Create(string name) => options;
262261
}
263-
}
262+
}

src/TurboHttp.Benchmarks/TurboHttpComparativeBenchmarks.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Microsoft.Extensions.DependencyInjection.Extensions;
66
using Microsoft.Extensions.Options;
77
using TurboHttp.Benchmarks.Internal;
8+
using TurboHttp.Internal;
89

910
namespace TurboHttp.Benchmarks;
1011

@@ -16,21 +17,21 @@ namespace TurboHttp.Benchmarks;
1617
/// </summary>
1718
internal sealed class ClientHelper : IAsyncDisposable
1819
{
19-
private static readonly Config BenchHocon = ConfigurationFactory.Empty;
20+
private static readonly Config BenchHocon =
21+
TurboHttpDispatchers.CreateConfig(TurboClientOptions.DefaultMaxEndpointSubstreams);
2022

2123
private readonly ServiceProvider _provider;
22-
private readonly ITurboHttpClient _client;
2324
private readonly ActorSystem _system;
2425

2526
private ClientHelper(ServiceProvider provider, ITurboHttpClient client, ActorSystem system)
2627
{
2728
_provider = provider;
28-
_client = client;
29+
Client = client;
2930
_system = system;
3031
}
3132

3233
/// <summary>The configured <see cref="ITurboHttpClient"/> instance.</summary>
33-
public ITurboHttpClient Client => _client;
34+
public ITurboHttpClient Client { get; }
3435

3536
/// <summary>
3637
/// Creates a new <see cref="ClientHelper"/> with a fully configured TurboHttp client.
@@ -80,18 +81,18 @@ public static ClientHelper CreateClient(int port, Version version)
8081
public async ValueTask DisposeAsync()
8182
{
8283
// Signal pipeline to drain
83-
_client.Requests.TryComplete();
84+
Client.Requests.TryComplete();
8485

8586
try
8687
{
87-
await _client.Responses.Completion.WaitAsync(TimeSpan.FromSeconds(5));
88+
await Client.Responses.Completion.WaitAsync(TimeSpan.FromSeconds(5));
8889
}
8990
catch
9091
{
9192
// Pipeline may complete with an error during shutdown — that is fine.
9293
}
9394

94-
_client.Dispose();
95+
Client.Dispose();
9596

9697
// Terminate the ActorSystem to stop all PinnedDispatcher threads.
9798
// Without this, each BDN parameter combination leaks ~50–100 OS threads, causing

src/TurboHttp.IntegrationTests/LoggingBridgeSpec.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using Microsoft.Extensions.Logging;
88
using TurboHttp.Diagnostics;
99
using TurboHttp.IntegrationTests.Shared;
10+
using TurboHttp.Internal;
1011

1112
namespace TurboHttp.IntegrationTests;
1213

@@ -98,8 +99,10 @@ private ITurboHttpClient BuildClientViaUserDI(bool withTurboTrace = false)
9899
{
99100
var loggerFactory = sp.GetRequiredService<ILoggerFactory>();
100101
var diSetup = DependencyResolverSetup.Create(sp);
102+
var dispatcherConfig = TurboHttpDispatchers.CreateConfig(
103+
TurboClientOptions.DefaultMaxEndpointSubstreams);
101104
var setup = BootstrapSetup.Create()
102-
.WithConfig(LoggingHocon)
105+
.WithConfig(LoggingHocon.WithFallback(dispatcherConfig))
103106
.And(diSetup)
104107
.And(new LoggerFactorySetup(loggerFactory));
105108
return ActorSystem.Create("turbohttp-bridge-test", setup);

src/TurboHttp.IntegrationTests/Shared/ActorSystemFixture.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Akka.Configuration;
44
using Akka.DependencyInjection;
55
using Microsoft.Extensions.DependencyInjection;
6+
using TurboHttp.Internal;
67

78
namespace TurboHttp.IntegrationTests.Shared;
89

@@ -19,7 +20,9 @@ public ValueTask InitializeAsync()
1920
{
2021
var services = new ServiceCollection();
2122
var diSetup = DependencyResolverSetup.Create(services.BuildServiceProvider());
22-
var bootstrap = BootstrapSetup.Create();
23+
var dispatcherConfig = TurboHttpDispatchers.CreateConfig(
24+
TurboClientOptions.DefaultMaxEndpointSubstreams);
25+
var bootstrap = BootstrapSetup.Create().WithConfig(dispatcherConfig);
2326

2427
var setup = bootstrap.And(diSetup);
2528
System = ActorSystem.Create($"turbohttp-shared-{Guid.NewGuid()}", setup);

src/TurboHttp.IntegrationTests/Shared/ClientHelper.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Microsoft.Extensions.DependencyInjection.Extensions;
77
using Microsoft.Extensions.Logging;
88
using Microsoft.Extensions.Options;
9+
using TurboHttp.Internal;
910

1011
namespace TurboHttp.IntegrationTests.Shared;
1112

@@ -65,10 +66,14 @@ public static ClientHelper CreateClient(
6566
// Create an ActorSystem with DependencyResolver so that Servus.Akka
6667
// ResolveActor<T> works inside TurboClientStreamManager.
6768
var diSetup = DependencyResolverSetup.Create(services.BuildServiceProvider());
69+
var dispatcherConfig = TurboHttpDispatchers.CreateConfig(
70+
TurboClientOptions.DefaultMaxEndpointSubstreams);
6871
var bootstrap = BootstrapSetup.Create();
6972

7073
if (loggerFactory is not null)
71-
bootstrap = bootstrap.WithConfig(LoggingHocon);
74+
bootstrap = bootstrap.WithConfig(LoggingHocon.WithFallback(dispatcherConfig));
75+
else
76+
bootstrap = bootstrap.WithConfig(dispatcherConfig);
7277

7378
var setup = loggerFactory is not null
7479
? bootstrap.And(diSetup).And(new LoggerFactorySetup(loggerFactory))
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
using Akka.Actor;
2+
using Akka.Configuration;
3+
using Akka.Streams;
4+
using TurboHttp.Internal;
5+
using TurboHttp.Transport.Connection;
6+
7+
namespace TurboHttp.StreamTests.Dispatchers;
8+
9+
/// <summary>
10+
/// Verifies that TurboHttp's custom Akka.NET dispatchers are correctly configured,
11+
/// resolvable from ActorSystem, and applied to the right actors.
12+
/// </summary>
13+
public sealed class DispatcherConfigurationSpec : IAsyncDisposable
14+
{
15+
private readonly ActorSystem _system;
16+
17+
public DispatcherConfigurationSpec()
18+
{
19+
var config = TurboHttpDispatchers.CreateConfig(TurboClientOptions.DefaultMaxEndpointSubstreams);
20+
_system = ActorSystem.Create("dispatcher-spec-" + Guid.NewGuid(), config);
21+
}
22+
23+
public async ValueTask DisposeAsync()
24+
{
25+
await _system.Terminate();
26+
}
27+
28+
[Fact(Timeout = 5000)]
29+
public void CreateConfig_should_produce_ForkJoinDispatcher_for_io_dispatcher()
30+
{
31+
var config = TurboHttpDispatchers.CreateConfig(256);
32+
var type = config.GetString("akka.actor.turbohttp-io-dispatcher.type");
33+
34+
Assert.Equal("ForkJoinDispatcher", type);
35+
}
36+
37+
[Fact(Timeout = 5000)]
38+
public void CreateConfig_should_produce_ForkJoinDispatcher_for_stream_dispatcher()
39+
{
40+
var config = TurboHttpDispatchers.CreateConfig(256);
41+
var type = config.GetString("akka.actor.turbohttp-stream-dispatcher.type");
42+
43+
Assert.Equal("ForkJoinDispatcher", type);
44+
}
45+
46+
[Theory(Timeout = 5000)]
47+
[InlineData(32u)]
48+
[InlineData(64u)]
49+
[InlineData(128u)]
50+
[InlineData(256u)]
51+
[InlineData(512u)]
52+
[InlineData(1024u)]
53+
public void CreateConfig_should_clamp_stream_thread_count(uint maxEndpointSubstreams)
54+
{
55+
var config = TurboHttpDispatchers.CreateConfig(maxEndpointSubstreams);
56+
var threadCount = config.GetInt(
57+
"akka.actor.turbohttp-stream-dispatcher.dedicated-thread-pool.thread-count");
58+
59+
Assert.InRange(threadCount, Environment.ProcessorCount, 64);
60+
}
61+
62+
[Theory(Timeout = 5000)]
63+
[InlineData(32u)]
64+
[InlineData(256u)]
65+
[InlineData(1024u)]
66+
public void CreateConfig_should_clamp_io_thread_count(uint maxEndpointSubstreams)
67+
{
68+
var config = TurboHttpDispatchers.CreateConfig(maxEndpointSubstreams);
69+
var threadCount = config.GetInt(
70+
"akka.actor.turbohttp-io-dispatcher.dedicated-thread-pool.thread-count");
71+
72+
Assert.InRange(threadCount, 4, 16);
73+
}
74+
75+
[Fact(Timeout = 5000)]
76+
public void IoDispatcher_should_be_resolvable_from_ActorSystem()
77+
{
78+
Assert.True(
79+
_system.Dispatchers.HasDispatcher(TurboHttpDispatchers.IoDispatcher));
80+
}
81+
82+
[Fact(Timeout = 5000)]
83+
public void StreamDispatcher_should_be_resolvable_from_ActorSystem()
84+
{
85+
Assert.True(
86+
_system.Dispatchers.HasDispatcher(TurboHttpDispatchers.StreamDispatcher));
87+
}
88+
89+
[Fact(Timeout = 5000)]
90+
public void WithIoDispatcher_should_apply_when_available()
91+
{
92+
var props = Props.Create(() => new ConnectionManagerActor(TimeSpan.FromMinutes(5)));
93+
94+
Assert.Equal(TurboHttpDispatchers.IoDispatcher, props.WithIoDispatcher(_system).Dispatcher);
95+
}
96+
97+
[Fact(Timeout = 5000)]
98+
public void WithIoDispatcher_should_fall_back_to_default_when_missing()
99+
{
100+
using var bareSystem = ActorSystem.Create("bare-" + Guid.NewGuid());
101+
var props = Props.Create(() => new ConnectionManagerActor(TimeSpan.FromMinutes(5)));
102+
103+
// Should NOT have the custom dispatcher — falls back to default
104+
Assert.NotEqual(TurboHttpDispatchers.IoDispatcher, props.WithIoDispatcher(bareSystem).Dispatcher);
105+
}
106+
107+
[Fact(Timeout = 5000)]
108+
public void WithStreamDispatcher_should_apply_when_available()
109+
{
110+
var props = Props.Create(() => new ConnectionManagerActor(TimeSpan.FromMinutes(5)));
111+
112+
Assert.Equal(TurboHttpDispatchers.StreamDispatcher, props.WithStreamDispatcher(_system).Dispatcher);
113+
}
114+
115+
[Fact(Timeout = 5000)]
116+
public void WithStreamDispatcher_should_fall_back_to_default_when_missing()
117+
{
118+
using var bareSystem = ActorSystem.Create("bare-" + Guid.NewGuid());
119+
var settings = ActorMaterializerSettings.Create(bareSystem);
120+
121+
// Should NOT have the custom dispatcher — falls back to default
122+
Assert.NotEqual(TurboHttpDispatchers.StreamDispatcher, settings.WithStreamDispatcher(bareSystem).Dispatcher);
123+
}
124+
125+
[Fact(Timeout = 5000)]
126+
public void CreateConfig_should_use_background_threads()
127+
{
128+
var config = TurboHttpDispatchers.CreateConfig(256);
129+
130+
Assert.Equal("background",
131+
config.GetString("akka.actor.turbohttp-io-dispatcher.dedicated-thread-pool.threadtype"));
132+
Assert.Equal("background",
133+
config.GetString("akka.actor.turbohttp-stream-dispatcher.dedicated-thread-pool.threadtype"));
134+
}
135+
136+
[Fact(Timeout = 5000)]
137+
public void CreateConfig_should_set_expected_throughput_values()
138+
{
139+
var config = TurboHttpDispatchers.CreateConfig(256);
140+
141+
Assert.Equal(32, config.GetInt("akka.actor.turbohttp-io-dispatcher.throughput"));
142+
Assert.Equal(64, config.GetInt("akka.actor.turbohttp-stream-dispatcher.throughput"));
143+
}
144+
145+
[Fact(Timeout = 5000)]
146+
public void CreateConfig_should_scale_stream_threads_with_substreams()
147+
{
148+
// With high MaxEndpointSubstreams, thread count should be higher than with low
149+
var configLow = TurboHttpDispatchers.CreateConfig(32);
150+
var configHigh = TurboHttpDispatchers.CreateConfig(512);
151+
152+
var threadsLow = configLow.GetInt(
153+
"akka.actor.turbohttp-stream-dispatcher.dedicated-thread-pool.thread-count");
154+
var threadsHigh = configHigh.GetInt(
155+
"akka.actor.turbohttp-stream-dispatcher.dedicated-thread-pool.thread-count");
156+
157+
Assert.True(threadsHigh >= threadsLow,
158+
$"Expected high substreams ({threadsHigh}) >= low substreams ({threadsLow})");
159+
}
160+
}

src/TurboHttp.StreamTests/Http10/Http10DecompressionPipelineSpec.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public async Task Http10DecompressionPipeline_should_remove_content_encoding_hea
110110

111111
[Fact(Timeout = 10_000)]
112112
[Trait("RFC", "RFC1945-10.3")]
113-
public async Task Http10DecompressionPipeline_should_update_content_length_when_gzip_decompressed()
113+
public async Task Http10DecompressionPipeline_should_produce_correct_body_length_when_gzip_decompressed()
114114
{
115115
var request = new HttpRequestMessage(HttpMethod.Get, "http://example.com/gzip-len")
116116
{
@@ -126,7 +126,8 @@ public async Task Http10DecompressionPipeline_should_update_content_length_when_
126126
request,
127127
() => rawResponse);
128128

129-
Assert.Equal(original.Length, response.Content.Headers.ContentLength);
129+
var body = await response.Content.ReadAsByteArrayAsync(TestContext.Current.CancellationToken);
130+
Assert.Equal(original.Length, body.Length);
130131
}
131132

132133
// x-gzip decompression

src/TurboHttp.StreamTests/Http11/Chunking/Http11ChunkedTransferSpec.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,17 @@ public async Task Http11ChunkedTransfer_should_concatenate_three_equal_chunks_wh
7979
[Trait("RFC", "RFC9112-7.1")]
8080
public async Task Http11ChunkedTransfer_should_end_stream_when_zero_length_terminator()
8181
{
82-
var responses = await DecodeAllAsync(
83-
"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n" +
84-
"5\r\nhello\r\n0\r\n\r\n");
82+
// Use Take(1) + Sink.Seq instead of bare Sink.Seq to avoid waiting
83+
// for upstream completion — the decoder stage keeps the stream open
84+
// for potential pipelined responses.
85+
var responses = await Source.From(new[]
86+
{
87+
Chunk("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n" +
88+
"5\r\nhello\r\n0\r\n\r\n")
89+
})
90+
.Via(Flow.FromGraph(new Http11DecoderStage()))
91+
.Take(1)
92+
.RunWith(Sink.Seq<HttpResponseMessage>(), Materializer);
8593

8694
Assert.Single(responses);
8795
var body = await responses[0].Content.ReadAsStringAsync(TestContext.Current.CancellationToken);

src/TurboHttp.StreamTests/Http11/Decoding/Http11DecoderSpec.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,11 @@ public async Task Http11Decoder_should_decode_pipelined_responses_when_two_respo
7070
Chunk("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\nHTTP/1.1 201 Created\r\nContent-Length: 0\r\n\r\n")
7171
]);
7272

73+
// Take(2) ensures the stream completes after collecting both responses
74+
// instead of waiting for upstream completion (which Sink.Seq requires).
7375
var responses = await source
7476
.Via(Flow.FromGraph(new Http11DecoderStage()))
77+
.Take(2)
7578
.RunWith(Sink.Seq<HttpResponseMessage>(), Materializer);
7679

7780
Assert.Equal(2, responses.Count);

0 commit comments

Comments
 (0)