Skip to content

Commit ad84a78

Browse files
committed
implement AwaitableMutex for netfx
1 parent 4249939 commit ad84a78

3 files changed

Lines changed: 513 additions & 5 deletions

File tree

src/StackExchange.Redis/AwaitableMutex.net.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
using System.Threading;
22
using System.Threading.Tasks;
33

4-
// #if NET
4+
#if NET
55
namespace StackExchange.Redis;
66

77
internal partial struct AwaitableMutex
88
{
99
private readonly int _timeoutMilliseconds;
10+
11+
// note: this does not guarantee "fairness", but that's OK for our use-case - we mostly just want
12+
// a sync+async awaitable mutex, which this does; the .NET Framework version has a hand-written
13+
// implementation (see .netfx.cx for reasons), which *is* fair, but we'd rather not pay that overhead
14+
// here. Good-enough-is.
1015
private readonly SemaphoreSlim _mutex;
1116

1217
private partial AwaitableMutex(int timeoutMilliseconds)
@@ -28,4 +33,4 @@ public partial ValueTask<bool> TryTakeAsync(CancellationToken cancellationToken)
2833

2934
public partial void Release() => _mutex.Release();
3035
}
31-
// #endif
36+
#endif
Lines changed: 315 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,321 @@
1-
#if !NET
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
#if !NET
27
namespace StackExchange.Redis;
38

