Skip to content

Commit 76b3548

Browse files
committed
Intial spike of ambient cancellation
1 parent 0488c42 commit 76b3548

13 files changed

Lines changed: 1171 additions & 4 deletions

File tree

StackExchange.Redis.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleTest", "tests\Consol
142142
EndProject
143143
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleTestBaseline", "tests\ConsoleTestBaseline\ConsoleTestBaseline.csproj", "{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}"
144144
EndProject
145+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SimpleCancellationDemo", "tests\SimpleCancellationDemo\SimpleCancellationDemo.csproj", "{368A06AA-9DEB-4C55-B9CF-A1070B85E502}"
146+
EndProject
145147
Global
146148
GlobalSection(SolutionConfigurationPlatforms) = preSolution
147149
Debug|Any CPU = Debug|Any CPU
@@ -192,6 +194,10 @@ Global
192194
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Debug|Any CPU.Build.0 = Debug|Any CPU
193195
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Release|Any CPU.ActiveCfg = Release|Any CPU
194196
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3}.Release|Any CPU.Build.0 = Release|Any CPU
197+
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
198+
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Debug|Any CPU.Build.0 = Debug|Any CPU
199+
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Release|Any CPU.ActiveCfg = Release|Any CPU
200+
{368A06AA-9DEB-4C55-B9CF-A1070B85E502}.Release|Any CPU.Build.0 = Release|Any CPU
195201
EndGlobalSection
196202
GlobalSection(SolutionProperties) = preSolution
197203
HideSolutionNode = FALSE
@@ -214,6 +220,7 @@ Global
214220
{A9F81DA3-DA82-423E-A5DD-B11C37548E06} = {96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}
215221
{A0F89B8B-32A3-4C28-8F1B-ADE343F16137} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
216222
{69A0ACF2-DF1F-4F49-B554-F732DCA938A3} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
223+
{368A06AA-9DEB-4C55-B9CF-A1070B85E502} = {73A5C363-CA1F-44C4-9A9B-EF791A76BA6A}
217224
EndGlobalSection
218225
GlobalSection(ExtensibilityGlobals) = postSolution
219226
SolutionGuid = {193AA352-6748-47C1-A5FC-C9AA6B5F000B}

