Skip to content

Commit 645771e

Browse files
committed
Start working on supporting having a prelude chuck in the response stream
1 parent d8322ff commit 645771e

File tree

6 files changed

+80
-27
lines changed

6 files changed

+80
-27
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.Text;
1919
using System.Threading;
2020
using System.Threading.Tasks;
21+
using Amazon.Lambda.RuntimeSupport.Helpers;
2122

2223
namespace Amazon.Lambda.RuntimeSupport
2324
{
@@ -50,29 +51,78 @@ public partial class LambdaResponseStream : Stream, ILambdaResponseStream
5051
/// </summary>
5152
public bool HasError => _hasError;
5253

54+
private readonly byte[] _prelude;
55+
5356

5457
internal Exception ReportedError => _reportedError;
5558

5659
internal LambdaResponseStream()
60+
: this(Array.Empty<byte>())
5761
{
5862
}
5963

64+
internal LambdaResponseStream(byte[] prelude)
65+
{
66+
_prelude = prelude;
67+
}
68+
6069
/// <summary>
6170
/// Called by StreamingHttpContent.SerializeToStreamAsync to provide the HTTP output stream.
6271
/// </summary>
63-
internal void SetHttpOutputStream(Stream httpOutputStream)
72+
internal async Task SetHttpOutputStreamAsync(Stream httpOutputStream, CancellationToken cancellationToken = default)
6473
{
6574
_httpOutputStream = httpOutputStream;
6675
_httpStreamReady.Release();
76+
77+
InternalLogger.GetDefaultLogger().LogDebug($"Writing prelude of {_prelude.Length} bytes to HTTP stream.");
78+
await WritePreludeAsync(cancellationToken);
79+
}
80+
81+
private async Task WritePreludeAsync(CancellationToken cancellationToken = default)
82+
{
83+
if (_prelude?.Length > 0)
84+
{
85+
await _httpStreamReady.WaitAsync(cancellationToken);
86+
try
87+
{
88+
lock (_lock)
89+
{
90+
ThrowIfCompletedOrError();
91+
}
92+
93+
// Write prelude JSON chunk
94+
var chunkSizeHex = _prelude.Length.ToString("X");
95+
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
96+
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
97+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
98+
await _httpOutputStream.WriteAsync(_prelude, 0, _prelude.Length, cancellationToken);
99+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
100+
101+
// Write 8 null bytes delimiter chunk
102+
var delimiterBytes = new byte[8];
103+
chunkSizeHex = "8";
104+
chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
105+
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
106+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
107+
await _httpOutputStream.WriteAsync(delimiterBytes, 0, delimiterBytes.Length, cancellationToken);
108+
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
109+
110+
await _httpOutputStream.FlushAsync(cancellationToken);
111+
}
112+
finally
113+
{
114+
_httpStreamReady.Release();
115+
}
116+
}
67117
}
68118

69119
/// <summary>
70120
/// Called by StreamingHttpContent.SerializeToStreamAsync to wait until the handler
71121
/// finishes writing (MarkCompleted or ReportErrorAsync).
72122
/// </summary>
73-
internal async Task WaitForCompletionAsync()
123+
internal async Task WaitForCompletionAsync(CancellationToken cancellationToken = default)
74124
{
75-
await _completionSignal.WaitAsync();
125+
await _completionSignal.WaitAsync(cancellationToken);
76126
}
77127

78128
/// <summary>

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ internal virtual async Task StartStreamingResponseAsync(
206206
request.Headers.Add("Trailer",
207207
$"{StreamingConstants.ErrorTypeTrailer}, {StreamingConstants.ErrorBodyTrailer}");
208208

209-
request.Content = new StreamingHttpContent(responseStream);
209+
request.Content = new StreamingHttpContent(responseStream, cancellationToken);
210210

211211
// SendAsync calls SerializeToStreamAsync, which blocks until the handler
212212
// finishes writing. This is why this method runs concurrently with the handler.

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using System.Net;
1919
using System.Net.Http;
2020
using System.Text;
21+
using System.Threading;
2122
using System.Threading.Tasks;
2223
using Amazon.Lambda.RuntimeSupport.Helpers;
2324

@@ -32,24 +33,26 @@ internal class StreamingHttpContent : HttpContent
3233
private static readonly byte[] FinalChunkBytes = Encoding.ASCII.GetBytes("0\r\n");
3334

3435
private readonly LambdaResponseStream _responseStream;
36+
private readonly CancellationToken _cancellationToken;
3537

36-
public StreamingHttpContent(LambdaResponseStream responseStream)
38+
public StreamingHttpContent(LambdaResponseStream responseStream, CancellationToken cancellationToken = default)
3739
{
3840
_responseStream = responseStream ?? throw new ArgumentNullException(nameof(responseStream));
41+
_cancellationToken = cancellationToken;
3942
}
4043

4144
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
4245
{
4346
// Hand the HTTP output stream to ResponseStream so WriteAsync calls
4447
// can write chunks directly to it.
45-
_responseStream.SetHttpOutputStream(stream);
48+
await _responseStream.SetHttpOutputStreamAsync(stream, _cancellationToken);
4649

4750
InternalLogger.GetDefaultLogger().LogInformation("In SerializeToStreamAsync waiting for the undlying Lambda response stream in indicate it is complete.");
4851
// Wait for the handler to finish writing (MarkCompleted or ReportErrorAsync)
49-
await _responseStream.WaitForCompletionAsync();
52+
await _responseStream.WaitForCompletionAsync(_cancellationToken);
5053

5154
// Write final chunk
52-
await stream.WriteAsync(FinalChunkBytes, 0, FinalChunkBytes.Length);
55+
await stream.WriteAsync(FinalChunkBytes, 0, FinalChunkBytes.Length, _cancellationToken);
5356

5457
// Write error trailers if present
5558
if (_responseStream.HasError)
@@ -59,8 +62,8 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
5962
}
6063

6164
// Write final CRLF to end the chunked message
62-
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length);
63-
await stream.FlushAsync();
65+
await stream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, _cancellationToken);
66+
await stream.FlushAsync(_cancellationToken);
6467
}
6568

