Skip to content

Commit 4d5dee2

Browse files
committed
Clean up and rework Semaphore locks
1 parent 21d82d8 commit 4d5dee2

File tree

9 files changed

+62
-168
lines changed

9 files changed

+62
-168
lines changed

Libraries/src/Amazon.Lambda.RuntimeSupport/Bootstrap/LambdaBootstrap.cs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -385,15 +385,13 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
385385
{
386386
WriteUnhandledExceptionToLog(exception);
387387

388-
var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
389-
if (streamIfCreated != null && streamIfCreated.BytesWritten > 0)
388+
var responseStream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
389+
if (responseStream != null)
390390
{
391-
// Midstream error — report via trailers on the already-open HTTP connection
392-
await streamIfCreated.ReportErrorAsync(exception);
391+
responseStream.ReportError(exception);
393392
}
394393
else
395394
{
396-
// Error before streaming started — use standard error reporting
397395
await Client.ReportInvocationErrorAsync(invocation.LambdaContext.AwsRequestId, exception, cancellationToken);
398396
}
399397
}
@@ -402,17 +400,20 @@ internal async Task InvokeOnceAsync(CancellationToken cancellationToken = defaul
402400
_logger.LogInformation("Finished invoking handler");
403401
}
404402

405-
// If streaming was started, await the HTTP send task to ensure it completes
406-
var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency);
407-
if (sendTask != null)
403+
var streamIfCreated = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
404+
if (streamIfCreated != null)
408405
{
409-
var stream = LambdaResponseStreamFactory.GetStreamIfCreated(isMultiConcurrency);
410-
if (stream != null && !stream.IsCompleted && !stream.HasError)
406+
streamIfCreated.MarkCompleted();
407+
408+
// If streaming was started, await the HTTP send task to ensure it completes
409+
var sendTask = LambdaResponseStreamFactory.GetSendTask(isMultiConcurrency);
410+
if (sendTask != null)
411411
{
412-
// Handler returned successfully — signal stream completion
413-
stream.MarkCompleted();
412+
// Wait for the streaming response to finish sending before allowing the next invocation to be processed. This ensures that responses are sent in the order the invocations were received.
413+
await sendTask;
414414
}
415-
await sendTask; // Wait for HTTP request to finish
415+
416+
streamIfCreated.ManualDispose();
416417
}
417418
else if (invokeSucceeded)
418419
{

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/ILambdaResponseStream.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,12 @@ public interface ILambdaResponseStream : IDisposable
4545
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
4646
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);
4747

48-
/// <summary>
49-
/// Reports an error that occurred during streaming.
50-
/// This will send error information via HTTP trailing headers.
51-
/// </summary>
52-
/// <param name="exception">The exception to report.</param>
53-
/// <param name="cancellationToken">Optional cancellation token.</param>
54-
/// <returns>A task representing the asynchronous operation.</returns>
55-
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has already been reported.</exception>
56-
Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default);
5748

5849
/// <summary>
5950
/// Gets the total number of bytes written to the stream so far.
6051
/// </summary>
6152
long BytesWritten { get; }
6253

63-
/// <summary>
64-
/// Gets whether the stream has been completed.
65-
/// </summary>
66-
bool IsCompleted { get; }
6754

6855
/// <summary>
6956
/// Gets whether an error has been reported.

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/LambdaResponseStream.ILambdaResponseStream.cs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ public partial class LambdaResponseStream : Stream, ILambdaResponseStream
4545
/// </summary>
4646
public long BytesWritten => _bytesWritten;
4747

