Skip to content

Commit ce8d24f

Browse files
committed
Merge branch 'normj/response-streaming' into normj/aspnetcore-responsestreaming
2 parents a121a9a + 1a86609 commit ce8d24f

File tree

2 files changed

+120
-15
lines changed

2 files changed

+120
-15
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/ResponseStreaming/ResponseStream.cs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using System.Buffers;
1718
using System.IO;
1819
using System.Text;
1920
using System.Threading;
@@ -76,34 +77,37 @@ internal ResponseStream(byte[] prelude)
7677
internal async Task SetHttpOutputStreamAsync(Stream httpOutputStream, CancellationToken cancellationToken = default)
7778
{
7879
_httpOutputStream = httpOutputStream;
79-
_httpStreamReady.Release();
80-
81-
await WritePreludeAsync(cancellationToken);
82-
}
8380

84-
private async Task WritePreludeAsync(CancellationToken cancellationToken = default)
85-
{
81+
// Write the prelude BEFORE releasing _httpStreamReady. This prevents a race
82+
// where a handler WriteAsync that is already waiting on the semaphore could
83+
// sneak in and write body data before the prelude, causing intermittent
84+
// "Failed to parse prelude JSON" errors from API Gateway.
8685
if (_prelude?.Length > 0)
8786
{
8887
_logger.LogDebug($"Writing prelude of {_prelude.Length} bytes to HTTP stream.");
89-
await _httpStreamReady.WaitAsync(_httpStreamWaitTimeout, cancellationToken);
90-
try
88+
89+
lock (_lock)
9190
{
92-
lock (_lock)
93-
{
94-
ThrowIfCompletedOrError();
95-
}
91+
ThrowIfCompletedOrError();
92+
}
9693

97-
await _httpOutputStream.WriteAsync(_prelude, 0, _prelude.Length, cancellationToken);
98-
await _httpOutputStream.WriteAsync(PreludeDelimiter, 0, PreludeDelimiter.Length, cancellationToken);
94+
var combinedLength = _prelude.Length + PreludeDelimiter.Length;
95+
var combined = ArrayPool<byte>.Shared.Rent(combinedLength);
96+
try
97+
{
98+
Buffer.BlockCopy(_prelude, 0, combined, 0, _prelude.Length);
99+
Buffer.BlockCopy(PreludeDelimiter, 0, combined, _prelude.Length, PreludeDelimiter.Length);
99100

101+
await _httpOutputStream.WriteAsync(combined, 0, combinedLength, cancellationToken);
100102
await _httpOutputStream.FlushAsync(cancellationToken);
101103
}
102104
finally
103105
{
104-
_httpStreamReady.Release();
106+
ArrayPool<byte>.Shared.Return(combined);
105107
}
106108
}
109+
110+
_httpStreamReady.Release();
107111
}
108112

109113
/// <summary>

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,107 @@ public async Task SetHttpOutputStreamAsync_NullPrelude_WritesNoPreludeBytes()
284284
Assert.Empty(output.ToArray());
285285
}
286286

