Skip to content

Commit ff1b587

Browse files
authored
wip (#3024)
1 parent 9b25e60 commit ff1b587

3 files changed

Lines changed: 62 additions & 37 deletions

File tree

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,7 +1325,7 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect
13251325
var result = WriteMessageInsideLock(physical, message);
13261326
if (result == WriteResult.Success)
13271327
{
1328-
var flush = physical.FlushAsync(false);
1328+
var flush = physical.FlushAsync(false, physical.OutputCancel);
13291329
if (!flush.IsCompletedSuccessfully)
13301330
{
13311331
releaseLock = false; // so we don't release prematurely
@@ -1394,7 +1394,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(
13941394

13951395
if (result == WriteResult.Success)
13961396
{
1397-
result = await physical.FlushAsync(false).ForAwait();
1397+
result = await physical.FlushAsync(false, physical.OutputCancel).ForAwait();
13981398
}
13991399

14001400
physical.SetIdle();

src/StackExchange.Redis/PhysicalConnection.Read.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,22 @@ internal sealed partial class PhysicalConnection
2121
private volatile ReadStatus _readStatus = ReadStatus.NotStarted;
2222
internal ReadStatus GetReadStatus() => _readStatus;
2323

24-
internal void StartReading(CancellationToken cancellationToken = default) => ReadAllAsync(cancellationToken).RedisFireAndForget();
24+
internal void StartReading(CancellationToken cancellation = default)
25+
{
26+
if (cancellation.CanBeCanceled)
27+
{
28+
cancellation.ThrowIfCancellationRequested();
29+
if (InputCancel.CanBeCanceled)
30+
{
31+
cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellation, InputCancel).Token;
32+
}
33+
}
34+
else
35+
{
36+
cancellation = InputCancel;
37+
}
38+
ReadAllAsync(cancellation).RedisFireAndForget();
39+
}
2540

2641
private async Task ReadAllAsync(CancellationToken cancellationToken)
2742
{

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 44 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,26 @@ namespace StackExchange.Redis
2424
{
2525
internal sealed partial class PhysicalConnection : IDisposable
2626
{
27+
// infrastructure to simulate connection death, debug only
28+
private partial bool CanCancel();
29+
[Conditional("DEBUG")]
30+
partial void OnCancel(bool input, bool output);
31+
#if DEBUG
32+
private readonly CancellationTokenSource _inputCancel = new(), _outputCancel = new();
33+
internal CancellationToken InputCancel => _inputCancel.Token;
34+
internal CancellationToken OutputCancel => _outputCancel.Token;
35+
36+
partial void OnCancel(bool input, bool output)
37+
{
38+
if (input) _inputCancel.Cancel();
39+
if (output) _outputCancel.Cancel();
40+
}
41+
private partial bool CanCancel() => true;
42+
#else
43+
internal CancellationToken InputCancel => CancellationToken.None;
44+
internal CancellationToken OutputCancel => CancellationToken.None;
45+
#endif
46+
2747
internal readonly byte[]? ChannelPrefix;
2848

2949
private const int DefaultRedisDatabaseCount = 16;
@@ -208,7 +228,7 @@ static Socket CreateSocket(EndPoint endpoint)
208228
log?.LogInformationStartingRead(new(endpoint));
209229
try
210230
{
211-
StartReading(CancellationToken.None);
231+
StartReading();
212232
// Normal return
213233
}
214234
catch (Exception ex)
@@ -353,7 +373,7 @@ public Task FlushAsync()
353373
if (tmp != null)
354374
{
355375
_writeStatus = WriteStatus.Flushing;
356-
var flush = tmp.FlushAsync();
376+
var flush = tmp.FlushAsync(OutputCancel);
357377
if (!flush.IsCompletedSuccessfully)
358378
{
359379
return AwaitedFlush(flush);
@@ -368,40 +388,23 @@ public Task FlushAsync()
368388

369389
internal void SimulateConnectionFailure(SimulatedFailureType failureType)
370390
{
371-
throw new NotImplementedException(nameof(SimulateConnectionFailure));
372-
/*
373-
var raiseFailed = false;
374-
if (connectionType == ConnectionType.Interactive)
375-
{
376-
if (failureType.HasFlag(SimulatedFailureType.InteractiveInbound))
377-
{
378-
_ioPipe?.Input.Complete(new Exception("Simulating interactive input failure"));
379-
raiseFailed = true;
380-
}
381-
if (failureType.HasFlag(SimulatedFailureType.InteractiveOutbound))
382-
{
383-
_ioPipe?.Output.Complete(new Exception("Simulating interactive output failure"));
384-
raiseFailed = true;
385-
}
386-
}
387-
else if (connectionType == ConnectionType.Subscription)
391+
bool killInput = false, killOutput = false;
392+
switch (connectionType)
388393
{
389-
if (failureType.HasFlag(SimulatedFailureType.SubscriptionInbound))
390-
{
391-
_ioPipe?.Input.Complete(new Exception("Simulating subscription input failure"));
392-
raiseFailed = true;
393-
}
394-
if (failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound))
395-
{
396-
_ioPipe?.Output.Complete(new Exception("Simulating subscription output failure"));
397-
raiseFailed = true;
398-
}
394+
case ConnectionType.Interactive:
395+
killInput = failureType.HasFlag(SimulatedFailureType.InteractiveInbound);
396+
killOutput = failureType.HasFlag(SimulatedFailureType.InteractiveOutbound);
397+
break;
398+
case ConnectionType.Subscription:
399+
killInput = failureType.HasFlag(SimulatedFailureType.SubscriptionInbound);
400+
killOutput = failureType.HasFlag(SimulatedFailureType.SubscriptionOutbound);
401+
break;
399402
}
400-
if (raiseFailed)
403+
if (killInput | killOutput)
401404
{
405+
OnCancel(killInput, killOutput);
402406
RecordConnectionFailed(ConnectionFailureType.SocketFailure);
403407
}
404-
*/
405408
}
406409

407410
public void RecordConnectionFailed(
@@ -886,7 +889,13 @@ private async ValueTask<WriteResult> FlushAsync_Awaited(PhysicalConnection conne
886889
[System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0062:Make local function 'static'", Justification = "DEBUG uses instance data")]
887890
internal WriteResult FlushSync(bool throwOnFailure, int millisecondsTimeout)
888891
{
889-
var cts = _reusableFlushSyncTokenSource ??= new CancellationTokenSource();
892+
var cts = _reusableFlushSyncTokenSource;
893+
if (cts is null)
894+
{
895+
cts = new CancellationTokenSource();
896+
OutputCancel.Register(static s => { ((CancellationTokenSource)s!).Cancel(); }, cts);
897+
_reusableFlushSyncTokenSource = cts;
898+
}
890899
var flush = FlushAsync(throwOnFailure, cts.Token);
891900
if (!flush.IsCompletedSuccessfully)
892901
{
@@ -914,14 +923,15 @@ void ThrowTimeout()
914923
throw new TimeoutException("timeout while synchronously flushing");
915924
}
916925
}
917-
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure, CancellationToken cancellationToken = default)
926+
internal ValueTask<WriteResult> FlushAsync(bool throwOnFailure, CancellationToken soleCancel)
918927
{
919928
var tmp = _ioStream;
920929
if (tmp == null) return new ValueTask<WriteResult>(WriteResult.NoConnectionAvailable);
921930
try
922931
{
923932
_writeStatus = WriteStatus.Flushing;
924-
var flush = tmp.FlushAsync(cancellationToken);
933+
var flush = tmp.FlushAsync(soleCancel);
934+
925935
if (!flush.IsCompletedSuccessfully) return FlushAsync_Awaited(this, flush, throwOnFailure);
926936
_writeStatus = WriteStatus.Flushed;
927937
UpdateLastWriteTime();

0 commit comments

Comments
 (0)