48-
/// <summary>
49-
/// Gets a value indicating whether the operation has completed.
50-
/// </summary>
51-
public bool IsCompleted => _isCompleted;
52-
5348
/// <summary>
5449
/// Gets a value indicating whether an error has occurred.
5550
/// </summary>
@@ -144,10 +139,8 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc
144139
/// This will send error information via HTTP trailing headers.
145140
/// </summary>
146141
/// <param name="exception">The exception to report.</param>
147-
/// <param name="cancellationToken">Optional cancellation token.</param>
148-
/// <returns>A task representing the asynchronous operation.</returns>
149142
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has already been reported.</exception>
150-
public Task ReportErrorAsync(Exception exception, CancellationToken cancellationToken = default)
143+
internal void ReportError(Exception exception)
151144
{
152145
if (exception == null)
153146
throw new ArgumentNullException(nameof(exception));
@@ -161,22 +154,45 @@ public Task ReportErrorAsync(Exception exception, CancellationToken cancellation
161154

162155
_hasError = true;
163156
_reportedError = exception;
164-
}
165157

158+
159+
_isCompleted = true;
160+
}
166161
// Signal completion so StreamingHttpContent can write error trailers and finish
167162
_completionSignal.Release();
168-
169-
return Task.CompletedTask;
170163
}
171164

172165
internal void MarkCompleted()
173166
{
167+
bool shouldReleaseLock = false;
174168
lock (_lock)
175169
{
170+
// Release lock if not already completed, otherwise do nothing (idempotent)
171+
if (!_isCompleted)
172+
{
173+
shouldReleaseLock = true;
174+
}
176175
_isCompleted = true;
177176
}
178-
// Signal completion so StreamingHttpContent can write the final chunk and finish
179-
_completionSignal.Release();
177+
178+
if (shouldReleaseLock)
179+
{
180+
// Signal completion so StreamingHttpContent can write the final chunk and finish
181+
_completionSignal.Release();
182+
}
183+
}
184+
185+
/// <summary>
186+
/// The resouces like the SemaphoreSlims are manually disposed by LambdaBootstrap after each invocation instead of relying on the
187+
/// Dipose pattern because we don't want the user's Lambda function to trigger Releasing and disposing the semaphores when
188+
/// invocation of the user's code ends.
189+
/// </summary>
190+
internal void ManualDispose()
191+
{
192+
try { _httpStreamReady.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
193+
_httpStreamReady.Dispose();
194+
try { _completionSignal.Release(); } catch (SemaphoreFullException) { /* Ignore if already released */ }
195+
_completionSignal.Dispose();
180196
}
181197

182198
private void ThrowIfCompletedOrError()
@@ -186,18 +202,5 @@ private void ThrowIfCompletedOrError()
186202
if (_hasError)
187203
throw new InvalidOperationException("Cannot write to a stream after an error has been reported.");
188204
}
189-
190-
// ── Dispose ──────────────────────────────────────────────────────────
191-
192-
/// <inheritdoc/>
193-
protected override void Dispose(bool disposing)
194-
{
195-
if (disposing)
196-
{
197-
try { _completionSignal.Release(); } catch (SemaphoreFullException) { }
198-
}
199-
200-
base.Dispose(disposing);
201-
}
202205
}
203206
}

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/RuntimeApiClient.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,6 @@ internal virtual async Task StartStreamingResponseAsync(
214214
request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
215215
response.EnsureSuccessStatusCode();
216216
}
217-
218-
responseStream.MarkCompleted();
219217
}
220218

221219
/// <summary>

Libraries/src/Amazon.Lambda.RuntimeSupport/Client/StreamingHttpContent.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Net.Http;
2020
using System.Text;
2121
using System.Threading.Tasks;
22+
using Amazon.Lambda.RuntimeSupport.Helpers;
2223