docs/AmbientCancellation.md

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
# Ambient Cancellation Support
2+
3+
StackExchange.Redis now supports ambient cancellation using `AsyncLocal<T>` to provide cancellation tokens to Redis operations without expanding the API surface.
4+
5+
## Overview
6+
7+
The ambient cancellation feature allows you to set a cancellation context that applies to all Redis operations within an async scope. This provides a clean way to handle timeouts and cancellation without adding `CancellationToken` parameters to every method.
8+
9+
## Key Features
10+
11+
- **Zero API Surface Impact**: No new parameters added to existing methods
12+
- **Scoped Cancellation**: Uses `using` statements for proper scope management
13+
- **Timeout Support**: Can specify timeouts that are converted to cancellation tokens
14+
- **Composable**: Can combine cancellation tokens with timeouts
15+
- **Nested Scopes**: Inner scopes override outer scopes
16+
- **Backward Compatible**: All existing code continues to work unchanged
17+
18+
## Usage
19+
20+
### Basic Cancellation
21+
22+
```csharp
23+
using var cts = new CancellationTokenSource();
24+
25+
using (database.WithCancellation(cts.Token))
26+
{
27+
await database.StringSetAsync("key", "value");
28+
var value = await database.StringGetAsync("key");
29+
// Both operations use the cancellation token
30+
}
31+
```
32+
33+
### Timeout Support
34+
35+
```csharp
36+
using (database.WithTimeout(TimeSpan.FromSeconds(5)))
37+
{
38+
await database.StringSetAsync("key", "value");
39+
// Operation will be cancelled if it takes longer than 5 seconds
40+
}
41+
```
42+
43+
### Combined Cancellation and Timeout
44+
45+
```csharp
46+
using var cts = new CancellationTokenSource();
47+
48+
using (database.WithCancellationAndTimeout(cts.Token, TimeSpan.FromSeconds(10)))
49+
{
50+
await database.StringSetAsync("key", "value");
51+
// Operation will be cancelled if either the token is cancelled OR 10 seconds elapse
52+
}
53+
```
54+
55+
### Nested Scopes
56+
57+
```csharp
58+
using var outerToken = new CancellationTokenSource();
59+
using var innerToken = new CancellationTokenSource();
60+
61+
using (database.WithCancellation(outerToken.Token))
62+
{
63+
await database.StringSetAsync("key1", "value1"); // Uses outerToken
64+
65+
using (database.WithCancellation(innerToken.Token))
66+
{
67+
await database.StringSetAsync("key2", "value2"); // Uses innerToken
68+
}
69+
70+
await database.StringSetAsync("key3", "value3"); // Uses outerToken again
71+
}
72+
```
73+
74+
### Pub/Sub Operations
75+
76+
```csharp
77+
using var cts = new CancellationTokenSource();
78+
79+
using (subscriber.WithCancellation(cts.Token))
80+
{
81+
await subscriber.SubscribeAsync("channel", handler);
82+
await subscriber.PublishAsync("channel", "message");
83+
// Both operations use the cancellation token
84+
}
85+
```
86+
87+
## Extension Methods
88+
89+
The functionality is provided through extension methods on `IRedisAsync`:
90+
91+
- `WithCancellation(CancellationToken)` - Sets ambient cancellation token
92+
- `WithTimeout(TimeSpan)` - Sets ambient timeout (converted to cancellation token)
93+
- `WithCancellationAndTimeout(CancellationToken, TimeSpan)` - Sets both cancellation and timeout
94+
95+
## Implementation Details
96+
97+
### AsyncLocal Context
98+
99+
The implementation uses `AsyncLocal<T>` to flow the cancellation context through async operations:
100+
101+
```csharp
102+
private static readonly AsyncLocal<CancellationContext?> _context = new();
103+
```
104+
105+
### Scope Management
106+
107+
Each `WithCancellation` call returns an `IDisposable` that manages the scope:
108+
109+
```csharp
110+
public static IDisposable WithCancellation(this IRedisAsync redis, CancellationToken cancellationToken)
111+
{
112+
return new CancellationScope(cancellationToken, null);
113+
}
114+
```
115+
116+
### Integration Points
117+
118+
The cancellation token is applied at the core execution level:
119+
120+
1. `RedisBase.ExecuteAsync` retrieves the ambient cancellation token
121+
2. `ConnectionMultiplexer.ExecuteAsyncImpl` accepts the cancellation token
122+
3. `TaskResultBox<T>` registers for cancellation and properly handles cancellation
123+
124+
### Timeout Handling
125+
126+
Timeouts are converted to cancellation tokens using `CancellationTokenSource`:
127+
128+
```csharp
129+
public CancellationToken GetEffectiveToken()
130+
{
131+
if (!Timeout.HasValue) return Token;
132+
133+
var timeoutSource = new CancellationTokenSource(Timeout.Value);
134+
return Token.CanBeCanceled
135+
? CancellationTokenSource.CreateLinkedTokenSource(Token, timeoutSource.Token).Token
136+
: timeoutSource.Token;
137+
}
138+
```
139+
140+
## Error Handling
141+
142+
When an operation is cancelled, it throws an `OperationCanceledException`:
143+
144+
```csharp
145+
try
146+
{
147+
using (database.WithCancellation(cancelledToken))
148+
{
149+
await database.StringSetAsync("key", "value");
150+
}
151+
}
152+
catch (OperationCanceledException)
153+
{
154+
// Handle cancellation
155+
}
156+
```
157+
158+
## Performance Considerations
159+
160+
- **Minimal Overhead**: When no ambient cancellation is set, there's virtually no performance impact
161+
- **Efficient Scoping**: Uses struct-based scoping to minimize allocations
162+
- **Proper Cleanup**: Cancellation registrations are properly disposed when operations complete
163+
164+
## Limitations
165+
166+
- **Client-Side Only**: Redis doesn't support server-side cancellation, so cancellation only prevents the client from waiting for a response
167+
- **In-Flight Operations**: Operations that have already been sent to the server will continue executing on the server even if cancelled on the client
168+
- **Connection Health**: Cancelled operations don't affect connection health or availability
169+
170+
## Migration
171+
172+
Existing code requires no changes. The ambient cancellation is purely additive:
173+
174+
```csharp
175+
// This continues to work exactly as before
176+
await database.StringSetAsync("key", "value");
177+
178+
// This adds cancellation support
179+
using (database.WithCancellation(cancellationToken))
180+
{
181+
await database.StringSetAsync("key", "value");
182+
}
183+
```
184+
185+
## Best Practices
186+
187+
1. **Use `using` statements** to ensure proper scope cleanup
188+
2. **Prefer cancellation tokens over timeouts** when possible for better control
189+
3. **Handle `OperationCanceledException`** appropriately in your application
190+
4. **Don't rely on cancellation for server-side operation termination**
191+
5. **Test cancellation scenarios** to ensure your application handles them gracefully
192+
193+
## Examples
194+
195+
See `examples/CancellationExample.cs` for comprehensive usage examples and `tests/StackExchange.Redis.Tests/CancellationTests.cs` for test cases demonstrating the functionality.

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2132,7 +2132,9 @@ static async Task<T> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, Value
21322132
IResultBox<T>? source = null;
21332133
if (!message.IsFireAndForget)
21342134
{
2135-
source = TaskResultBox<T>.Create(out tcs, state);
2135+
// Use the message's cancellation token, which preserves the ambient context from when the message was created
2136+
// This ensures that resent messages (due to MOVED, failover, etc.) still respect the original cancellation
2137+
source = TaskResultBox<T>.Create(out tcs, state, message.CancellationToken);
21362138
}
21372139
var write = TryPushMessageToBridgeAsync(message, processor, source, ref server);
21382140
if (!write.IsCompletedSuccessfully)
@@ -2183,7 +2185,9 @@ static async Task<T> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, Value
21832185
IResultBox<T?>? source = null;
21842186
if (!message.IsFireAndForget)
21852187
{
2186-
source = TaskResultBox<T?>.Create(out tcs, state);
2188+
// Use the message's cancellation token, which preserves the ambient context from when the message was created
2189+
// This ensures that resent messages (due to MOVED, failover, etc.) still respect the original cancellation
2190+
source = TaskResultBox<T?>.Create(out tcs, state, message.CancellationToken);
21872191
}
21882192
var write = TryPushMessageToBridgeAsync(message, processor, source!, ref server);
21892193
if (!write.IsCompletedSuccessfully)