4-
// compensating for the fact that netfx SemaphoreSlim is kinda janky
5-
// (https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html)
9+
/*
10+
Compensating for the fact that netfx SemaphoreSlim is kinda janky (https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html).
11+
12+
This uses a simple queue of sync/async callers, and assumes a reasonable caller (the original MutexSlim is more defensive, as
13+
a general purpose public API).
14+
*/
15+
616
internal partial struct AwaitableMutex
717
{
18+
private readonly State _state;
19+
20+
private partial AwaitableMutex(int timeoutMilliseconds)
21+
{
22+
_state = new(timeoutMilliseconds);
23+
}
24+
25+
public partial void Dispose() => _state?.Dispose();
26+
27+
public partial bool IsAvailable => _state.IsAvailable;
28+
public partial int TimeoutMilliseconds => _state.TimeoutMilliseconds;
29+
30+
public partial bool TryTakeInstant() => _state.TryTakeInstant();
31+
32+
public partial ValueTask<bool> TryTakeAsync(CancellationToken cancellationToken)
33+
=> _state.TryTakeAsync(cancellationToken);
34+
35+
public partial bool TryTakeSync() => _state.TryTakeSync();
36+
37+
public partial void Release() => _state.Release();
38+
39+
private sealed class State : IDisposable
40+
{
41+
private readonly Queue<IPendingCaller> _queue = new();
42+
private bool _isHeld;
43+
private bool _isDisposed;
44+
45+
public State(int timeoutMilliseconds)
46+
{
47+
if (timeoutMilliseconds < Timeout.Infinite) ThrowOutOfRangeException();
48+
TimeoutMilliseconds = timeoutMilliseconds;
49+
50+
static void ThrowOutOfRangeException() => throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
51+
}
52+
53+
public int TimeoutMilliseconds { get; }
54+
55+
public bool IsAvailable
56+
{
57+
get
58+
{
59+
lock (_queue)
60+
{
61+
return !_isDisposed && !_isHeld && _queue.Count == 0;
62+
}
63+
}
64+
}
65+
66+
public bool TryTakeInstant()
67+
{
68+
lock (_queue)
69+
{
70+
ThrowIfDisposed();
71+
return TryTakeInsideLock();
72+
}
73+
}
74+
75+
public bool TryTakeSync()
76+
{
77+
var start = GetTime();
78+
SyncPendingCaller pending;
79+
lock (_queue)
80+
{
81+
ThrowIfDisposed();
82+
if (TryTakeInsideLock()) return true;
83+
if (TimeoutMilliseconds == 0) return false;
84+
85+
pending = new(start, TimeoutMilliseconds);
86+
_queue.Enqueue(pending);
87+
}
88+
89+
return pending.Wait();
90+
}
91+
92+
public ValueTask<bool> TryTakeAsync(CancellationToken cancellationToken)
93+
{
94+
lock (_queue)
95+
{
96+
ThrowIfDisposed();
97+
if (cancellationToken.IsCancellationRequested) return AsyncPendingCaller.Canceled();
98+
if (TryTakeInsideLock()) return new ValueTask<bool>(true);
99+
if (TimeoutMilliseconds == 0) return new ValueTask<bool>(false);
100+
if (cancellationToken.IsCancellationRequested) return AsyncPendingCaller.Canceled();
101+
102+
var pending = new AsyncPendingCaller(TimeoutMilliseconds, cancellationToken);
103+
_queue.Enqueue(pending);
104+
return new ValueTask<bool>(pending.Task);
105+
}
106+
}
107+
108+
public void Release()
109+
{
110+
lock (_queue)
111+
{
112+
ThrowIfDisposed();
113+
if (!_isHeld) ThrowNotHeld();
114+
115+
while (_queue.Count != 0)
116+
{
117+
if (_queue.Dequeue().TryGrant()) return;
118+
}
119+
120+
_isHeld = false;
121+
}
122+
123+
static void ThrowNotHeld() => throw new SemaphoreFullException();
124+
}
125+
126+
private bool TryTakeInsideLock()
127+
{
128+
if (_isHeld || _queue.Count != 0) return false;
129+
_isHeld = true;
130+
return true;
131+
}
132+
133+
public void Dispose()
134+
{
135+
lock (_queue)
136+
{
137+
if (_isDisposed) return;
138+
139+
_isDisposed = true;
140+
_isHeld = false;
141+
while (_queue.Count != 0)
142+
{
143+
_queue.Dequeue().Abort();
144+
}
145+
}
146+
}
147+
148+
private void ThrowIfDisposed()
149+
{
150+
if (_isDisposed) ThrowDisposed();
151+
}
152+
153+
private static void ThrowDisposed() => throw new ObjectDisposedException(nameof(AwaitableMutex));
154+
155+
private static uint GetTime() => (uint)Environment.TickCount;
156+
157+
private static int GetRemainingTimeout(uint startTime, int originalTimeoutMilliseconds)
158+
{
159+
if (originalTimeoutMilliseconds == Timeout.Infinite) return Timeout.Infinite;
160+
161+
var elapsedMilliseconds = GetTime() - startTime;
162+
if (elapsedMilliseconds > int.MaxValue) return 0;
163+
164+
var remaining = originalTimeoutMilliseconds - (int)elapsedMilliseconds;
165+
return remaining <= 0 ? 0 : remaining;
166+
}
167+
168+
private interface IPendingCaller
169+
{
170+
bool TryGrant();
171+
void Abort();
172+
}
173+
174+
private sealed class SyncPendingCaller : IPendingCaller
175+
{
176+
private readonly uint _start;
177+
private readonly int _timeoutMilliseconds;
178+
private bool _isComplete;
179+
private bool _wasGranted;
180+
private bool _wasAborted;
181+
182+
public SyncPendingCaller(uint start, int timeoutMilliseconds)
183+
{
184+
_start = start;
185+
_timeoutMilliseconds = timeoutMilliseconds;
186+
}
187+
188+
public bool Wait()
189+
{
190+
lock (this)
191+
{
192+
while (!_isComplete)
193+
{
194+
var remaining = GetRemainingTimeout(_start, _timeoutMilliseconds);
195+
if (remaining == 0)
196+
{
197+
_isComplete = true;
198+
return false;
199+
}
200+
201+
if (remaining == Timeout.Infinite)
202+
{
203+
Monitor.Wait(this);
204+
}
205+
else
206+
{
207+
Monitor.Wait(this, remaining);
208+
}
209+
}
210+
211+
if (_wasAborted) ThrowDisposed();
212+
return _wasGranted;
213+
}
214+
}
215+
216+
public bool TryGrant()
217+
{
218+
lock (this)
219+
{
220+
if (_isComplete) return false;
221+
_wasGranted = true;
222+
_isComplete = true;
223+
Monitor.Pulse(this);
224+
return true;
225+
}
226+
}
227+
228+
public void Abort()
229+
{
230+
lock (this)
231+
{
232+
if (_isComplete) return;
233+
_wasAborted = true;
234+
_isComplete = true;
235+
Monitor.Pulse(this);
236+
}
237+
}
238+
}
239+
240+
private sealed class AsyncPendingCaller : TaskCompletionSource<bool>, IPendingCaller
241+
{
242+
private static readonly TimerCallback s_onTimeout = state => ((AsyncPendingCaller)state!).TryComplete(CompletionState.TimedOut);
243+
private static readonly Action<object?> s_onCanceled = state => ((AsyncPendingCaller)state!).TryComplete(CompletionState.Canceled);
244+
private static readonly Task<bool> s_canceledTask = CreateCanceledTask();
245+
246+
private readonly CancellationTokenRegistration _cancellation;
247+
private readonly Timer? _timeout;
248+
private int _completionState;
249+
250+
public AsyncPendingCaller(int timeoutMilliseconds, CancellationToken cancellationToken)
251+
: base(TaskCreationOptions.RunContinuationsAsynchronously)
252+
{
253+
if (timeoutMilliseconds != Timeout.Infinite)
254+
{
255+
_timeout = new Timer(s_onTimeout, this, timeoutMilliseconds, Timeout.Infinite);
256+
}
257+
258+
if (cancellationToken.CanBeCanceled)
259+
{
260+
_cancellation = cancellationToken.Register(s_onCanceled, this);
261+
}
262+
}
263+
264+
public static ValueTask<bool> Canceled() => new(s_canceledTask);
265+
266+
public bool TryGrant() => TryComplete(CompletionState.Granted);
267+
268+
public void Abort() => TryComplete(CompletionState.Disposed);
269+
270+
private bool TryComplete(CompletionState completionState)
271+
{
272+
var newState = (int)completionState;
273+
if (Interlocked.CompareExchange(ref _completionState, newState, (int)CompletionState.Pending) != (int)CompletionState.Pending)
274+
{
275+
return false;
276+
}
277+
278+
if (completionState != CompletionState.TimedOut) _timeout?.Dispose();
279+
if (completionState != CompletionState.Canceled) _cancellation.Dispose();
280+
Complete(completionState);
281+
return true;
282+
}
283+
284+
private void Complete(CompletionState completionState)
285+
{
286+
switch (completionState)
287+
{
288+
case CompletionState.Granted:
289+
TrySetResult(true);
290+
break;
291+
case CompletionState.TimedOut:
292+
TrySetResult(false);
293+
break;
294+
case CompletionState.Canceled:
295+
TrySetCanceled();
296+
break;
297+
case CompletionState.Disposed:
298+
TrySetException(new ObjectDisposedException(nameof(AwaitableMutex)));
299+
break;
300+
}
301+
}
302+
303+
private static Task<bool> CreateCanceledTask()
304+
{
305+
var source = new TaskCompletionSource<bool>();
306+
source.SetCanceled();
307+
return source.Task;
308+
}
309+
310+
private enum CompletionState
311+
{
312+
Pending = 0,
313+
Granted = 1,
314+
TimedOut = 2,
315+
Canceled = 3,
316+
Disposed = 4,
317+
}
318+
}
319+
}
8320
}
9321
#endif

0 commit comments

Comments
 (0)