Skip to content

Commit 1f17a58

Browse files
committed
Rework to support class library programming model
1 parent 645771e commit 1f17a58

18 files changed

+445
-334
lines changed
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
#if NET8_0_OR_GREATER
4+
using System;
5+
using System.IO;
6+
using System.Runtime.Versioning;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
namespace Amazon.Lambda.Core
11+
{
12+
/// <summary>
13+
/// Factory to create Lambda response streams for writing streaming responses in AWS Lambda functions. The created streams are write-only and non-seekable.
14+
/// </summary>
15+
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
16+
public class LambdaResponseStreamFactory
17+
{
18+
internal const string ParameterizedPreviewMessage =
19+
"Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " +
20+
"has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " +
21+
"executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " +
22+
"project file to \"true\"";
23+
24+
private static Func<byte[], ILambdaResponseStream> _streamFactory;
25+
26+
internal static void SetLambdaResponseStream(Func<byte[], ILambdaResponseStream> streamFactory)
27+
{
28+
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
29+
}
30+
31+
/// <summary>
32+
/// Creates a <see cref="Stream"/> that can be used to write streaming responses back to callers of the Lambda function. Once
33+
/// A Lambda function creates a response stream all output must be returned by writing to the stream; the Lambda function's handler
34+
/// return value will be ignored. The stream is write-only and non-seekable.
35+
/// </summary>
36+
/// <returns></returns>
37+
public static Stream CreateStream()
38+
{
39+
var runtimeResponseStream = _streamFactory(Array.Empty<byte>());
40+
return new LambdaResponseStream(runtimeResponseStream);
41+
}
42+
}
43+
44+
/// <summary>
45+
/// Interface for writing streaming responses in AWS Lambda functions.
46+
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
47+
/// </summary>
48+
internal interface ILambdaResponseStream : IDisposable
49+
{
50+
/// <summary>
51+
/// Asynchronously writes a portion of a byte array to the response stream.
52+
/// </summary>
53+
/// <param name="buffer">The byte array containing data to write.</param>
54+
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
55+
/// <param name="count">The number of bytes to write.</param>
56+
/// <param name="cancellationToken">Optional cancellation token.</param>
57+
/// <returns>A task representing the asynchronous operation.</returns>
58+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
59+
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);
60+
61+
62+
/// <summary>
63+
/// Gets the total number of bytes written to the stream so far.
64+
/// </summary>
65+
long BytesWritten { get; }
66+
67+
68+
/// <summary>
69+
/// Gets whether an error has been reported.
70+
/// </summary>
71+
bool HasError { get; }
72+
}
73+
74+
/// <summary>
75+
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
76+
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
77+
/// Integrates with standard .NET stream consumers such as <see cref="System.IO.StreamWriter"/>.
78+
/// </summary>
79+
[RequiresPreviewFeatures(LambdaResponseStreamFactory.ParameterizedPreviewMessage)]
80+
public class LambdaResponseStream : Stream
81+
{
82+
private readonly ILambdaResponseStream _responseStream;
83+
84+
internal LambdaResponseStream(ILambdaResponseStream responseStream)
85+
{
86+
_responseStream = responseStream;
87+
}
88+
89+
/// <summary>
90+
/// The number of bytes written to the Lambda response stream so far.
91+
/// </summary>
92+
public long BytesWritten => _responseStream.BytesWritten;
93+
94+
/// <summary>
95+
/// Asynchronously writes a byte array to the response stream.
96+
/// </summary>
97+
/// <param name="buffer">The byte array to write.</param>
98+
/// <param name="cancellationToken">Optional cancellation token.</param>
99+
/// <returns>A task representing the asynchronous operation.</returns>
100+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
101+
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
102+
{
103+
if (buffer == null)
104+
throw new ArgumentNullException(nameof(buffer));
105+
106+
await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
107+
}
108+
109+
/// <summary>
110+
/// Asynchronously writes a portion of a byte array to the response stream.
111+
/// </summary>
112+
/// <param name="buffer">The byte array containing data to write.</param>
113+
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
114+
/// <param name="count">The number of bytes to write.</param>
115+
/// <param name="cancellationToken">Optional cancellation token.</param>
116+
/// <returns>A task representing the asynchronous operation.</returns>
117+
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
118+
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
119+
{
120+
await _responseStream.WriteAsync(buffer, offset, count, cancellationToken);
121+
}
122+
123+
#region Noop Overrides
124+
125+
/// <summary>Gets a value indicating whether the stream supports reading. Always <c>false</c>.</summary>
126+
public override bool CanRead => false;
127+
128+
/// <summary>Gets a value indicating whether the stream supports seeking. Always <c>false</c>.</summary>
129+
public override bool CanSeek => false;
130+
131+
/// <summary>Gets a value indicating whether the stream supports writing. Always <c>true</c>.</summary>
132+
public override bool CanWrite => true;
133+
134+
/// <summary>
135+
/// Gets the total number of bytes written to the stream so far.
136+
/// Equivalent to <see cref="BytesWritten"/>.
137+
/// </summary>
138+
public override long Length => BytesWritten;
139+
140+
/// <summary>
141+
/// Getting or setting the position is not supported.
142+
/// </summary>
143+
/// <exception cref="NotSupportedException">Always thrown.</exception>
144+
public override long Position
145+
{
146+
get => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
147+
set => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
148+
}
149+
150+
/// <summary>Not supported.</summary>
151+
/// <exception cref="NotImplementedException">Always thrown.</exception>
152+
public override long Seek(long offset, SeekOrigin origin)
153+
=> throw new NotImplementedException("LambdaResponseStream does not support seeking.");
154+
155+
/// <summary>Not supported.</summary>
156+
/// <exception cref="NotImplementedException">Always thrown.</exception>
157+
public override int Read(byte[] buffer, int offset, int count)
158+
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");
159+
160+
/// <summary>Not supported.</summary>
161+
/// <exception cref="NotImplementedException">Always thrown.</exception>
162+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
163+
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");
164+
165+
/// <summary>
166+
/// Writes a sequence of bytes to the stream. Delegates to the async path synchronously.
167+
/// Prefer <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> to avoid blocking.
168+
/// </summary>
169+
public override void Write(byte[] buffer, int offset, int count)
170+
=> WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
171+
172+
/// <summary>
173+
/// Flush is a no-op; data is sent to the Runtime API immediately on each write.
174+
/// </summary>
175+
public override void Flush() { }
176+
177+
/// <summary>Not supported.</summary>
178+
/// <exception cref="NotSupportedException">Always thrown.</exception>
179+
public override void SetLength(long value)
180+
=> throw new NotSupportedException("LambdaResponseStream does not support SetLength.");
181+
#endregion
182+
}
183+
}
184+
#endif

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using System.Threading;
2121
using System.Threading.Tasks;
2222
using Amazon.Lambda.RuntimeSupport.Bootstrap;
23+
using Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming;
2324
using Amazon.Lambda.RuntimeSupport.Helpers;
2425