6669
protected override bool TryComputeLength(out long length)
@@ -75,12 +78,12 @@ private async Task WriteErrorTrailersAsync(Stream stream, Exception exception)
7578

7679
var errorTypeHeader = $"{StreamingConstants.ErrorTypeTrailer}: {exceptionInfo.ErrorType}\r\n";
7780
var errorTypeBytes = Encoding.UTF8.GetBytes(errorTypeHeader);
78-
await stream.WriteAsync(errorTypeBytes, 0, errorTypeBytes.Length);
81+
await stream.WriteAsync(errorTypeBytes, 0, errorTypeBytes.Length, _cancellationToken);
7982

8083
var errorBodyJson = LambdaJsonExceptionWriter.WriteJson(exceptionInfo);
8184
var errorBodyHeader = $"{StreamingConstants.ErrorBodyTrailer}: {errorBodyJson}\r\n";
8285
var errorBodyBytes = Encoding.UTF8.GetBytes(errorBodyHeader);
83-
await stream.WriteAsync(errorBodyBytes, 0, errorBodyBytes.Length);
86+
await stream.WriteAsync(errorBodyBytes, 0, errorBodyBytes.Length, _cancellationToken);
8487
}
8588
}
8689
}

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ public class ResponseStreamTests
2929
/// Helper: creates a ResponseStream and wires up a MemoryStream as the HTTP output stream.
3030
/// Returns both so tests can inspect what was written.
3131
/// </summary>
32-
private static (LambdaResponseStream stream, MemoryStream httpOutput) CreateWiredStream()
32+
private static async Task<(LambdaResponseStream stream, MemoryStream httpOutput)> CreateWiredStreamAsync()
3333
{
3434
var rs = new LambdaResponseStream();
3535
var output = new MemoryStream();
36-
rs.SetHttpOutputStream(output);
36+
await rs.SetHttpOutputStreamAsync(output);
3737
return (rs, output);
3838
}
3939

