Skip to content

Commit c5f9c3e

Browse files
st0o0claude
andcommitted
feat(statemachine): Http11ServerStateMachine handles async body drain via OnBodyMessage
Add _outboundBodyPending flag and OnBodyMessage implementation to both ServerStateMachine and Http11ServerStateMachine. CanAcceptResponse now returns false while an outbound body is in flight. Body bytes arrive via OutboundBodyChunk messages and are forwarded directly to transport. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 4b386f0 commit c5f9c3e

3 files changed

Lines changed: 167 additions & 4 deletions

File tree

src/TurboHTTP.Tests/Http11/ServerStateMachineSpec.cs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Akka.Actor;
33
using Akka.Event;
44
using Servus.Akka.Transport;
5+
using TurboHTTP.Protocol;
56
using TurboHTTP.Protocol.Http11;
67
using TurboHTTP.Streams;
78

@@ -207,5 +208,110 @@ public void OnResponse_should_set_connection_close_header_when_flag_set()
207208
var responseText = Encoding.ASCII.GetString(transportData.Buffer.Span);
208209
Assert.Contains("Connection: close", responseText);
209210
}
211+
212+
[Fact(Timeout = 5000)]
213+
[Trait("RFC", "RFC9112-4")]
214+
public void OnResponse_should_not_include_body_in_transport_data()
215+
{
216+
var ops = new FakeServerOps();
217+
var sm = new ServerStateMachine(ops);
218+
219+
var requestData = Encoding.ASCII.GetBytes(
220+
"GET / HTTP/1.1\r\n" +
221+
"Host: localhost\r\n" +
222+
"Content-Length: 0\r\n" +
223+
"\r\n");
224+
225+
var buffer = TransportBuffer.Rent(requestData.Length);
226+
requestData.CopyTo(buffer.FullMemory.Span);
227+
buffer.Length = requestData.Length;
228+
229+
sm.DecodeClientData(new TransportData(buffer));
230+
231+
var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK)
232+
{
233+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello world"))
234+
};
235+
236+
sm.OnResponse(response);
237+
238+
var outboundItems = ops.EmittedOutbound.OfType<TransportData>().ToList();
239+
Assert.NotEmpty(outboundItems);
240+
var responseText = Encoding.ASCII.GetString(outboundItems[0].Buffer.Span);
241+
Assert.Contains("HTTP/1.1 200", responseText);
242+
Assert.DoesNotContain("hello world", responseText);
243+
}
244+
245+
[Fact(Timeout = 5000)]
246+
[Trait("RFC", "RFC9112-4")]
247+
public void OnBodyMessage_should_emit_body_chunk_as_transport_data()
248+
{
249+
var ops = new FakeServerOps();
250+
var sm = new ServerStateMachine(ops);
251+
252+
var requestData = Encoding.ASCII.GetBytes(
253+
"GET / HTTP/1.1\r\n" +
254+
"Host: localhost\r\n" +
255+
"Content-Length: 0\r\n" +
256+
"\r\n");
257+
258+
var buffer = TransportBuffer.Rent(requestData.Length);
259+
requestData.CopyTo(buffer.FullMemory.Span);
260+
buffer.Length = requestData.Length;
261+
262+
sm.DecodeClientData(new TransportData(buffer));
263+
264+
var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK)
265+
{
266+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello world"))
267+
};
268+
269+
sm.OnResponse(response);
270+
var countAfterHeaders = ops.EmittedOutbound.Count;
271+
272+
var bodyBytes = Encoding.UTF8.GetBytes("hello world");
273+
var owner = System.Buffers.MemoryPool<byte>.Shared.Rent(bodyBytes.Length);
274+
bodyBytes.CopyTo(owner.Memory.Span);
275+
sm.OnBodyMessage(new OutboundBodyChunk(owner, bodyBytes.Length));
276+
sm.OnBodyMessage(new OutboundBodyComplete());
277+
278+
var bodyItems = ops.EmittedOutbound.Skip(countAfterHeaders).OfType<TransportData>().ToList();
279+
Assert.NotEmpty(bodyItems);
280+
var bodyText = Encoding.UTF8.GetString(bodyItems[0].Buffer.Span);
281+
Assert.Contains("hello world", bodyText);
282+
}
283+
284+
[Fact(Timeout = 5000)]
285+
[Trait("RFC", "RFC9112-9.3")]
286+
public void CanAcceptResponse_should_be_false_when_outbound_body_pending()
287+
{
288+
var ops = new FakeServerOps();
289+
var sm = new ServerStateMachine(ops);
290+
291+
var requestData = Encoding.ASCII.GetBytes(
292+
"GET / HTTP/1.1\r\n" +
293+
"Host: localhost\r\n" +
294+
"Content-Length: 0\r\n" +
295+
"\r\n");
296+
297+
var buffer = TransportBuffer.Rent(requestData.Length);
298+
requestData.CopyTo(buffer.FullMemory.Span);
299+
buffer.Length = requestData.Length;
300+
301+
sm.DecodeClientData(new TransportData(buffer));
302+
303+
var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK)
304+
{
305+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello world"))
306+
};
307+
308+
sm.OnResponse(response);
309+
310+
Assert.False(sm.CanAcceptResponse);
311+
312+
sm.OnBodyMessage(new OutboundBodyComplete());
313+
314+
Assert.False(sm.CanAcceptResponse);
315+
}
210316
}
211317