2324
namespace Amazon.Lambda.RuntimeSupport
2425
{
@@ -43,6 +44,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
4344
// can write chunks directly to it.
4445
_responseStream.SetHttpOutputStream(stream);
4546

47+
InternalLogger.GetDefaultLogger().LogInformation("In SerializeToStreamAsync waiting for the undlying Lambda response stream in indicate it is complete.");
4648
// Wait for the handler to finish writing (MarkCompleted or ReportErrorAsync)
4749
await _responseStream.WaitForCompletionAsync();
4850

@@ -52,6 +54,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon
5254
// Write error trailers if present
5355
if (_responseStream.HasError)
5456
{
57+
InternalLogger.GetDefaultLogger().LogError(_responseStream.ReportedError, "An error occurred during Lambda execution. Writing error trailers to response.");
5558
await WriteErrorTrailersAsync(stream, _responseStream.ReportedError);
5659
}
5760

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/ResponseStreamTests.cs

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public void Constructor_InitializesStateCorrectly()
4545
var stream = new LambdaResponseStream();
4646

4747
Assert.Equal(0, stream.BytesWritten);
48-
Assert.False(stream.IsCompleted);
4948
Assert.False(stream.HasError);
5049
Assert.Null(stream.ReportedError);
5150
}
@@ -192,7 +191,6 @@ public async Task MarkCompleted_ReleasesCompletionSignal()
192191
// Should complete within a reasonable time
193192
var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2)));
194193
Assert.Same(waitTask, completed);
195-
Assert.True(stream.IsCompleted);
196194
}
197195

198196
// ---- Completion signaling: ReportErrorAsync releases _completionSignal ----
@@ -209,54 +207,13 @@ public async Task ReportErrorAsync_ReleasesCompletionSignal()
209207
var waitTask = stream.WaitForCompletionAsync();
210208
Assert.False(waitTask.IsCompleted, "WaitForCompletionAsync should block before ReportErrorAsync");
211209

212-
await stream.ReportErrorAsync(new Exception("test error"));
210+
stream.ReportError(new Exception("test error"));
213211

214212
var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2)));
215213
Assert.Same(waitTask, completed);
216214
Assert.True(stream.HasError);
217215
}
218216

219-
// ---- Property 6: Size Limit Enforcement ----
220-
221-
/// <summary>
222-
/// Property 6: Size Limit Enforcement — single write exceeding limit throws.
223-
/// Validates: Requirements 3.6, 3.7
224-
/// </summary>
225-
[Theory]
226-
[InlineData(21 * 1024 * 1024)]
227-
public async Task SizeLimit_SingleWriteExceedingLimit_Throws(int writeSize)
228-
{
229-
var (stream, _) = CreateWiredStream();
230-
var data = new byte[writeSize];
231-
232-
await Assert.ThrowsAsync<InvalidOperationException>(() => stream.WriteAsync(data));
233-
}
234-
235-
/// <summary>
236-
/// Property 6: Size Limit Enforcement — multiple writes exceeding limit throws.
237-
/// Validates: Requirements 3.6, 3.7
238-
/// </summary>
239-
[Fact]
240-
public async Task SizeLimit_MultipleWritesExceedingLimit_Throws()
241-
{
242-
var (stream, _) = CreateWiredStream();
243-
244-
await stream.WriteAsync(new byte[10 * 1024 * 1024]);
245-
await Assert.ThrowsAsync<InvalidOperationException>(
246-
() => stream.WriteAsync(new byte[11 * 1024 * 1024]));
247-
}
248-
249-
[Fact]
250-
public async Task SizeLimit_ExactlyAtLimit_Succeeds()
251-
{
252-
var (stream, _) = CreateWiredStream();
253-
var data = new byte[20 * 1024 * 1024];
254-
255-
await stream.WriteAsync(data);
256-
257-
Assert.Equal(data.Length, stream.BytesWritten);
258-
}
259-
260217
// ---- Property 19: Writes After Completion Rejected ----
261218

