Skip to content

Commit bdcad25

Browse files
st0o0claude
andcommitted
feat(statemachine): Http10ServerStateMachine defers encode until body bytes delivered
Replace the old ResponseBodyHandle/timer-based body drain with the same two-phase deferred encode pattern used in Http10ClientStateMachine. OnResponse calls Encode (always returns 0), stores the deferredResponse, then OnBodyMessage dispatches OutboundBodyChunk/Complete/Failed to call EncodeDeferred and emit TransportData once the full body is available. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 72e1155 commit bdcad25

2 files changed

Lines changed: 126 additions & 65 deletions

File tree

src/TurboHTTP.Tests/Http10/Http10ServerStateMachineSpec.cs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
using System.Net;
2+
using System.Text;
3+
using Akka.Actor;
14
using Servus.Akka.Transport;
5+
using TurboHTTP.Protocol;
26
using TurboHTTP.Protocol.Syntax.Http10;
37
using TurboHTTP.Protocol.Syntax.Http10.Server;
48
using TurboHTTP.Tests.Shared;
@@ -50,17 +54,61 @@ public void DecodeClientData_should_mark_should_complete()
5054

5155
[Fact(Timeout = 5000)]
5256
[Trait("RFC", "RFC1945")]
53-
public void OnResponse_should_emit_encoded_response()
57+
public void OnResponse_should_not_emit_transport_data_before_body_delivered()
5458
{
5559
var ops = MakeOps();
5660
var sm = new Http10ServerStateMachine(ops);
5761

58-
var response = new HttpResponseMessage(System.Net.HttpStatusCode.OK);
59-
response.Content = new StringContent("test body");
62+
var response = new HttpResponseMessage(HttpStatusCode.OK)
63+
{
64+
Content = new StringContent("test body")
65+
};
6066

6167
sm.OnResponse(response);
6268

63-
Assert.Single(ops.Outbound.OfType<TransportData>());
69+
Assert.DoesNotContain(ops.Outbound, o => o is TransportData);
70+
}
71+
72+
[Fact(Timeout = 5000)]
73+
[Trait("RFC", "RFC1945")]
74+
public async Task OnResponse_with_body_should_emit_transport_data_after_body_chunk()
75+
{
76+
var system = ActorSystem.Create("http10-server-sm-test");
77+
try
78+
{
79+
var inbox = Inbox.Create(system);
80+
var ops = new FakeServerOps { StageActor = inbox.Receiver };
81+
var sm = new Http10ServerStateMachine(ops);
82+
sm.PreStart();
83+
84+
var response = new HttpResponseMessage(HttpStatusCode.OK)
85+
{
86+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello"))
87+
};
88+
sm.OnResponse(response);
89+
90+
// No TransportData yet (deferred)
91+
Assert.DoesNotContain(ops.Outbound, o => o is TransportData);
92+
93+
// Receive the OutboundBodyChunk and feed it back
94+
var msg = await Task.Run(() => inbox.Receive(TimeSpan.FromSeconds(3)));
95+
var chunk = Assert.IsType<OutboundBodyChunk>(msg);
96+
sm.OnBodyMessage(chunk);
97+
98+
var msg2 = await Task.Run(() => inbox.Receive(TimeSpan.FromSeconds(3)));
99+
sm.OnBodyMessage(msg2); // OutboundBodyComplete
100+
101+
// Now TransportData should exist
102+
Assert.Contains(ops.Outbound, o => o is TransportData);
103+
var td = ops.Outbound.OfType<TransportData>().First();
104+
var text = Encoding.ASCII.GetString(td.Buffer.Memory.Span[..td.Buffer.Length]);
105+
Assert.Contains("Content-Length: 5", text);
106+
Assert.Contains("hello", text);
107+
}
108+
finally
109+
{
110+
system.Dispose();
111+
}
64112
}
65113

66114
[Fact(Timeout = 5000)]
Lines changed: 74 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
1+
using System.Buffers;
12
using Servus.Akka.Transport;
23
using TurboHTTP.Protocol.Syntax.Http10.Options;
34
using TurboHTTP.Streams;
5+
using static Servus.Core.Servus;
46

57
namespace TurboHTTP.Protocol.Syntax.Http10.Server;
68

