Skip to content

Commit 72e1155

Browse files
st0o0claude
andcommitted
feat(statemachine): Http10ClientStateMachine defers encode until body bytes delivered
Implements deferred encode pattern: OnRequest with body stores the request and sets _outboundBodyPending; OnBodyMessage accumulates the chunk then calls EncodeDeferred on OutboundBodyComplete to emit TransportData with Content-Length. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent c5f9c3e commit 72e1155

2 files changed

Lines changed: 135 additions & 5 deletions

File tree

src/TurboHTTP.Tests/Http10/Http10ClientStateMachineSpec.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
using System.Net;
2+
using System.Text;
3+
using Akka.Actor;
24
using Servus.Akka.Transport;
5+
using TurboHTTP.Protocol;
36
using TurboHTTP.Protocol.Syntax.Http10;
47
using TurboHTTP.Protocol.Syntax.Http10.Client;
58
using TurboHTTP.Tests.Shared;
@@ -158,4 +161,62 @@ public void Cleanup_should_clear_in_flight_request()
158161

159162
Assert.False(sm.HasInFlightRequest);
160163
}
164+
165+
[Fact(Timeout = 5000)]
166+
[Trait("RFC", "RFC1945-5")]
167+
public async Task OnRequest_with_body_should_emit_transport_data_after_body_chunk()
168+
{
169+
var system = ActorSystem.Create("sm-test");
170+
try
171+
{
172+
var inbox = Inbox.Create(system);
173+
var ops = new FakeOps { StageActor = inbox.Receiver };
174+
var sm = new Http10ClientStateMachine(ops, MakeConfig());
175+
sm.PreStart();
176+
177+
var request = new HttpRequestMessage(HttpMethod.Post, "http://example.com/")
178+
{
179+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello"))
180+
};
181+
sm.OnRequest(request);
182+
183+
// No TransportData yet (deferred)
184+
Assert.DoesNotContain(ops.Outbound, o => o is TransportData);
185+
186+
// Receive the OutboundBodyChunk and feed it back
187+
var msg = await Task.Run(() => inbox.Receive(TimeSpan.FromSeconds(3)));
188+
var chunk = Assert.IsType<OutboundBodyChunk>(msg);
189+
sm.OnBodyMessage(chunk);
190+
191+
var msg2 = await Task.Run(() => inbox.Receive(TimeSpan.FromSeconds(3)));
192+
sm.OnBodyMessage(msg2); // OutboundBodyComplete
193+
194+
// Now TransportData should exist
195+
Assert.Contains(ops.Outbound, o => o is TransportData);
196+
var td = ops.Outbound.OfType<TransportData>().First();
197+
var text = Encoding.ASCII.GetString(td.Buffer.Memory.Span[..td.Buffer.Length]);
198+
Assert.Contains("Content-Length: 5", text);
199+
Assert.Contains("hello", text);
200+
}
201+
finally
202+
{
203+
system.Dispose();
204+
}
205+
}
206+
207+
[Fact(Timeout = 5000)]
208+
[Trait("RFC", "RFC1945-5")]
209+
public void OnRequest_with_body_should_block_CanAcceptRequest_until_body_complete()
210+
{
211+
var ops = new FakeOps();
212+
var sm = new Http10ClientStateMachine(ops, MakeConfig());
213+
214+
var request = new HttpRequestMessage(HttpMethod.Post, "http://example.com/")
215+
{
216+
Content = new ByteArrayContent(Encoding.UTF8.GetBytes("hello"))
217+
};
218+
sm.OnRequest(request);
219+
220+
Assert.False(sm.CanAcceptRequest);
221+
}
161222
}

src/TurboHTTP/Protocol/Syntax/Http10/Client/Http10ClientStateMachine.cs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Buffers;
12
using Servus.Akka.Transport;
23
using TurboHTTP.Internal;
34
using TurboHTTP.Protocol.Syntax.Http10.Options;
@@ -21,8 +22,12 @@ internal sealed class Http10ClientStateMachine : IHttpStateMachine
2122
private int _reconnectAttempts;
2223
private RequestBodyHandle? _responseBodyHandle;
2324
private bool _lastRequestWasHead;
25+
private bool _outboundBodyPending;
26+
private HttpRequestMessage? _deferredRequest;
27+
private IMemoryOwner<byte>? _deferredBodyOwner;
28+
private int _deferredBodyLength;
2429

