Skip to content

Commit 99b83c2

Browse files
committed
Rework encoding and add support for using API Gateway
1 parent 1f17a58 commit 99b83c2

File tree

11 files changed

+209
-447
lines changed

11 files changed

+209
-447
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
#if NET8_0_OR_GREATER
4+
using System.Collections.Generic;
5+
using System.Net;
6+
using System.Runtime.Versioning;
7+
using System.Text.Json;
8+
9+
namespace Amazon.Lambda.Core.ResponseStreaming
10+
{
11+
/// <summary>
12+
/// The HTTP response prelude to be sent as the first chunk of a streaming response when using <see cref="LambdaResponseStreamFactory.CreateHttpStream"/>.
13+
/// </summary>
14+
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
15+
public class HttpResponseStreamPrelude
16+
{
17+
/// <summary>
18+
/// The Http status code to include in the response prelude.
19+
/// </summary>
20+
public HttpStatusCode? StatusCode { get; set; }
21+
22+
/// <summary>
23+
/// The response headers to include in the response prelude. This collection supports setting single value for the same headers.
24+
/// </summary>
25+
public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
26+
27+
/// <summary>
28+
/// The response headers to include in the response prelude. This collection supports setting multiple values for the same headers.
29+
/// </summary>
30+
public IDictionary<string, IList<string>> MultiValueHeaders { get; set; } = new Dictionary<string, IList<string>>();
31+
32+
/// <summary>
33+
/// The list of cookies to include in the response prelude. This is used for Lambda Function URL responses, which support a separate "cookies" field in the response JSON for setting cookies, rather than requiring cookies to be set via the "Set-Cookie" header.
34+
/// </summary>
35+
public IList<string> Cookies { get; set; } = new List<string>();
36+
37+
internal byte[] ToByteArray()
38+
{
39+
var bufferWriter = new System.Buffers.ArrayBufferWriter<byte>();
40+
using (var writer = new Utf8JsonWriter(bufferWriter))
41+
{
42+
writer.WriteStartObject();
43+
44+
if (StatusCode.HasValue)
45+
writer.WriteNumber("statusCode", (int)StatusCode);
46+
47+
if (Headers?.Count > 0)
48+
{
49+
writer.WriteStartObject("headers");
50+
foreach (var header in Headers)
51+
{
52+
writer.WriteString(header.Key, header.Value);
53+
}
54+
writer.WriteEndObject();
55+
}
56+
57+
if (MultiValueHeaders?.Count > 0)
58+
{
59+
writer.WriteStartObject("multiValueHeaders");
60+
foreach (var header in MultiValueHeaders)
61+
{
62+
writer.WriteStartArray(header.Key);
63+
foreach (var value in header.Value)
64+
{
65+
writer.WriteStringValue(value);
66+
}
67+
writer.WriteEndArray();
68+
}
69+
writer.WriteEndObject();
70+
}
71+
72+
if (Cookies?.Count > 0)
73+
{
74+
writer.WriteStartArray("cookies");
75+
foreach (var cookie in Cookies)
76+
{
77+
writer.WriteStringValue(cookie);
78+
}
79+
writer.WriteEndArray();
80+
}
81+
82+
writer.WriteEndObject();
83+
}
84+
85+
return bufferWriter.WrittenSpan.ToArray();
86+
}
87+
}
88+
}
89+
#endif
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace Amazon.Lambda.Core.ResponseStreaming
9+
{
10+
/// <summary>
11+
/// Interface for writing streaming responses in AWS Lambda functions.
12+
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
13+
/// </summary>
14+
internal interface ILambdaResponseStream : IDisposable
15+
{
16+
/// <summary>
17+
/// Asynchronously writes a portion of a byte array to the response stream.
18+
/// </summary>
19+
/// <param name="buffer">The byte array containing data to write.</param>
20+
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
21+
/// <param name="count">The number of bytes to write.</param>
22+
/// <param name="cancellationToken">Optional cancellation token.</param>
23+
/// <returns>A task representing the asynchronous operation.</returns>
24+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
25+
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);
26+
27+
28+
/// <summary>
29+
/// Gets the total number of bytes written to the stream so far.
30+
/// </summary>
31+
long BytesWritten { get; }
32+
33+
34+
/// <summary>
35+
/// Gets whether an error has been reported.
36+
/// </summary>
37+
bool HasError { get; }
38+
}
39+
}
40+
#endif

Libraries/src/Amazon.Lambda.Core/LambdaResponseStreamFactory.cs renamed to Libraries/src/Amazon.Lambda.Core/ResponseStreaming/LambdaResponseStream.cs

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,15 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33
#if NET8_0_OR_GREATER
4+
45
using System;
56
using System.IO;
67
using System.Runtime.Versioning;
78
using System.Threading;
89
using System.Threading.Tasks;
910

