|
1 | | -/* |
| 1 | +/* |
2 | 2 | * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
@@ -70,6 +70,7 @@ private class CapturingStreamingRuntimeApiClient : RuntimeApiClient, IRuntimeApi |
70 | 70 | public byte[] CapturedHttpBytes { get; private set; } |
71 | 71 | public ResponseStream LastResponseStream { get; private set; } |
72 | 72 | public Stream LastBufferedOutputStream { get; private set; } |
| 73 | + public Action OnStreamingReady { get; set; } |
73 | 74 |
|
74 | 75 | public new Amazon.Lambda.RuntimeSupport.Helpers.IConsoleLoggerWriter ConsoleLogger { get; } = new Helpers.LogLevelLoggerWriter(new SystemEnvironmentVariables()); |
75 | 76 |
|
@@ -108,6 +109,7 @@ internal override async Task<IDisposable> StartStreamingResponseAsync( |
108 | 109 | await responseStream.SetHttpOutputStreamAsync(captureStream, cancellationToken); |
109 | 110 |
|
110 | 111 | // Wait for the handler to finish writing (mirrors real RawStreamingHttpClient behavior) |
| 112 | + OnStreamingReady?.Invoke(); |
111 | 113 | await responseStream.WaitForCompletionAsync(cancellationToken); |
112 | 114 | CapturedHttpBytes = captureStream.ToArray(); |
113 | 115 | return new NoOpDisposable(); |
@@ -262,42 +264,38 @@ public async Task Buffered_ResponseBodyTransmittedCorrectly() |
262 | 264 | [Fact] |
263 | 265 | public async Task MidstreamError_SetsErrorStateWithExceptionDetails() |
264 | 266 | { |
265 | | - var testSuccess = false; |
266 | | - for (int i = 0; i < 5 && !testSuccess; i++) |
267 | | - { |
268 | | - ResponseStreamFactory.CleanupInvocation(isMultiConcurrency: false); |
269 | | - var client = CreateClient(); |
270 | | - const string errorMessage = "something went wrong mid-stream"; |
271 | | - |
272 | | - LambdaBootstrapHandler handler = async (invocation) => |
273 | | - { |
274 | | - var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>()); |
275 | | - await stream.WriteAsync(Encoding.UTF8.GetBytes("some data")); |
276 | | - await Task.Delay(1000); |
277 | | - throw new InvalidOperationException(errorMessage); |
278 | | - }; |
279 | | - |
280 | | - using var bootstrap = new LambdaBootstrap(handler, null); |
281 | | - bootstrap.Client = client; |
282 | | - await bootstrap.InvokeOnceAsync(); |
| 267 | + var client = CreateClient(); |
| 268 | + const string errorMessage = "something went wrong mid-stream"; |
283 | 269 |
|
284 | | - if (!client.StartStreamingCalled) |
285 | | - continue; |
286 | | - |
287 | | - Assert.NotNull(client.LastResponseStream); |
288 | | - Assert.True(client.LastResponseStream.HasError); |
289 | | - Assert.NotNull(client.LastResponseStream.ReportedError); |
290 | | - Assert.IsType<InvalidOperationException>(client.LastResponseStream.ReportedError); |
291 | | - Assert.Equal(errorMessage, client.LastResponseStream.ReportedError.Message); |
| 270 | + // Signal so the handler waits until the streaming pipeline is fully |
| 271 | + // established (WaitForCompletionAsync is actively listening) before throwing. |
| 272 | + var streamingReady = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); |
| 273 | + client.OnStreamingReady = () => streamingReady.TrySetResult(true); |
292 | 274 |
|
293 | | - // Verify the handler's data was still captured before the error |
294 | | - var output = Encoding.UTF8.GetString(client.CapturedHttpBytes); |
295 | | - Assert.Contains("some data", output); |
| 275 | + LambdaBootstrapHandler handler = async (invocation) => |
| 276 | + { |
| 277 | + var stream = ResponseStreamFactory.CreateStream(Array.Empty<byte>()); |
| 278 | + await stream.WriteAsync(Encoding.UTF8.GetBytes("some data")); |
| 279 | + // Wait until StartStreamingResponseAsync has reached WaitForCompletionAsync |
| 280 | + // so the completion signal will be observed when ReportError fires. |
| 281 | + await streamingReady.Task; |
| 282 | + throw new InvalidOperationException(errorMessage); |
| 283 | + }; |
296 | 284 |
|
297 | | - testSuccess = true; |
298 | | - } |
| 285 | + using var bootstrap = new LambdaBootstrap(handler, null); |
| 286 | + bootstrap.Client = client; |
| 287 | + await bootstrap.InvokeOnceAsync(); |
299 | 288 |
|
300 | | - Assert.True(testSuccess); |
| 289 | + Assert.True(client.StartStreamingCalled); |
| 290 | + Assert.NotNull(client.LastResponseStream); |
| 291 | + Assert.True(client.LastResponseStream.HasError); |
| 292 | + Assert.NotNull(client.LastResponseStream.ReportedError); |
| 293 | + Assert.IsType<InvalidOperationException>(client.LastResponseStream.ReportedError); |
| 294 | + Assert.Equal(errorMessage, client.LastResponseStream.ReportedError.Message); |
| 295 | + |
| 296 | + // Verify the handler's data was still captured before the error |
| 297 | + var output = Encoding.UTF8.GetString(client.CapturedHttpBytes); |
| 298 | + Assert.Contains("some data", output); |
301 | 299 | } |
302 | 300 |
|
303 | 301 | // ─── 10.4 Multi-concurrency ────────────────────────────────────────────────── |
|
0 commit comments