src/StackExchange.Redis/Message.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ internal abstract class Message : ICompletable
8383

8484
private ResultProcessor? resultProcessor;
8585

86+
// Cancellation token for this specific message
87+
private CancellationToken _cancellationToken;
88+
8689
// All for profiling purposes
8790
private ProfiledCommand? performance;
8891
internal DateTime CreatedDateTime;
@@ -116,6 +119,9 @@ protected Message(int db, CommandFlags flags, RedisCommand command)
116119
Flags = flags & UserSelectableFlags;
117120
if (primaryOnly) SetPrimaryOnly();
118121

122+
// Get ambient cancellation token when the message is created
123+
_cancellationToken = RedisCancellationExtensions.GetEffectiveCancellationToken();
124+
119125
CreatedDateTime = DateTime.UtcNow;
120126
CreatedTimestamp = Stopwatch.GetTimestamp();
121127
Status = CommandStatus.WaitingToBeSent;
@@ -217,6 +223,12 @@ internal void WithHighIntegrity(uint value)
217223

218224
public IResultBox? ResultBox => resultBox;
219225

226+
/// <summary>
227+
/// Gets the cancellation token associated with this message.
228+
/// This token is captured when the message is created and preserved across resends.
229+
/// </summary>
230+
internal CancellationToken CancellationToken => _cancellationToken;
231+
220232
public abstract int ArgCount { get; } // note: over-estimate if necessary
221233

222234
public static Message Create(int db, CommandFlags flags, RedisCommand command)
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1-
#nullable enable
1+
#nullable enable
2+
StackExchange.Redis.RedisCancellationExtensions
3+
static StackExchange.Redis.RedisCancellationExtensions.WithCancellation(this StackExchange.Redis.IRedisAsync! redis, System.Threading.CancellationToken cancellationToken) -> System.IDisposable!
4+
static StackExchange.Redis.RedisCancellationExtensions.WithCancellationAndTimeout(this StackExchange.Redis.IRedisAsync! redis, System.Threading.CancellationToken cancellationToken, System.TimeSpan timeout) -> System.IDisposable!
5+
static StackExchange.Redis.RedisCancellationExtensions.WithTimeout(this StackExchange.Redis.IRedisAsync! redis, System.TimeSpan timeout) -> System.IDisposable!

src/StackExchange.Redis/RedisBase.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,19 @@ internal virtual Task<T> ExecuteAsync<T>(Message? message, ResultProcessor<T>? p
4444
{
4545
if (message is null) return CompletedTask<T>.FromDefault(defaultValue, asyncState);
4646
multiplexer.CheckMessage(message);
47+
48+
// The message already captures the ambient cancellation token when it was created,
49+
// so we don't need to pass it again. This ensures resent messages preserve their original cancellation context.
4750
return multiplexer.ExecuteAsyncImpl<T>(message, processor, asyncState, server, defaultValue);
4851
}
4952

5053
internal virtual Task<T?> ExecuteAsync<T>(Message? message, ResultProcessor<T>? processor, ServerEndPoint? server = null)
5154
{
5255
if (message is null) return CompletedTask<T>.Default(asyncState);
5356
multiplexer.CheckMessage(message);
57+
58+
// The message already captures the ambient cancellation token when it was created,
59+
// so we don't need to pass it again. This ensures resent messages preserve their original cancellation context.
5460
return multiplexer.ExecuteAsyncImpl<T>(message, processor, asyncState, server);
5561
}
5662

0 commit comments

Comments
 (0)