10-
namespace Amazon.Lambda.Core
11+
namespace Amazon.Lambda.Core.ResponseStreaming
1112
{
12-
/// <summary>
13-
/// Factory to create Lambda response streams for writing streaming responses in AWS Lambda functions. The created streams are write-only and non-seekable.
14-
/// </summary>
15-
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
16-
public class LambdaResponseStreamFactory
17-
{
18-
internal const string ParameterizedPreviewMessage =
19-
"Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " +
20-
"has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " +
21-
"executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " +
22-
"project file to \"true\"";
23-
24-
private static Func<byte[], ILambdaResponseStream> _streamFactory;
25-
26-
internal static void SetLambdaResponseStream(Func<byte[], ILambdaResponseStream> streamFactory)
27-
{
28-
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
29-
}
30-
31-
/// <summary>
32-
/// Creates a <see cref="Stream"/> that can be used to write streaming responses back to callers of the Lambda function. Once
33-
/// A Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
34-
/// return value will be ignored. The stream is write-only and non-seekable.
35-
/// </summary>
36-
/// <returns></returns>
37-
public static Stream CreateStream()
38-
{
39-
var runtimeResponseStream = _streamFactory(Array.Empty<byte>());
40-
return new LambdaResponseStream(runtimeResponseStream);
41-
}
42-
}
43-
44-
/// <summary>
45-
/// Interface for writing streaming responses in AWS Lambda functions.
46-
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
47-
/// </summary>
48-
internal interface ILambdaResponseStream : IDisposable
49-
{
50-
/// <summary>
51-
/// Asynchronously writes a portion of a byte array to the response stream.
52-
/// </summary>
53-
/// <param name="buffer">The byte array containing data to write.</param>
54-
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
55-
/// <param name="count">The number of bytes to write.</param>
56-
/// <param name="cancellationToken">Optional cancellation token.</param>
57-
/// <returns>A task representing the asynchronous operation.</returns>
58-
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
59-
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);
60-
61-
62-
/// <summary>
63-
/// Gets the total number of bytes written to the stream so far.
64-
/// </summary>
65-
long BytesWritten { get; }
66-
67-
68-
/// <summary>
69-
/// Gets whether an error has been reported.
70-
/// </summary>
71-
bool HasError { get; }
72-
}
73-
7413
/// <summary>
7514
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
7615
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
using System.IO;
6+
using System.Runtime.Versioning;
7+
8+
namespace Amazon.Lambda.Core.ResponseStreaming
9+
{
10+
/// <summary>
11+
/// Factory to create Lambda response streams for writing streaming responses in AWS Lambda functions. The created streams are write-only and non-seekable.
12+
/// </summary>
13+
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
14+
public class LambdaResponseStreamFactory
15+
{
16+
internal const string ParameterizedPreviewMessage =
17+
"Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " +
18+
"has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " +
19+
"executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " +
20+
"project file to \"true\"";
21+
22+
private static Func<byte[], ILambdaResponseStream> _streamFactory;
23+
24+
internal static void SetLambdaResponseStream(Func<byte[], ILambdaResponseStream> streamFactory)
25+
{
26+
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
27+
}
28+
29+
/// <summary>
30+
/// Creates a <see cref="Stream"/> that can be used to write streaming responses back to callers of the Lambda function. Once
31+
/// a Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
32+
/// return value will be ignored. The stream is write-only and non-seekable.
33+
/// </summary>
34+
/// <returns></returns>
35+
public static Stream CreateStream()
36+
{
37+
var runtimeResponseStream = _streamFactory(Array.Empty<byte>());
38+
return new LambdaResponseStream(runtimeResponseStream);
39+
}
40+
41+
/// <summary>
42+
/// Create a <see cref="Stream"/> for writing streaming responses, with an HTTP response prelude containing status code and headers. This should be used for
43+
/// Lambda functions using response streaming that are invoked via the Lambda Function URLs or API Gateway HTTP APIs, where the response format is expected to be an HTTP response.
44+
/// The prelude will be serialized and sent as the first chunk of the response stream, and should contain any necessary HTTP status code and headers.
45+
/// <para>
46+
/// Once a Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
47+
/// return value will be ignored. The stream is write-only and non-seekable.
48+
/// </para>
49+
/// </summary>
50+
/// <param name="prelude">The HTTP response prelude including status code and headers.</param>
51+
/// <returns></returns>
52+
public static Stream CreateHttpStream(HttpResponseStreamPrelude prelude)
53+
{
54+
var runtimeResponseStream = _streamFactory(prelude.ToByteArray());
55+
return new LambdaResponseStream(runtimeResponseStream);
56+
}
57+
}
58+
}
59+
#endif

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/ResponseStreaming/ResponseStream.cs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ namespace Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming
2727
/// </summary>
2828
internal class ResponseStream
2929
{
30-
private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n");
31-
3230
private long _bytesWritten;
3331
private bool _isCompleted;
3432
private bool _hasError;
@@ -41,6 +39,8 @@ internal class ResponseStream
4139
private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1);
4240
private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1);
4341