2526
namespace Amazon.Lambda.RuntimeSupport
@@ -225,6 +226,19 @@ internal LambdaBootstrap(HttpClient httpClient, LambdaBootstrapHandler handler,
225226
return;
226227
}
227228
#if NET8_0_OR_GREATER
229+
230+
try
231+
{
232+
// Initalize in Amazon.Lambda.Core the factory for creating the response stream and related logic for supporting response streaming.
233+
ResponseStreamLambdaCoreInitializerIsolated.InitializeCore();
234+
}
235+
catch (TypeLoadException)
236+
{
237+
_logger.LogDebug("Failed to configure Amazon.Lambda.Core with factory to create response stream. This happens when the version of Amazon.Lambda.Core referenced by the Lambda function is out of date.");
238+
}
239+
240+
241+
228242
// Check if Initialization type is SnapStart, and invoke the snapshot restore logic.
229243
if (_configuration.IsInitTypeSnapstart)
230244
{
@@ -363,7 +377,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
363377
var runtimeApiClient = Client as RuntimeApiClient;
364378
if (runtimeApiClient != null)
365379
{
366-
LambdaResponseStreamFactory.InitializeInvocation(
380+
ResponseStreamFactory.InitializeInvocation(
367381
invocation.LambdaContext.AwsRequestId,
368382
isMultiConcurrency,
369383
runtimeApiClient,
@@ -385,7 +399,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
385399
{
386400
WriteUnhandledExceptionToLog(exception);
387401

388-
var responseStream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
402+
var responseStream = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
389403
if (responseStream != null)
390404
{
391405
responseStream.ReportError(exception);
@@ -400,20 +414,20 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
400414
_logger.LogInformation("Finished invoking handler");
401415
}
402416

403-
var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
417+
var streamIfCreated = ResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
404418
if (streamIfCreated != null)
405419
{
406420
streamIfCreated.MarkCompleted();
407421

408422
// If streaming was started, await the HTTP send task to ensure it completes
409-
var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency);
423+
var sendTask = ResponseStreamFactory.GetSendTask(isMultiConcurrency);
410424
if (sendTask != null)
411425
{
412426
// Wait for the streaming response to finish sending before allowing the next invocation to be processed. This ensures that responses are sent in the order the invocations were received.
413427
await sendTask;
414428
}
415429

416-
streamIfCreated.ManualDispose();
430+
streamIfCreated.Dispose();
417431
}
418432
else if (invokeSucceeded)
419433
{
@@ -454,7 +468,7 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
454468
{
455469
if (runtimeApiClient != null)
456470
{
457-
LambdaResponseStreamFactory.CleanupInvocation(isMultiConcurrency);
471+
ResponseStreamFactory.CleanupInvocation(isMultiConcurrency);
458472
}
459473
invocation.Dispose();
460474
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs renamed to Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/ResponseStreaming/ResponseStream.cs

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
using System.Threading.Tasks;
2121
using Amazon.Lambda.RuntimeSupport.Helpers;
2222

23-
namespace Amazon.Lambda.RuntimeSupport
23+
namespace Amazon.Lambda.RuntimeSupport.Client.ResponseStreaming
2424
{
2525
/// <summary>
26-
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
27-
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
26+
/// Represents the writable stream used by Lambda handlers to write response data for streaming invocations.
2827
/// </summary>
29-
public partial class LambdaResponseStream : Stream, ILambdaResponseStream
28+
internal class ResponseStream
3029
{
3130
private static readonly byte[] CrlfBytes = Encoding.ASCII.GetBytes("\r\n");
3231

@@ -38,6 +37,7 @@ public partial class LambdaResponseStream : Stream, ILambdaResponseStream
3837

3938
// The live HTTP output stream, set by StreamingHttpContent when SerializeToStreamAsync is called.
4039
private Stream _httpOutputStream;
40+
private bool _disposedValue;
4141
private readonly SemaphoreSlim _httpStreamReady = new SemaphoreSlim(0, 1);
4242
private readonly SemaphoreSlim _completionSignal = new SemaphoreSlim(0, 1);
4343

@@ -56,12 +56,7 @@ public partial class LambdaResponseStream : Stream, ILambdaResponseStream
5656

5757
internal Exception ReportedError => _reportedError;
5858

59-
internal LambdaResponseStream()
60-
: this(Array.Empty<byte>())
61-
{
62-
}
63-
64-
internal LambdaResponseStream(byte[] prelude)
59+
internal ResponseStream(byte[] prelude)
6560
{
6661
_prelude = prelude;
6762
}
@@ -125,18 +120,10 @@ internal async Task WaitForCompletionAsync(CancellationToken cancellationToken =
125120
await _completionSignal.WaitAsync(cancellationToken);
126121
}
127122

128-
/// <summary>
129-
/// Asynchronously writes a byte array to the response stream.
130-
/// </summary>
131-
/// <param name="buffer">The byte array to write.</param>
132-
/// <param name="cancellationToken">Optional cancellation token.</param>
133-
/// <returns>A task representing the asynchronous operation.</returns>
134-
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
135-
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
123+
internal async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
136124
{
137125
if (buffer == null)
138126
throw new ArgumentNullException(nameof(buffer));
139-
140127
await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
141128
}
142129

@@ -149,7 +136,7 @@ public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken
149136
/// <param name="cancellationToken">Optional cancellation token.</param>
150137
/// <returns>A task representing the asynchronous operation.</returns>
151138
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
152-
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
139+
public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
153140
{
154141
if (buffer == null)
155142
throw new ArgumentNullException(nameof(buffer));
@@ -232,25 +219,42 @@ internal void MarkCompleted()
232219
}
233220
}
234221

235-
/// <summary>
236-
/// The resouces like the SemaphoreSlims are manually disposed by LambdaBootstrap after each invocation instead of relying on the
237-
/// Dipose pattern because we don't want the user's Lambda function to trigger Releasing and disposing the semaphores when
238-
/// invocation of the user's code ends.
239-
/// </summary>
240-
internal void ManualDispose()
241-
{
242-
try { _httpStreamReady.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
243-
_httpStreamReady.Dispose();
244-
try { _completionSignal.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
245-
_completionSignal.Dispose();
246-
}
247-
248222
private void ThrowIfCompletedOrError()
249223
{
250224
if (_isCompleted)
251225
throw new InvalidOperationException("Cannot write to a completed stream.");
252226
if (_hasError)
253227
throw new InvalidOperationException("Cannot write to a stream after an error has been reported.");
254228
}
229+
230+
/// <summary>
231+
/// Disposes the stream. After calling Dispose, no further writes or error reports should be made.
232+
/// </summary>
233+
/// <param name="disposing"></param>
234+
protected virtual void Dispose(bool disposing)
235+
{
236+
if (!_disposedValue)
237+
{
238+
if (disposing)
239+
{
240+
try { _httpStreamReady.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
241+
_httpStreamReady.Dispose();
242+
try { _completionSignal.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
243+
_completionSignal.Dispose();
244+
}
245+
246+
_disposedValue = true;
247+
}
248+
}
249+
250+
/// <summary>
251+
/// Dispose of the stream. After calling Dispose, no further writes or error reports should be made.
252+
/// </summary>
253+
public void Dispose()
254+
{
255+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
256+
Dispose(disposing: true);
257+
GC.SuppressFinalize(this);
258+
}
255259
}
256260
}

0 commit comments

Comments
 (0)