src/TurboHTTP/Protocol/Http11/ServerStateMachine.cs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System.Globalization;
22
using System.Net;
3+
using Akka.Event;
34
using Servus.Akka.Transport;
45
using TurboHTTP.Internal;
6+
using TurboHTTP.Protocol;
57
using TurboHTTP.Streams;
68

79
namespace TurboHTTP.Protocol.Http11;
@@ -22,8 +24,9 @@ internal sealed class ServerStateMachine : IServerStateMachine
2224
private RequestBodyHandle? _activeBody;
2325
private ResponseBodyHandle? _responseBody;
2426
private int _pendingResponseCount;
27+
private bool _outboundBodyPending;
2528

26-
public bool CanAcceptResponse => _pendingResponseCount > 0;
29+
public bool CanAcceptResponse => _pendingResponseCount > 0 && !_outboundBodyPending;
2730
public bool ShouldComplete { get; private set; }
2831

2932
public ServerStateMachine(
@@ -182,6 +185,7 @@ public void OnResponse(HttpResponseMessage response)
182185
return;
183186
}
184187

188+
_outboundBodyPending = true;
185189
_responseBody = new ResponseBodyHandle(
186190
response.Content,
187191
isChunked: isChunked,
@@ -209,7 +213,28 @@ public void OnTimerFired(string name)
209213
}
210214
}
211215

212-
public void OnBodyMessage(object msg) { }
216+
public void OnBodyMessage(object msg)
217+
{
218+
switch (msg)
219+
{
220+
case OutboundBodyChunk chunk:
221+
var buf = TransportBuffer.Rent(chunk.Length);
222+
chunk.Owner.Memory.Span[..chunk.Length].CopyTo(buf.FullMemory.Span);
223+
buf.Length = chunk.Length;
224+
chunk.Owner.Dispose();
225+
_ops.OnOutbound(new TransportData(buf));
226+
break;
227+
228+
case OutboundBodyComplete:
229+
_outboundBodyPending = false;
230+
break;
231+
232+
case OutboundBodyFailed failed:
233+
_outboundBodyPending = false;
234+
_ops.Log.Warning("Failed to encode HTTP/1.1 response body: {0}", failed.Reason.Message);
235+
break;
236+
}
237+
}
213238

214239
public void Cleanup()
215240
{
@@ -223,6 +248,7 @@ public void Cleanup()
223248
_responseBody = null;
224249

225250
_pendingResponseCount = 0;
251+
_outboundBodyPending = false;
226252
ShouldComplete = false;
227253
_decoder.Dispose();
228254
}

src/TurboHTTP/Protocol/Syntax/Http11/Server/Http11ServerStateMachine.cs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using Akka.Event;
12
using Servus.Akka.Transport;
3+
using TurboHTTP.Protocol;
24
using TurboHTTP.Protocol.Syntax.Http11.Options;
35
using TurboHTTP.Streams;
46

@@ -17,8 +19,9 @@ internal sealed class Http11ServerStateMachine : IServerStateMachine
1719
private RequestBodyHandle? _activeBody;
1820
private ResponseBodyHandle? _responseBody;
1921
private int _requestsPipelined;
22+
private bool _outboundBodyPending;
2023

21-
public bool CanAcceptResponse => true;
24+
public bool CanAcceptResponse => !_outboundBodyPending;
2225
public bool ShouldComplete { get; private set; }
2326

2427
public Http11ServerStateMachine(IServerStageOperations ops, long maxRequestBodySize = 10_485_760, int maxPipelineDepth = 10)
@@ -84,6 +87,11 @@ public void OnResponse(HttpResponseMessage response)
8487
responseBuffer.Length = written;
8588
_ops.OnOutbound(new TransportData(responseBuffer));
8689

90+
if (response.Content is not null)
91+
{
92+
_outboundBodyPending = true;
93+
}
94+
8795
_responseBody = new ResponseBodyHandle(
8896
response.Content,
8997
isChunked: isChunked,
@@ -103,7 +111,28 @@ public void OnTimerFired(string name)
103111
}
104112
}
105113

106-
public void OnBodyMessage(object msg) { }
114+
public void OnBodyMessage(object msg)
115+
{
116+
switch (msg)
117+
{
118+
case OutboundBodyChunk chunk:
119+
var buf = TransportBuffer.Rent(chunk.Length);
120+
chunk.Owner.Memory.Span[..chunk.Length].CopyTo(buf.FullMemory.Span);
121+
buf.Length = chunk.Length;
122+
chunk.Owner.Dispose();
123+
_ops.OnOutbound(new TransportData(buf));
124+
break;
125+
126+
case OutboundBodyComplete:
127+
_outboundBodyPending = false;
128+
break;
129+
130+
case OutboundBodyFailed failed:
131+
_outboundBodyPending = false;
132+
_ops.Log.Warning("Failed to encode HTTP/1.1 response body: {0}", failed.Reason.Message);
133+
break;
134+
}
135+
}
107136

108137
public void Cleanup()
109138
{
@@ -112,6 +141,8 @@ public void Cleanup()
112141

113142
_responseBody?.Abort();
114143
_responseBody = null;
144+
145+
_outboundBodyPending = false;
115146
}
116147

117148
private void DrainResponseBody()

0 commit comments

Comments
 (0)