42+
private static readonly byte[] PreludeDelimiter = new byte[8];
43+
4444
/// <summary>
4545
/// The number of bytes written to the Lambda response stream so far.
4646
/// </summary>
@@ -54,10 +54,14 @@ internal class ResponseStream
5454
private readonly byte[] _prelude;
5555

5656

57+
private readonly InternalLogger _logger;
58+
59+
5760
internal Exception ReportedError => _reportedError;
5861

5962
internal ResponseStream(byte[] prelude)
6063
{
64+
_logger = InternalLogger.GetDefaultLogger();
6165
_prelude = prelude;
6266
}
6367

@@ -69,14 +73,14 @@ internal async Task SetHttpOutputStreamAsync(Stream httpOutputStream, Cancellati
6973
_httpOutputStream = httpOutputStream;
7074
_httpStreamReady.Release();
7175

72-
InternalLogger.GetDefaultLogger().LogDebug($"Writing prelude of {_prelude.Length} bytes to HTTP stream.");
7376
await WritePreludeAsync(cancellationToken);
7477
}
7578

7679
private async Task WritePreludeAsync(CancellationToken cancellationToken = default)
7780
{
7881
if (_prelude?.Length > 0)
7982
{
83+
_logger.LogDebug($"Writing prelude of {_prelude.Length} bytes to HTTP stream.");
8084
await _httpStreamReady.WaitAsync(cancellationToken);
8185
try
8286
{
@@ -85,22 +89,8 @@ private async Task WritePreludeAsync(CancellationToken cancellationToken = defau
8589
ThrowIfCompletedOrError();
8690
}
8791

88-
// Write prelude JSON chunk
89-
var chunkSizeHex = _prelude.Length.ToString("X");
90-
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
91-
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
92-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
9392
await _httpOutputStream.WriteAsync(_prelude, 0, _prelude.Length, cancellationToken);
94-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
95-
96-
// Write 8 null bytes delimiter chunk
97-
var delimiterBytes = new byte[8];
98-
chunkSizeHex = "8";
99-
chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
100-
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
101-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
102-
await _httpOutputStream.WriteAsync(delimiterBytes, 0, delimiterBytes.Length, cancellationToken);
103-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
93+
await _httpOutputStream.WriteAsync(PreludeDelimiter, 0, PreludeDelimiter.Length, cancellationToken);
10494

10595
await _httpOutputStream.FlushAsync(cancellationToken);
10696
}
@@ -149,19 +139,15 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT
149139
await _httpStreamReady.WaitAsync(cancellationToken);
150140
try
151141
{
142+
_logger.LogDebug($"Writing chuck of {count} bytes to HTTP stream.");
143+
152144
lock (_lock)
153145
{
154146
ThrowIfCompletedOrError();
155147
_bytesWritten += count;
156148
}
157149

158-
// Write chunk directly to the HTTP stream: size(hex) + CRLF + data + CRLF
159-
var chunkSizeHex = count.ToString("X");
160-
var chunkSizeBytes = Encoding.ASCII.GetBytes(chunkSizeHex);
161-
await _httpOutputStream.WriteAsync(chunkSizeBytes, 0, chunkSizeBytes.Length, cancellationToken);
162-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
163150
await _httpOutputStream.WriteAsync(buffer, offset, count, cancellationToken);
164-
await _httpOutputStream.WriteAsync(CrlfBytes, 0, CrlfBytes.Length, cancellationToken);
165151
await _httpOutputStream.FlushAsync(cancellationToken);
166152
}
167153
finally

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/ResponseStreaming/ResponseStreamLambdaCoreInitializerIsolated.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System;
66
using System.Threading;
77
using System.Threading.Tasks;
8-
using Amazon.Lambda.Core;
8+
using Amazon.Lambda.Core.ResponseStreaming;
99
using Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming;
1010
#pragma warning disable CA2252
1111
namespace Amazon.Lambda.RuntimeSupport

0 commit comments

Comments
 (0)