79
internal sealed class Http10ServerStateMachine : IServerStateMachine
810
{
9-
private const string DrainBodyTimerKey = "drain-body";
10-
1111
private readonly IServerStageOperations _ops;
1212
private readonly Http10ServerDecoder _decoder;
1313
private readonly Http10ServerEncoder _encoder;
1414
private readonly long _maxRequestBodySize;
1515

1616
private RequestBodyHandle? _activeBody;
17-
private ResponseBodyHandle? _responseBody;
17+
private HttpResponseMessage? _deferredResponse;
18+
private IMemoryOwner<byte>? _deferredBodyOwner;
19+
private int _deferredBodyLength;
1820

1921
public bool CanAcceptResponse => true;
2022
public bool ShouldComplete { get; private set; }
@@ -71,17 +73,30 @@ public void OnResponse(HttpResponseMessage response)
7173
response.Headers.Connection.Clear();
7274
response.Headers.Connection.Add("close");
7375

74-
var responseBuffer = TransportBuffer.Rent(8192);
75-
var span = responseBuffer.FullMemory.Span;
76-
var written = _encoder.Encode(span, response, _ops.StageActor);
77-
responseBuffer.Length = written;
78-
_ops.OnOutbound(new TransportData(responseBuffer));
79-
80-
_responseBody = new ResponseBodyHandle(
81-
response.Content,
82-
isChunked: false,
83-
onDataReady: () => _ops.OnScheduleTimer(DrainBodyTimerKey, TimeSpan.Zero));
84-
_responseBody.StartDrain();
76+
// Http10ServerEncoder always returns 0 — it starts the buffered body encoder
77+
// and sends OutboundBodyChunk/OutboundBodyComplete to the stage actor.
78+
var tempBuffer = TransportBuffer.Rent(1);
79+
try
80+
{
81+
var written = _encoder.Encode(tempBuffer.FullMemory.Span, response, _ops.StageActor);
82+
if (written > 0)
83+
{
84+
// Synchronous path (not currently used, kept for safety)
85+
tempBuffer.Length = written;
86+
_ops.OnOutbound(new TransportData(tempBuffer));
87+
return;
88+
}
89+
}
90+
catch
91+
{
92+
tempBuffer.Dispose();
93+
throw;
94+
}
95+
96+
tempBuffer.Dispose();
97+
98+
// Deferred — waiting for OutboundBodyChunk + OutboundBodyComplete via OnBodyMessage
99+
_deferredResponse = response;
85100
}
86101

87102
public void OnDownstreamFinished()
@@ -90,63 +105,61 @@ public void OnDownstreamFinished()
90105

91106
public void OnTimerFired(string name)
92107
{
93-
if (name == DrainBodyTimerKey)
94-
{
95-
DrainResponseBody();
96-
}
97-
}
98-
99-
public void OnBodyMessage(object msg) { }
100-
101-
public void Cleanup()
102-
{
103-
_activeBody?.Abort(new OperationCanceledException("Connection closed"));
104-
_activeBody = null;
105-
106-
_responseBody?.Abort();
107-
_responseBody = null;
108108
}
109109

110-
private void DrainResponseBody()
110+
public void OnBodyMessage(object msg)
111111
{
112-
if (_responseBody is null)
112+
switch (msg)
113113
{
114-
return;
115-
}
116-
117-
while (_responseBody.TryRead(out var result))
118-
{
119-
var data = result.Buffer;
120-
if (data.Length > 0)
121-
{
122-
var outBuf = TransportBuffer.Rent((int)data.Length);
123-
var span = outBuf.FullMemory.Span;
124-
125-
if (data.IsSingleSegment)
114+
case OutboundBodyChunk chunk when _deferredResponse is not null:
115+
_deferredBodyOwner?.Dispose();
116+
_deferredBodyOwner = chunk.Owner;
117+
_deferredBodyLength = chunk.Length;
118+
break;
119+
120+
case OutboundBodyComplete when _deferredResponse is not null && _deferredBodyOwner is not null:
121+
TransportBuffer? item = null;
122+
try
126123
{
127-
data.FirstSpan.CopyTo(span);
124+
var body = _deferredBodyOwner.Memory.Span[.._deferredBodyLength];
125+
var bufferSize = 8192 + _deferredBodyLength;
126+
item = TransportBuffer.Rent(bufferSize);
127+
var written = _encoder.EncodeDeferred(item.FullMemory.Span, _deferredResponse, body);
128+
item.Length = written;
129+
_ops.OnOutbound(new TransportData(item));
128130
}
129-
else
131+
catch (Exception ex)
130132
{
131-
var offset = 0;
132-
foreach (var segment in data)
133-
{
134-
segment.Span.CopyTo(span[offset..]);
135-
offset += segment.Length;
136-
}
133+
item?.Dispose();
134+
Tracing.For("Protocol").Error(this, "Failed to encode HTTP/1.0 response body: {0}", ex.Message);
137135
}
136+
finally
137+
{
138+
_deferredBodyOwner.Dispose();
139+
_deferredBodyOwner = null;
140+
_deferredResponse = null;
141+
}
142+
break;
138143

139-
outBuf.Length = (int)data.Length;
140-
_ops.OnOutbound(new TransportData(outBuf));
141-
}
142-
143-
_responseBody.AdvanceReader(data.End);
144+
case OutboundBodyFailed failed:
145+
_deferredBodyOwner?.Dispose();
146+
_deferredBodyOwner = null;
147+
if (_deferredResponse is not null)
148+
{
149+
Tracing.For("Protocol").Error(this, "Failed to read HTTP/1.0 response body: {0}", failed.Reason.Message);
150+
_deferredResponse = null;
151+
}
152+
break;
144153
}
154+
}
145155

146-
if (_responseBody.IsComplete)
147-
{
148-
_responseBody.Dispose();
149-
_responseBody = null;
150-
}
156+
public void Cleanup()
157+
{
158+
_activeBody?.Abort(new OperationCanceledException("Connection closed"));
159+
_activeBody = null;
160+
161+
_deferredBodyOwner?.Dispose();
162+
_deferredBodyOwner = null;
163+
_deferredResponse = null;
151164
}
152165
}

0 commit comments

Comments
 (0)