Skip to content

Commit 9b25e60

Browse files
committed
- more "supports simulate failure" checks
- replace StreamConnection
1 parent 9991395 commit 9b25e60

6 files changed

Lines changed: 143 additions & 3 deletions

File tree

tests/StackExchange.Redis.Tests/BasicOpTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,15 @@ public async Task GetWithExpiryWrongTypeSync()
270270
[Fact]
271271
public async Task TestSevered()
272272
{
273-
SetExpectedAmbientFailureCount(2);
274273
await using var conn = Create(allowAdmin: true, shared: false);
275274
var db = conn.GetDatabase();
276275
string key = Me();
277276
db.KeyDelete(key, CommandFlags.FireAndForget);
278277
db.StringSet(key, key, flags: CommandFlags.FireAndForget);
279278
var server = GetServer(conn);
279+
Assert.SkipUnless(server.CanSimulateConnectionFailure(), "Skipping because server cannot simulate connection failure");
280+
281+
SetExpectedAmbientFailureCount(2);
280282
server.SimulateConnectionFailure(SimulatedFailureType.All);
281283
var watch = Stopwatch.StartNew();
282284
await UntilConditionAsync(TimeSpan.FromSeconds(10), () => server.IsConnected);

tests/StackExchange.Redis.Tests/ConnectionFailedErrorsTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ void InnerScenario()
180180
{
181181
conn.GetDatabase();
182182
var server = conn.GetServer(conn.GetEndPoints()[0]);
183+
Assert.SkipUnless(server.CanSimulateConnectionFailure(), "Skipping because server cannot simulate connection failure");
183184

184185
conn.AllowConnect = false;
185186

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
using System;
2+
using System.IO;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace StackExchange.Redis.Tests;
7+
8+
/// <summary>
9+
/// Combines separate input and output streams into a single duplex stream.
10+
/// </summary>
11+
internal sealed class DuplexStream(Stream inputStream, Stream outputStream) : Stream
12+
{
13+
private readonly Stream _inputStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
14+
private readonly Stream _outputStream = outputStream ?? throw new ArgumentNullException(nameof(outputStream));
15+
16+
public override bool CanRead => _inputStream.CanRead;
17+
public override bool CanWrite => _outputStream.CanWrite;
18+
public override bool CanSeek => false;
19+
public override bool CanTimeout => _inputStream.CanTimeout || _outputStream.CanTimeout;
20+
21+
public override int ReadTimeout
22+
{
23+
get => _inputStream.ReadTimeout;
24+
set => _inputStream.ReadTimeout = value;
25+
}
26+
27+
public override int WriteTimeout
28+
{
29+
get => _outputStream.WriteTimeout;
30+
set => _outputStream.WriteTimeout = value;
31+
}
32+
33+
public override long Length => throw new NotSupportedException($"{nameof(DuplexStream)} does not support seeking.");
34+
public override long Position
35+
{
36+
get => throw new NotSupportedException($"{nameof(DuplexStream)} does not support seeking.");
37+
set => throw new NotSupportedException($"{nameof(DuplexStream)} does not support seeking.");
38+
}
39+
40+
public override int Read(byte[] buffer, int offset, int count)
41+
=> _inputStream.Read(buffer, offset, count);
42+
43+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
44+
=> _inputStream.ReadAsync(buffer, offset, count, cancellationToken);
45+
46+
#if NET6_0_OR_GREATER
47+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
48+
=> _inputStream.ReadAsync(buffer, cancellationToken);
49+
50+
public override int Read(Span<byte> buffer)
51+
=> _inputStream.Read(buffer);
52+
#endif
53+
54+
public override int ReadByte()
55+
=> _inputStream.ReadByte();
56+
57+
public override void Write(byte[] buffer, int offset, int count)
58+
=> _outputStream.Write(buffer, offset, count);
59+
60+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
61+
=> _outputStream.WriteAsync(buffer, offset, count, cancellationToken);
62+
63+
#if NET6_0_OR_GREATER
64+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
65+
=> _outputStream.WriteAsync(buffer, cancellationToken);
66+
67+
public override void Write(ReadOnlySpan<byte> buffer)
68+
=> _outputStream.Write(buffer);
69+
#endif
70+
71+
public override void WriteByte(byte value)
72+
=> _outputStream.WriteByte(value);
73+
74+
public override void Flush()
75+
=> _outputStream.Flush();
76+
77+
public override Task FlushAsync(CancellationToken cancellationToken)
78+
=> _outputStream.FlushAsync(cancellationToken);
79+
80+
public override long Seek(long offset, SeekOrigin origin)
81+
=> throw new NotSupportedException($"{nameof(DuplexStream)} does not support seeking.");
82+
83+
public override void SetLength(long value)
84+
=> throw new NotSupportedException($"{nameof(DuplexStream)} does not support seeking.");
85+
86+
public override void Close()
87+
{
88+
_inputStream.Close();
89+
_outputStream.Close();
90+
base.Close();
91+
}
92+
93+
protected override void Dispose(bool disposing)
94+
{
95+
if (disposing)
96+
{
97+
_inputStream.Dispose();
98+
_outputStream.Dispose();
99+
}
100+
base.Dispose(disposing);
101+
}
102+
103+
#if NET6_0_OR_GREATER
104+
public override async ValueTask DisposeAsync()
105+
{
106+
await _inputStream.DisposeAsync().ConfigureAwait(false);
107+
await _outputStream.DisposeAsync().ConfigureAwait(false);
108+
await base.DisposeAsync().ConfigureAwait(false);
109+
}
110+
#endif
111+
112+
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
113+
=> _inputStream.BeginRead(buffer, offset, count, callback, state);
114+
115+
public override int EndRead(IAsyncResult asyncResult)
116+
=> _inputStream.EndRead(asyncResult);
117+
118+
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
119+
=> _outputStream.BeginWrite(buffer, offset, count, callback, state);
120+
121+
public override void EndWrite(IAsyncResult asyncResult)
122+
=> _outputStream.EndWrite(asyncResult);
123+
124+
#if NET6_0_OR_GREATER
125+
public override void CopyTo(Stream destination, int bufferSize)
126+
=> _inputStream.CopyTo(destination, bufferSize);
127+
#endif
128+
129+
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
130+
=> _inputStream.CopyToAsync(destination, bufferSize, cancellationToken);
131+
}

tests/StackExchange.Redis.Tests/ExceptionFactoryTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ public async Task MultipleEndpointsThrowConnectionException()
3737

3838
foreach (var endpoint in conn.GetEndPoints())
3939
{
40-
conn.GetServer(endpoint).SimulateConnectionFailure(SimulatedFailureType.All);
40+
var server = conn.GetServer(endpoint);
41+
Assert.SkipUnless(server.CanSimulateConnectionFailure(), "Skipping because server cannot simulate connection failure");
42+
server.SimulateConnectionFailure(SimulatedFailureType.All);
4143
}
4244

4345
var ex = ExceptionFactory.NoConnectionAvailable(conn.UnderlyingMultiplexer, null, null);

tests/StackExchange.Redis.Tests/FailoverTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ public async Task SubscriptionsSurviveConnectionFailureAsync()
222222
Assert.Equal(1, counter1);
223223

224224
var server = GetServer(conn);
225+
Assert.SkipUnless(server.CanSimulateConnectionFailure(), "Skipping because server cannot simulate connection failure");
225226
var socketCount = server.GetCounters().Subscription.SocketCount;
226227
Log($"Expecting 1 socket, got {socketCount}");
227228
Assert.Equal(1, socketCount);

tests/StackExchange.Redis.Tests/InProcessTestServer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ private sealed class InProcTunnel(
112112
var serverToClient = new Pipe(pipeOptions ?? PipeOptions.Default);
113113
var serverSide = new Duplex(clientToServer.Reader, serverToClient.Writer);
114114
Task.Run(async () => await server.RunClientAsync(serverSide, node: node), cancellationToken).RedisFireAndForget();
115-
var clientSide = StreamConnection.GetDuplex(serverToClient.Reader, clientToServer.Writer);
115+
116+
var readStream = serverToClient.Reader.AsStream();
117+
var writeStream = clientToServer.Writer.AsStream();
118+
var clientSide = new DuplexStream(readStream, writeStream);
116119
return new(clientSide);
117120
}
118121
return base.BeforeAuthenticateAsync(endpoint, connectionType, socket, cancellationToken);

0 commit comments

Comments
 (0)