@@ -62,7 +62,7 @@ public void Constructor_InitializesStateCorrectly()
6262
[InlineData(new byte[0], "0")] // 0 bytes → "0"
6363
public async Task WriteAsync_WritesChunkedEncodingFormat(byte[] data, string expectedHexSize)
6464
{
65-
var (stream, httpOutput) = CreateWiredStream();
65+
var (stream, httpOutput) = await CreateWiredStreamAsync();
6666

6767
await stream.WriteAsync(data);
6868

@@ -82,7 +82,7 @@ public async Task WriteAsync_WritesChunkedEncodingFormat(byte[] data, string exp
8282
[Fact]
8383
public async Task WriteAsync_WithOffset_WritesCorrectSliceAsChunk()
8484
{
85-
var (stream, httpOutput) = CreateWiredStream();
85+
var (stream, httpOutput) = await CreateWiredStreamAsync();
8686
var data = new byte[] { 0, 1, 2, 3, 0 };
8787

8888
await stream.WriteAsync(data, 1, 3);
@@ -107,7 +107,7 @@ public async Task WriteAsync_WithOffset_WritesCorrectSliceAsChunk()
107107
[Fact]
108108
public async Task WriteAsync_MultipleWrites_EachAppearsImmediately()
109109
{
110-
var (stream, httpOutput) = CreateWiredStream();
110+
var (stream, httpOutput) = await CreateWiredStreamAsync();
111111

112112
await stream.WriteAsync(new byte[] { 0xAA });
113113
var afterFirst = httpOutput.ToArray().Length;
@@ -128,7 +128,7 @@ public async Task WriteAsync_MultipleWrites_EachAppearsImmediately()
128128
[Fact]
129129
public async Task WriteAsync_LargerPayload_HexSizeIsCorrect()
130130
{
131-
var (stream, httpOutput) = CreateWiredStream();
131+
var (stream, httpOutput) = await CreateWiredStreamAsync();
132132
var data = new byte[256]; // 0x100
133133

134134
await stream.WriteAsync(data);
@@ -165,7 +165,7 @@ public async Task WriteAsync_BlocksUntilSetHttpOutputStream()
165165
Assert.False(writeCompleted.IsSet, "WriteAsync should block until SetHttpOutputStream is called");
166166

167167
// Now provide the HTTP stream — the write should complete
168-
rs.SetHttpOutputStream(httpOutput);
168+
await rs.SetHttpOutputStreamAsync(httpOutput);
169169
await writeTask;
170170

171171
Assert.True(writeCompleted.IsSet);
@@ -181,7 +181,7 @@ public async Task WriteAsync_BlocksUntilSetHttpOutputStream()
181181
[Fact]
182182
public async Task MarkCompleted_ReleasesCompletionSignal()
183183
{
184-
var (stream, _) = CreateWiredStream();
184+
var (stream, _) = await CreateWiredStreamAsync();
185185

186186
var waitTask = stream.WaitForCompletionAsync();
187187
Assert.False(waitTask.IsCompleted, "WaitForCompletionAsync should block before MarkCompleted");
@@ -202,7 +202,7 @@ public async Task MarkCompleted_ReleasesCompletionSignal()
202202
[Fact]
203203
public async Task ReportErrorAsync_ReleasesCompletionSignal()
204204
{
205-
var (stream, _) = CreateWiredStream();
205+
var (stream, _) = await CreateWiredStreamAsync();
206206

207207
var waitTask = stream.WaitForCompletionAsync();
208208
Assert.False(waitTask.IsCompleted, "WaitForCompletionAsync should block before ReportErrorAsync");
@@ -223,7 +223,7 @@ public async Task ReportErrorAsync_ReleasesCompletionSignal()
223223
[Fact]
224224
public async Task WriteAsync_AfterMarkCompleted_Throws()
225225
{
226-
var (stream, _) = CreateWiredStream();
226+
var (stream, _) = await CreateWiredStreamAsync();
227227
await stream.WriteAsync(new byte[] { 1 });
228228
stream.MarkCompleted();
229229

@@ -238,7 +238,7 @@ await Assert.ThrowsAsync<InvalidOperationException>(
238238
[Fact]
239239
public async Task WriteAsync_AfterReportError_Throws()
240240
{
241-
var (stream, _) = CreateWiredStream();
241+
var (stream, _) = await CreateWiredStreamAsync();
242242
await stream.WriteAsync(new byte[] { 1 });
243243
stream.ReportError(new Exception("test"));
244244

@@ -285,15 +285,15 @@ public async Task ReportErrorAsync_CalledTwice_Throws()
285285
[Fact]
286286
public async Task WriteAsync_NullBuffer_ThrowsArgumentNull()
287287
{
288-
var (stream, _) = CreateWiredStream();
288+
var (stream, _) = await CreateWiredStreamAsync();
289289

290290
await Assert.ThrowsAsync<ArgumentNullException>(() => stream.WriteAsync((byte[])null));
291291
}
292292

293293
[Fact]
294294
public async Task WriteAsync_NullBufferWithOffset_ThrowsArgumentNull()
295295
{
296-
var (stream, _) = CreateWiredStream();
296+
var (stream, _) = await CreateWiredStreamAsync();
297297

298298
await Assert.ThrowsAsync<ArgumentNullException>(() => stream.WriteAsync(null, 0, 0));
299299
}

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/StreamingE2EWithMoq.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ internal override async Task StartStreamingResponseAsync(
514514
string awsRequestId, LambdaResponseStream responseStream, CancellationToken cancellationToken = default)
515515
{
516516
// Provide the HTTP output stream so writes don't block
517-
responseStream.SetHttpOutputStream(new MemoryStream());
517+
await responseStream.SetHttpOutputStreamAsync(new MemoryStream());
518518
await responseStream.WaitForCompletionAsync();
519519
}
520520
}

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/TestHelpers/TestStreamingRuntimeApiClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ internal override async Task StartStreamingResponseAsync(
114114
LastStreamingResponseStream = responseStream;
115115

116116
// Simulate the HTTP stream being available
117-
responseStream.SetHttpOutputStream(new MemoryStream());
117+
await responseStream.SetHttpOutputStreamAsync(new MemoryStream(), cancellationToken);
118118

119119
// Wait for the handler to finish writing (mirrors real SerializeToStreamAsync behavior)
120120
await responseStream.WaitForCompletionAsync();

0 commit comments

Comments
 (0)