287+
// ---- Prelude + delimiter single-chunk tests (via ChunkedStreamWriter) ----
288+
289+
[Fact]
290+
public async Task SetHttpOutputStreamAsync_WithPrelude_ViaChunkedWriter_ProducesSingleChunk()
291+
{
292+
var preludeJson = Encoding.UTF8.GetBytes("{\"statusCode\":200}");
293+
var rs = new ResponseStream(preludeJson);
294+
var rawOutput = new MemoryStream();
295+
var chunkedWriter = new ChunkedStreamWriter(rawOutput);
296+
297+
await rs.SetHttpOutputStreamAsync(chunkedWriter);
298+
299+
var wireBytes = Encoding.ASCII.GetString(rawOutput.ToArray());
300+
301+
// The prelude (18 bytes) + delimiter (8 bytes) = 26 bytes = 0x1A
302+
// Should be exactly one chunk: "1A\r\n{prelude}{8 null bytes}\r\n"
303+
var expectedDataLength = preludeJson.Length + 8; // 26
304+
var expectedHex = expectedDataLength.ToString("X");
305+
Assert.StartsWith($"{expectedHex}\r\n", wireBytes);
306+
307+
// Verify there is only one chunk header (only one hex size prefix)
308+
var chunkCount = 0;
309+
var remaining = wireBytes;
310+
while (remaining.Length > 0)
311+
{
312+
var crlfIndex = remaining.IndexOf("\r\n", StringComparison.Ordinal);
313+
if (crlfIndex < 0) break;
314+
var sizeStr = remaining.Substring(0, crlfIndex);
315+
if (int.TryParse(sizeStr, System.Globalization.NumberStyles.HexNumber, null, out var chunkSize) && chunkSize >= 0)
316+
{
317+
chunkCount++;
318+
// Skip past: hex\r\n{data}\r\n
319+
remaining = remaining.Substring(crlfIndex + 2 + chunkSize + 2);
320+
}
321+
else
322+
{
323+
break;
324+
}
325+
}
326+
Assert.Equal(1, chunkCount);
327+
}
328+
329+
[Fact]
330+
public async Task SetHttpOutputStreamAsync_WithPrelude_ViaChunkedWriter_DelimiterImmediatelyFollowsPrelude()
331+
{
332+
var preludeJson = Encoding.UTF8.GetBytes("{\"statusCode\":201}");
333+
var rs = new ResponseStream(preludeJson);
334+
var rawOutput = new MemoryStream();
335+
var chunkedWriter = new ChunkedStreamWriter(rawOutput);
336+
337+
await rs.SetHttpOutputStreamAsync(chunkedWriter);
338+
339+
// Parse the chunk to get the raw data payload
340+
var wireBytes = rawOutput.ToArray();
341+
var wireStr = Encoding.ASCII.GetString(wireBytes);
342+
var firstCrlf = wireStr.IndexOf("\r\n", StringComparison.Ordinal);
343+
var dataStart = firstCrlf + 2;
344+
var dataLength = preludeJson.Length + 8;
345+
var chunkData = new byte[dataLength];
346+
Array.Copy(wireBytes, dataStart, chunkData, 0, dataLength);
347+
348+
// First part should be the prelude JSON
349+
Assert.Equal(preludeJson, chunkData[..preludeJson.Length]);
350+
// Immediately followed by 8 null bytes (delimiter)
351+
Assert.Equal(new byte[8], chunkData[preludeJson.Length..]);
352+
}
353+
354+
[Fact]
355+
public async Task SetHttpOutputStreamAsync_WithPrelude_ViaChunkedWriter_HandlerDataInSeparateChunk()
356+
{
357+
var preludeJson = Encoding.UTF8.GetBytes("{\"statusCode\":200}");
358+
var rs = new ResponseStream(preludeJson);
359+
var rawOutput = new MemoryStream();
360+
var chunkedWriter = new ChunkedStreamWriter(rawOutput);
361+
362+
await rs.SetHttpOutputStreamAsync(chunkedWriter);
363+
await rs.WriteAsync(Encoding.UTF8.GetBytes("body data"), 0, 9);
364+
365+
var wireStr = Encoding.ASCII.GetString(rawOutput.ToArray());
366+
367+
// Should have exactly 2 chunks: one for prelude+delimiter, one for body
368+
var chunkCount = 0;
369+
var remaining = wireStr;
370+
while (remaining.Length > 0)
371+
{
372+
var crlfIndex = remaining.IndexOf("\r\n", StringComparison.Ordinal);
373+
if (crlfIndex < 0) break;
374+
var sizeStr = remaining.Substring(0, crlfIndex);
375+
if (int.TryParse(sizeStr, System.Globalization.NumberStyles.HexNumber, null, out var chunkSize) && chunkSize >= 0)
376+
{
377+
chunkCount++;
378+
remaining = remaining.Substring(crlfIndex + 2 + chunkSize + 2);
379+
}
380+
else
381+
{
382+
break;
383+
}
384+
}
385+
Assert.Equal(2, chunkCount);
386+
}
387+
287388
// ---- MarkCompleted idempotency ----
288389

289390
[Fact]

0 commit comments

Comments
 (0)