262219
/// <summary>
@@ -283,7 +240,7 @@ public async Task WriteAsync_AfterReportError_Throws()
283240
{
284241
var (stream, _) = CreateWiredStream();
285242
await stream.WriteAsync(new byte[] { 1 });
286-
await stream.ReportErrorAsync(new Exception("test"));
243+
stream.ReportError(new Exception("test"));
287244

288245
await Assert.ThrowsAsync<InvalidOperationException>(
289246
() => stream.WriteAsync(new byte[] { 2 }));
@@ -297,7 +254,7 @@ public async Task ReportErrorAsync_SetsErrorState()
297254
var stream = new LambdaResponseStream();
298255
var exception = new InvalidOperationException("something broke");
299256

300-
await stream.ReportErrorAsync(exception);
257+
stream.ReportError(exception);
301258

302259
Assert.True(stream.HasError);
303260
Assert.Same(exception, stream.ReportedError);
@@ -309,28 +266,18 @@ public async Task ReportErrorAsync_AfterCompleted_Throws()
309266
var stream = new LambdaResponseStream();
310267
stream.MarkCompleted();
311268

312-
await Assert.ThrowsAsync<InvalidOperationException>(
313-
() => stream.ReportErrorAsync(new Exception("test")));
269+
Assert.Throws<InvalidOperationException>(
270+
() => stream.ReportError(new Exception("test")));
314271
}
315272

316273
[Fact]
317274
public async Task ReportErrorAsync_CalledTwice_Throws()
318275
{
319276
var stream = new LambdaResponseStream();
320-
await stream.ReportErrorAsync(new Exception("first"));
321-
322-
await Assert.ThrowsAsync<InvalidOperationException>(
323-
() => stream.ReportErrorAsync(new Exception("second")));
324-
}
325-
326-
[Fact]
327-
public void MarkCompleted_SetsCompletionState()
328-
{
329-
var stream = new LambdaResponseStream();
330-
331-
stream.MarkCompleted();
277+
stream.ReportError(new Exception("first"));
332278

333-
Assert.True(stream.IsCompleted);
279+
Assert.Throws<InvalidOperationException>(
280+
() => stream.ReportError(new Exception("second")));
334281
}
335282

336283
// ---- Argument validation ----
@@ -356,7 +303,7 @@ public async Task ReportErrorAsync_NullException_ThrowsArgumentNull()
356303
{
357304
var stream = new LambdaResponseStream();
358305

359-
await Assert.ThrowsAsync<ArgumentNullException>(() => stream.ReportErrorAsync(null));
306+
Assert.Throws<ArgumentNullException>(() => stream.ReportError(null));
360307
}
361308

362309
// ---- Dispose signals completion ----
@@ -369,7 +316,7 @@ public async Task Dispose_ReleasesCompletionSignalIfNotAlreadyReleased()
369316
var waitTask = stream.WaitForCompletionAsync();
370317
Assert.False(waitTask.IsCompleted);
371318

372-
stream.Dispose();
319+
stream.ManualDispose();
373320

374321
var completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(2)));
375322
Assert.Same(waitTask, completed);

Libraries/test/Amazon.Lambda.RuntimeSupport.Tests/Amazon.Lambda.RuntimeSupport.UnitTests/RuntimeApiClientTests.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -133,25 +133,6 @@ public async Task StartStreamingResponseAsync_DeclaresTrailerHeaderUpfront()
133133
Assert.Contains(StreamingConstants.ErrorBodyTrailer, trailerValue);
134134
}
135135

136-
// --- Property 18: Stream Finalization ---
137-
138-
/// <summary>
139-
/// Property 18: Stream Finalization
140-
/// For any streaming response that completes successfully, the ResponseStream
141-
/// should be marked as completed (IsCompleted = true) after the HTTP response succeeds.
142-
/// **Validates: Requirements 8.3**
143-
/// </summary>
144-
[Fact]
145-
public async Task StartStreamingResponseAsync_MarksStreamCompletedAfterSuccess()
146-
{
147-
var stream = new LambdaResponseStream();
148-
var client = CreateClientWithMockHandler(stream, out _);
149-
150-
await client.StartStreamingResponseAsync("req-4", stream, CancellationToken.None);
151-
152-
Assert.True(stream.IsCompleted);
153-
}
154-
155136
// --- Property 10: Buffered Responses Exclude Streaming Headers ---
156137

157138
/// <summary>

0 commit comments

Comments
 (0)