Skip to content

Commit 24c065a

Browse files
committed
Add Diagnostics and more Stage coverage
1 parent 21512ff commit 24c065a

13 files changed

Lines changed: 2469 additions & 4 deletions

src/Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
<PackageVersion Include="Servus.Akka" Version="0.3.10" />
1313
</ItemGroup>
1414
<ItemGroup Label="Testing">
15-
<PackageVersion Include="xunit.v3.mtp-v2" Version="3.2.2" />
1615
<PackageVersion Include="Microsoft.Testing.Extensions.CodeCoverage" Version="18.6.2" />
16+
<PackageVersion Include="xunit.v3.mtp-v2" Version="3.2.2" />
1717
<PackageVersion Include="PublicApiGenerator" Version="11.5.4" />
1818
<PackageVersion Include="Verify.DiffPlex" Version="3.1.2" />
1919
<PackageVersion Include="Verify.XunitV3" Version="31.16.1" />
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
using Akka.Streams;
2+
using Akka.Streams.Dsl;
3+
using TurboHTTP.Internal;
4+
using TurboHTTP.Streams.Stages.Internal;
5+
using TurboHTTP.Tests.Shared;
6+
7+
namespace TurboHTTP.StreamTests.Streams.Internal;
8+
9+
/// <summary>
10+
/// Tests <see cref="NetworkBufferBatchStage"/> batching behavior, flush semantics,
11+
/// and control-item ordering.
12+
/// </summary>
13+
/// <remarks>
14+
/// Stage under test: <see cref="NetworkBufferBatchStage"/>.
15+
/// Key behaviors: buffer batching up to maxWeight, control-item flush, overflow handling.
16+
/// </remarks>
17+
public sealed class NetworkBufferBatchStageSpec : StreamTestBase
18+
{
19+
// Helpers
20+
21+
private static NetworkBuffer CreateBuffer(int size, byte fill = (byte)'X')
22+
{
23+
var buf = NetworkBuffer.Rent(size);
24+
buf.FullMemory.Span.Fill(fill);
25+
buf.Length = size;
26+
return buf;
27+
}
28+
29+
private sealed class ControlItem : IOutputItem
30+
{
31+
public string Name { get; }
32+
public RequestEndpoint Key { get; } = new() { Host = "test", Port = 80, Scheme = "http", Version = new Version(1, 1) };
33+
34+
public ControlItem(string name = "Control")
35+
{
36+
Name = name;
37+
}
38+
39+
public override string ToString() => Name;
40+
}
41+
42+
private async Task<(List<IOutputItem>, bool)> RunBatchAsync(
43+
IEnumerable<IOutputItem> items,
44+
long maxWeight)
45+
{
46+
var collected = new List<IOutputItem>();
47+
var didComplete = false;
48+
49+
var graph = GraphDsl.Create(
50+
Sink.ForEach<IOutputItem>(item => collected.Add(item)),
51+
(builder, sink) =>
52+
{
53+
var stage = builder.Add(new NetworkBufferBatchStage(maxWeight));
54+
var source = builder.Add(Source.From(items));
55+
56+
builder.From(source).To(stage.Inlet);
57+
builder.From(stage.Outlet).To(sink);
58+
59+
return ClosedShape.Instance;
60+
});
61+
62+
try
63+
{
64+
await RunnableGraph.FromGraph(graph).Run(Materializer);
65+
didComplete = true;
66+
}
67+
catch
68+
{
69+
// Exceptions are expected in some test cases
70+
}
71+
72+
return (collected, didComplete);
73+
}
74+
75+
// NBBS-001: Single buffer with immediate downstream demand pushes immediately
76+
77+
[Fact(Timeout = 5000)]
78+
public async Task NetworkBufferBatch_should_push_buffer_immediately_when_downstream_demands()
79+
{
80+
// Arrange
81+
var buf = CreateBuffer(10);
82+
var items = new List<IOutputItem> { buf };
83+
84+
// Act
85+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
86+
87+
// Assert
88+
Assert.True(success);
89+
Assert.Single(collected);
90+
Assert.Same(buf, collected[0]);
91+
}
92+
93+
94+
// NBBS-003: Control item flushes accumulated buffer before emission
95+
96+
[Fact(Timeout = 5000)]
97+
public async Task NetworkBufferBatch_should_flush_buffer_before_control_item()
98+
{
99+
// Arrange
100+
var buf = CreateBuffer(20);
101+
var ctrl = new ControlItem("Flush");
102+
var items = new List<IOutputItem> { buf, ctrl };
103+
104+
// Act
105+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
106+
107+
// Assert — buffer then control, never interleaved
108+
Assert.True(success);
109+
Assert.Equal(2, collected.Count);
110+
Assert.IsType<NetworkBuffer>(collected[0]);
111+
Assert.Same(ctrl, collected[1]);
112+
}
113+
114+
// NBBS-004: Overflow splits batches when adding would exceed maxWeight
115+
116+
[Fact(Timeout = 5000)]
117+
public async Task NetworkBufferBatch_should_emit_batch_when_next_buffer_overflows()
118+
{
119+
// Arrange — buf1(15) + buf2(15) would be 30, but maxWeight=25 so emit buf1 first
120+
var buf1 = CreateBuffer(15);
121+
var buf2 = CreateBuffer(15);
122+
var items = new List<IOutputItem> { buf1, buf2 };
123+
124+
// Act
125+
var (collected, success) = await RunBatchAsync(items, maxWeight: 25);
126+
127+
// Assert — buf1 emitted, buf2 emitted separately (or still batching)
128+
Assert.True(success);
129+
Assert.NotEmpty(collected);
130+
// First item should be a buffer (either buf1 merged/alone or buf2)
131+
Assert.IsType<NetworkBuffer>(collected[0]);
132+
}
133+
134+
// NBBS-005: Multiple control items preserve ordering
135+
136+
[Fact(Timeout = 5000)]
137+
public async Task NetworkBufferBatch_should_preserve_control_item_order()
138+
{
139+
// Arrange
140+
var ctrl1 = new ControlItem("C1");
141+
var ctrl2 = new ControlItem("C2");
142+
var ctrl3 = new ControlItem("C3");
143+
var items = new List<IOutputItem> { ctrl1, ctrl2, ctrl3 };
144+
145+
// Act
146+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
147+
148+
// Assert — control items emit in order
149+
Assert.True(success);
150+
Assert.Equal(3, collected.Count);
151+
Assert.Same(ctrl1, collected[0]);
152+
Assert.Same(ctrl2, collected[1]);
153+
Assert.Same(ctrl3, collected[2]);
154+
}
155+
156+
// NBBS-006: Upstream completion flushes remaining buffer
157+
158+
[Fact(Timeout = 5000)]
159+
public async Task NetworkBufferBatch_should_emit_remaining_buffer_on_upstream_finish()
160+
{
161+
// Arrange — buffer without downstream demand yet
162+
var buf = CreateBuffer(50);
163+
var items = new List<IOutputItem> { buf };
164+
165+
// Act
166+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
167+
168+
// Assert — buffer is emitted even though no pull came
169+
Assert.True(success);
170+
Assert.Single(collected);
171+
Assert.IsType<NetworkBuffer>(collected[0]);
172+
}
173+
174+
// NBBS-007: Empty stream completes immediately
175+
176+
[Fact(Timeout = 5000)]
177+
public async Task NetworkBufferBatch_should_complete_immediately_on_empty_stream()
178+
{
179+
// Arrange
180+
var items = new List<IOutputItem>();
181+
182+
// Act
183+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
184+
185+
// Assert
186+
Assert.True(success);
187+
Assert.Empty(collected);
188+
}
189+
190+
// NBBS-008: Control + buffer + control preserves order and flushes correctly
191+
192+
[Fact(Timeout = 5000)]
193+
public async Task NetworkBufferBatch_should_handle_mixed_control_and_buffers()
194+
{
195+
// Arrange
196+
var ctrl1 = new ControlItem("Start");
197+
var buf = CreateBuffer(25);
198+
var ctrl2 = new ControlItem("End");
199+
var items = new List<IOutputItem> { ctrl1, buf, ctrl2 };
200+
201+
// Act
202+
var (collected, success) = await RunBatchAsync(items, maxWeight: 100);
203+
204+
// Assert — control1, buffer, control2 in order
205+
Assert.True(success);
206+
Assert.Equal(3, collected.Count);
207+
Assert.Same(ctrl1, collected[0]);
208+
Assert.IsType<NetworkBuffer>(collected[1]);
209+
Assert.Same(ctrl2, collected[2]);
210+
}
211+
212+
}

0 commit comments

Comments
 (0)