Skip to content

Commit 1bc0eec

Browse files
committed
Improved end-of-buffer detection for the producer side
1 parent 54a14ed commit 1bc0eec

1 file changed

Lines changed: 96 additions & 95 deletions

File tree

src/DotNext.Threading/Collections/Concurrent/RingBuffer.cs

Lines changed: 96 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Diagnostics;
2-
using System.Diagnostics.Contracts;
32
using System.Numerics;
43
using System.Runtime.CompilerServices;
54
using System.Runtime.InteropServices;
@@ -13,21 +12,22 @@ namespace DotNext.Collections.Concurrent;
1312
internal struct RingBuffer<T>
1413
{
1514
private readonly Slot[] slots;
15+
private readonly nuint indexMask;
16+
private readonly int indexBits;
1617
private bool frozenForEnqueues;
17-
private RingBuffer state;
18+
private State state;
1819

19-
public RingBuffer(int maximumRetained)
20+
public RingBuffer(int desiredSize)
2021
{
21-
Debug.Assert(maximumRetained > 0);
22+
Debug.Assert(desiredSize > 0);
2223

23-
var length = nuint.CreateChecked(BitOperations.RoundUpToPowerOf2((ulong)(uint)maximumRetained));
24+
var length = nuint.CreateChecked(BitOperations.RoundUpToPowerOf2((ulong)(uint)desiredSize));
2425
slots = new Slot[length];
25-
state = new(length);
26+
indexMask = length - 1U;
27+
indexBits = int.CreateChecked(nuint.Log2(length));
2628
frozenForEnqueues = false;
2729
}
28-
29-
public readonly bool IsFrozen => Volatile.Read(in frozenForEnqueues);
30-
30+
3131
public bool Freeze()
3232
{
3333
var frozen = Interlocked.FalseToTrue(ref frozenForEnqueues);
@@ -39,32 +39,36 @@ public bool Freeze()
3939

4040
private void WaitForPendingEnqueues()
4141
{
42-
Debug.Assert(frozenForEnqueues);
42+
Debug.Assert(IsFrozen);
4343

4444
// the slots prior to the frozen position can be still in progress by the enqueuer, so wait for it
4545
for (var position = FreezeProducer() - 1U;
46-
this[state.GetIndex(position)].WaitForPendingEnqueue(state.GetGeneration(position));
46+
this[GetIndex(position)].WaitForPendingEnqueue(GetGeneration(position));
4747
position--) ;
4848
}
4949

5050
private nuint FreezeProducer()
5151
{
52-
Debug.Assert(frozenForEnqueues);
52+
Debug.Assert(IsFrozen);
5353

5454
const nuint shift = 2U;
55-
var current = state.Positions.Producer;
56-
for (nuint tmp, offset = (uint)slots.Length * shift;; current = tmp)
55+
var current = state.Producer;
56+
for (nuint tmp, offset = Array.GetLength(slots) * shift;; current = tmp)
5757
{
5858
// Advances the producer too far forward, so this position cannot be reached naturally even
5959
// if the buffer is full. 2 generations forward is enough, because the items in the buffer
6060
// can be in the current and the next generation.
61-
tmp = Interlocked.CompareExchange(ref state.Positions.Producer, current + offset, current);
61+
tmp = Interlocked.CompareExchange(ref state.Producer, current + offset, current);
6262
if (tmp == current)
6363
break;
6464
}
6565

6666
return current;
6767
}
68+
69+
private static nuint StateBit => (nuint)nint.MinValue;
70+
71+
public readonly bool IsFrozen => Volatile.Read(in frozenForEnqueues);
6872

6973
public readonly int Length => slots.Length;
7074

@@ -79,40 +83,40 @@ private readonly ref Slot this[nuint index]
7983
}
8084

8185
public ref Slot TryDequeue(out nuint sequence)
82-
=> ref DoOperation<RingBuffer.DequeueOperation>(ref state.Positions.Consumer, out sequence);
86+
=> ref DoOperation<DequeueOperation>(out sequence);
8387

8488
public readonly bool IsEmpty
8589
{
8690
get
8791
{
88-
var position = state.Positions.Consumer;
89-
var generation = state.GetGeneration(position);
92+
var position = Consumer;
93+
var generation = GetGeneration(position);
9094

91-
return this[state.GetIndex(position)].Sequence != (generation | RingBuffer.StateBit);
95+
return this[GetIndex(position)].Sequence != (generation | StateBit);
9296
}
9397
}
9498

9599
public ref Slot TryEnqueue(out nuint sequence)
96-
=> ref DoOperation<RingBuffer.EnqueueOperation>(ref state.Positions.Producer, out sequence);
97-
98-
private readonly ref Slot DoOperation<TOperation>(scoped ref nuint position, out nuint newSeq)
99-
where TOperation : struct, RingBuffer.IOperation<TOperation>, allows ref struct
100+
=> ref DoOperation<EnqueueOperation>(out sequence);
101+
102+
private ref Slot DoOperation<TOperation>(out nuint newSeq)
103+
where TOperation : struct, IOperation<TOperation>, allows ref struct
100104
{
101105
var spinner = new SpinWait();
102-
for (nuint positionCopy = Volatile.Read(in position), tmp; TOperation.Retry(in frozenForEnqueues); positionCopy = tmp)
106+
for (nuint positionCopy = TOperation.GetPosition(ref state), tmp; TOperation.Retry(in frozenForEnqueues); positionCopy = tmp)
103107
{
104-
ref var slot = ref this[state.GetIndex(positionCopy)];
105-
var context = TOperation.Create(in state, positionCopy);
108+
ref var slot = ref this[GetIndex(positionCopy)];
109+
var context = TOperation.Create(GetGeneration(positionCopy));
106110

107111
if (!context.IsValidSequence(slot.Sequence))
108112
{
109-
if (!context.CanRetry(in state))
113+
if (!TOperation.CanRetry(in this, positionCopy))
110114
break;
111115

112116
tmp = positionCopy + 1U;
113117
spinner.SpinOnce(sleep1Threshold: -1);
114118
}
115-
else if ((tmp = Interlocked.CompareExchange(ref position, positionCopy + 1U, positionCopy)) == positionCopy)
119+
else if ((tmp = Interlocked.CompareExchange(ref TOperation.GetPosition(ref state), positionCopy + 1U, positionCopy)) == positionCopy)
116120
{
117121
newSeq = context.NextSequence;
118122
return ref slot;
@@ -122,110 +126,107 @@ private readonly ref Slot DoOperation<TOperation>(scoped ref nuint position, out
122126
Unsafe.SkipInit(out newSeq);
123127
return ref Unsafe.NullRef<Slot>();
124128
}
125-
126-
[StructLayout(LayoutKind.Auto)]
127-
public struct Slot
128-
{
129-
public T? Item;
130-
public volatile nuint Sequence; // higher bit is reserved for the value presence
131-
132-
public readonly bool WaitForPendingEnqueue(nuint frozenGen)
133-
{
134-
// The slot can be in three states:
135-
// 1. Enqueued, so Sequence == (frozenGen | StateBit) => skip it and check the previous slot
136-
// 2. Dequeued, so Sequence == (frozenGen + 1) & ~StateBit => leave the method
137-
// 3. In-flight, so Sequence == frozenGen => wait for Enqueued or Dequeued
138-
nuint sequence;
139-
for (var spinner = new SpinWait();
140-
(sequence = Sequence) == frozenGen;
141-
spinner.SpinOnce()) ;
142-
143-
return sequence == (frozenGen | RingBuffer.StateBit);
144-
}
145-
}
146-
}
147-
148-
[StructLayout(LayoutKind.Sequential)]
149-
internal struct RingBuffer(nuint length)
150-
{
151-
private readonly nuint indexMask = length - 1U;
152-
private readonly int indexBits = int.CreateChecked(nuint.Log2(length));
153-
public State Positions;
154-
155-
[Pure]
156-
public readonly nuint GetGeneration(nuint position) => position >>> indexBits;
157-
158-
[Pure]
159-
public readonly nuint GetIndex(nuint position) => position & indexMask;
160129

161-
public static nuint StateBit => (nuint)nint.MinValue;
130+
private readonly nuint GetGeneration(nuint position) => position >>> indexBits;
162131

163-
public interface IOperation<out TSelf>
164-
where TSelf : struct, IOperation<TSelf>, allows ref struct
132+
private readonly nuint GetIndex(nuint position) => position & indexMask;
133+
134+
private interface IOperation
165135
{
136+
static abstract ref nuint GetPosition(ref State state);
137+
166138
static abstract bool Retry(ref readonly bool frozenForEnqueues);
167139

168-
bool CanRetry(scoped ref readonly RingBuffer state);
140+
static abstract bool CanRetry(ref readonly RingBuffer<T> state, nuint position);
169141

170142
nuint NextSequence { get; }
171143

172144
bool IsValidSequence(nuint sequence);
145+
}
173146

174-
public static abstract TSelf Create(scoped ref readonly RingBuffer state, nuint position);
147+
private interface IOperation<out TSelf> : IOperation
148+
where TSelf : struct, IOperation<TSelf>, allows ref struct
149+
{
150+
public static abstract TSelf Create(nuint generation);
175151
}
176152

177153
[StructLayout(LayoutKind.Auto)]
178-
public readonly ref struct EnqueueOperation : IOperation<EnqueueOperation>
154+
private readonly ref struct EnqueueOperation : IOperation<EnqueueOperation>
179155
{
180-
private readonly nuint generation, position;
181-
182-
private EnqueueOperation(scoped ref readonly RingBuffer state, nuint producerPosition)
183-
=> generation = state.GetGeneration(position = producerPosition);
156+
private readonly nuint generation;
184157

185-
bool IOperation<EnqueueOperation>.CanRetry(ref readonly RingBuffer state)
186-
{
187-
var consumerPos = Volatile.Read(in state.Positions.Consumer);
158+
private EnqueueOperation(nuint generation) => this.generation = generation;
188159

189-
// the consumer must be in the same generation as the producer, or producer position must be less than the consumer position
190-
return state.GetGeneration(consumerPos) == generation || state.GetIndex(position) < state.GetIndex(consumerPos);
191-
}
160+
static bool IOperation.CanRetry(ref readonly RingBuffer<T> state, nuint producerPosition)
161+
=> producerPosition != state.ConsumerNextGen;
192162

193-
static bool IOperation<EnqueueOperation>.Retry(ref readonly bool frozenForEnqueues) => !Volatile.Read(in frozenForEnqueues);
163+
static bool IOperation.Retry(ref readonly bool frozenForEnqueues) => !Volatile.Read(in frozenForEnqueues);
194164

195165
// the slot becomes available for consumption in the current generation
196-
nuint IOperation<EnqueueOperation>.NextSequence => generation | StateBit;
166+
nuint IOperation.NextSequence => generation | StateBit;
197167

198-
bool IOperation<EnqueueOperation>.IsValidSequence(nuint sequence) => sequence == generation;
168+
bool IOperation.IsValidSequence(nuint sequence) => sequence == generation;
199169

200-
static EnqueueOperation IOperation<EnqueueOperation>.Create(scoped ref readonly RingBuffer state, nuint producerPosition)
201-
=> new(in state, producerPosition);
170+
static EnqueueOperation IOperation<EnqueueOperation>.Create(nuint generation)
171+
=> new(generation);
172+
173+
static ref nuint IOperation.GetPosition(ref State state) => ref state.Producer;
202174

203175
public override string ToString() => generation.ToString("X");
204176
}
205177

206178
[StructLayout(LayoutKind.Auto)]
207-
public readonly ref struct DequeueOperation : IOperation<DequeueOperation>
179+
private readonly ref struct DequeueOperation : IOperation<DequeueOperation>
208180
{
209-
private readonly nuint generation, position;
181+
private readonly nuint generation;
210182

211-
private DequeueOperation(scoped ref readonly RingBuffer state, nuint consumerPosition)
212-
=> generation = state.GetGeneration(position = consumerPosition);
183+
private DequeueOperation(nuint generation) => this.generation = generation;
213184

214-
static bool IOperation<DequeueOperation>.Retry(ref readonly bool frozenForEnqueues) => true;
185+
static bool IOperation.Retry(ref readonly bool frozenForEnqueues) => true;
215186

216-
bool IOperation<DequeueOperation>.CanRetry(ref readonly RingBuffer state)
217-
=> position != Volatile.Read(in state.Positions.Producer);
187+
static bool IOperation.CanRetry(ref readonly RingBuffer<T> state, nuint consumerPosition)
188+
=> consumerPosition != state.Producer;
218189

219190
// the slot can be occupied in the next generation only
220-
nuint IOperation<DequeueOperation>.NextSequence => (generation + 1U) & ~StateBit;
191+
nuint IOperation.NextSequence => (generation + 1U) & ~StateBit;
221192

222-
bool IOperation<DequeueOperation>.IsValidSequence(nuint sequence) => sequence == (generation | StateBit);
193+
bool IOperation.IsValidSequence(nuint sequence) => sequence == (generation | StateBit);
223194

224-
static DequeueOperation IOperation<DequeueOperation>.Create(scoped ref readonly RingBuffer state, nuint consumerPosition)
225-
=> new(in state, consumerPosition);
195+
static DequeueOperation IOperation<DequeueOperation>.Create(nuint generation)
196+
=> new(generation);
197+
198+
static ref nuint IOperation.GetPosition(ref State state) => ref state.Consumer;
226199

227200
public override string ToString() => generation.ToString("X");
228201
}
202+
203+
private readonly nuint Producer => Volatile.Read(in state.Producer);
204+
205+
private readonly nuint Consumer => Volatile.Read(in state.Consumer);
206+
207+
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
208+
private readonly nuint ConsumerNextGen => Volatile.Read(in state.Consumer) + (uint)slots.Length;
209+
210+
[StructLayout(LayoutKind.Auto)]
211+
public struct Slot
212+
{
213+
public T? Item;
214+
public volatile nuint Sequence; // higher bit is reserved for the value presence
215+
216+
public readonly bool WaitForPendingEnqueue(nuint frozenGen)
217+
{
218+
// The slot can be in three states:
219+
// 1. Enqueued, so Sequence == (frozenGen | StateBit) => skip it and check the previous slot
220+
// 2. Dequeued, so Sequence == (frozenGen + 1) & ~StateBit => leave the method
221+
// 3. In-flight, so Sequence == frozenGen => wait for Enqueued or Dequeued
222+
nuint sequence;
223+
for (var spinner = new SpinWait();
224+
(sequence = Sequence) == frozenGen;
225+
spinner.SpinOnce()) ;
226+
227+
return sequence == (frozenGen | StateBit);
228+
}
229+
}
229230
}
230231

231232
// producer/consumer positions are used by different threads, so it's better to avoid memory cache sharing

0 commit comments

Comments
 (0)