25-
public bool CanAcceptRequest => _inFlightRequest is null && !IsReconnecting;
30+
public bool CanAcceptRequest => _inFlightRequest is null && !IsReconnecting && !_outboundBodyPending;
2631

2732
public bool HasInFlightRequest => _inFlightRequest is not null;
2833

@@ -138,14 +143,68 @@ public void OnTimerFired(string name)
138143
{
139144
}
140145

141-
public void OnBodyMessage(object msg) { }
146+
public void OnBodyMessage(object msg)
147+
{
148+
switch (msg)
149+
{
150+
case OutboundBodyChunk chunk when _deferredRequest is not null:
151+
_deferredBodyOwner?.Dispose();
152+
_deferredBodyOwner = chunk.Owner;
153+
_deferredBodyLength = chunk.Length;
154+
break;
155+
156+
case OutboundBodyComplete when _deferredRequest is not null && _deferredBodyOwner is not null:
157+
TransportBuffer? item = null;
158+
try
159+
{
160+
var body = _deferredBodyOwner.Memory.Span[.._deferredBodyLength];
161+
var bufferSize = Math.Min(_minBufferSize + _deferredBodyLength, _maxBufferSize);
162+
item = TransportBuffer.Rent(bufferSize);
163+
var written = _encoder.EncodeDeferred(item.FullMemory.Span, _deferredRequest, body);
164+
item.Length = written;
165+
_ops.OnOutbound(new TransportData(item));
166+
}
167+
catch (Exception ex)
168+
{
169+
item?.Dispose();
170+
_deferredRequest.Fail(new HttpRequestException("Failed to encode HTTP/1.0 request body.", ex));
171+
}
172+
finally
173+
{
174+
_deferredBodyOwner.Dispose();
175+
_deferredBodyOwner = null;
176+
_deferredRequest = null;
177+
_outboundBodyPending = false;
178+
}
179+
break;
180+
181+
case OutboundBodyComplete:
182+
_outboundBodyPending = false;
183+
break;
184+
185+
case OutboundBodyFailed failed:
186+
_deferredBodyOwner?.Dispose();
187+
_deferredBodyOwner = null;
188+
_outboundBodyPending = false;
189+
if (_deferredRequest is not null)
190+
{
191+
_deferredRequest.Fail(new HttpRequestException("Failed to read HTTP/1.0 request body.", failed.Reason));
192+
_deferredRequest = null;
193+
}
194+
break;
195+
}
196+
}
142197

143198
public void Cleanup()
144199
{
145200
_inFlightRequest = null;
146201
_responseBodyHandle?.Abort(new OperationCanceledException());
147202
_responseBodyHandle?.Dispose();
148203
_responseBodyHandle = null;
204+
_outboundBodyPending = false;
205+
_deferredBodyOwner?.Dispose();
206+
_deferredBodyOwner = null;
207+
_deferredRequest = null;
149208
_decoder.Reset();
150209
}
151210

@@ -173,9 +232,19 @@ private void EncodeRequest(HttpRequestMessage request)
173232
var span = item.FullMemory.Span;
174233

175234
var written = _encoder.Encode(span, request, _ops.StageActor);
176-
item.Length = written;
177-
178-
_ops.OnOutbound(new TransportData(item));
235+
if (written > 0)
236+
{
237+
item.Length = written;
238+
_ops.OnOutbound(new TransportData(item));
239+
}
240+
else
241+
{
242+
// Deferred — HTTP/1.0 with body; waiting for OutboundBodyChunk + OutboundBodyComplete
243+
item.Dispose();
244+
item = null;
245+
_deferredRequest = request;
246+
_outboundBodyPending = true;
247+
}
179248
}
180249
catch (Exception ex)
181250
{

0 commit comments

Comments
 (0)