Skip to content

Commit 8c660bf

Browse files
committed
tests: init output and allow time to complete
1 parent 2c215c0 commit 8c660bf

3 files changed

Lines changed: 23 additions & 11 deletions

File tree

src/StackExchange.Redis/PhysicalConnection.Write.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Buffers;
3+
using System.Diagnostics;
34
using System.IO;
45
using System.IO.Pipelines;
56
using System.Threading;
@@ -20,28 +21,37 @@ public IBufferWriter<byte> Output
2021
}
2122
}
2223

23-
private void CreateOutputPipe()
24+
private Task _writeComplete = Task.CompletedTask;
25+
26+
private void InitOutput(Stream? stream)
2427
{
25-
if (_ioStream is not { } stream) return;
28+
if (stream is null) return;
29+
_ioStream = stream;
2630
var pipe = new Pipe();
2731
_output = pipe.Writer;
28-
_ = Task.Run(() => CopyOutputAsync(this, pipe.Reader, stream));
32+
_writeComplete = Task.Run(() => CopyOutputAsync(this, pipe.Reader), OutputCancel);
2933
}
3034

3135
internal bool HasOutputPipe => _output is not null;
3236

33-
private static async Task CopyOutputAsync(PhysicalConnection connection, PipeReader from, Stream to)
37+
internal Task CompleteOutputAsync(Exception? exception = null)
38+
{
39+
_output?.Complete(exception);
40+
return _writeComplete;
41+
}
42+
43+
private static async Task CopyOutputAsync(PhysicalConnection connection, PipeReader from)
3444
{
3545
try
3646
{
3747
bool pendingFlush = false;
38-
while (true)
48+
while (connection._ioStream is { } stream)
3949
{
4050
if (!from.TryRead(out var read))
4151
{
4252
if (pendingFlush)
4353
{
44-
await to.FlushAsync(connection.OutputCancel).ConfigureAwait(false);
54+
await stream.FlushAsync(connection.OutputCancel).ConfigureAwait(false);
4555
pendingFlush = false;
4656
}
4757
read = await from.ReadAsync(connection.OutputCancel).ConfigureAwait(false);
@@ -55,7 +65,7 @@ private static async Task CopyOutputAsync(PhysicalConnection connection, PipeRea
5565
{
5666
pendingFlush = true;
5767
connection.totalBytesSent += segment.Length;
58-
await to.WriteAsync(buffer.First, connection.OutputCancel).ConfigureAwait(false);
68+
await stream.WriteAsync(buffer.First, connection.OutputCancel).ConfigureAwait(false);
5969
}
6070
}
6171
else
@@ -66,7 +76,7 @@ private static async Task CopyOutputAsync(PhysicalConnection connection, PipeRea
6676
{
6777
pendingFlush = true;
6878
connection.totalBytesSent += segment.Length;
69-
await to.WriteAsync(segment, connection.OutputCancel).ConfigureAwait(false);
79+
await stream.WriteAsync(segment, connection.OutputCancel).ConfigureAwait(false);
7080
}
7181
}
7282
}

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public PhysicalConnection(
110110
_protocol = protocol;
111111
_bridge = new WeakReference(null);
112112
_physicalName = name;
113-
_ioStream = ioStream;
113+
InitOutput(ioStream);
114114
OnCreateEcho();
115115
}
116116
public PhysicalConnection(PhysicalBridge bridge)
@@ -1144,8 +1144,7 @@ static Stream DemandSocketStream(Socket? socket)
11441144
stream ??= DemandSocketStream(socket);
11451145
OnWrapForLogging(ref stream, _physicalName);
11461146

1147-
_ioStream = stream;
1148-
CreateOutputPipe();
1147+
InitOutput(stream);
11491148

11501149
log?.LogInformationConnected(bridge.Name);
11511150

tests/StackExchange.Redis.Tests/RoundTripUnitTests/TestConnection.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ internal static async Task<T> ExecuteAsync<T>(
4949
message.SetSource(box, processor);
5050
conn.WriteOutbound(message, commandMap, channelPrefix);
5151
Assert.Equal(TaskStatus.WaitingForActivation, tcs.Task.Status); // should be pending, since we haven't responded yet
52+
await conn.CompleteOutputAsync();
5253

5354
// check the request
5455
conn.AssertOutbound(requestResp);
@@ -59,6 +60,8 @@ internal static async Task<T> ExecuteAsync<T>(
5960
return await tcs.Task;
6061
}
6162

63+
private Task CompleteOutputAsync(Exception? exception = null) => _physical.CompleteOutputAsync(exception);
64+
6265
public TestConnection(
6366
bool startReading = true,
6467
ConnectionType connectionType = ConnectionType.Interactive,

0 commit comments

Comments
 (0)