Skip to content

Commit cb174c1

Browse files
MPSCQueue: add queue size
1 parent df94019 commit cb174c1

File tree

4 files changed

+42
-6
lines changed

4 files changed

+42
-6
lines changed

Common/interface/MPSCQueue.hpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ class MPSCQueue
8787
Node* node = AllocateNode(std::move(value));
8888
node->pNext.store(nullptr, std::memory_order_relaxed);
8989

90+
// Increment size before pushing to ensure that the element is not
91+
// dequeued before m_Size reflects the new element.
92+
m_Size.fetch_add(1, std::memory_order_relaxed);
93+
9094
// Standard Lock-Free Enqueue
9195
Node* prev = m_Tail.exchange(node, std::memory_order_acq_rel);
9296
prev->pNext.store(node, std::memory_order_release);
@@ -110,18 +114,39 @@ class MPSCQueue
110114
m_Head = next;
111115
result = std::move(next->Value);
112116

117+
m_Size.fetch_sub(1, std::memory_order_relaxed);
118+
113119
// Current head became the old dummy node, recycle it
114120
RecycleNode(head);
115121

116122
return true;
117123
}
118124

119125
/// Checks if the queue is empty.
126+
127+
/// The method is safe to call concurrently with producers, but must only be
128+
/// called by the consumer thread.
129+
///
130+
/// \note In the presence of concurrent producers, this method may temporarily
131+
/// report the queue as empty while an enqueue is still in progress and
132+
/// the new node has not yet been linked into the queue.
120133
bool IsEmpty() const
121134
{
122135
return m_Head->pNext.load(std::memory_order_acquire) == nullptr;
123136
}
124137

138+
/// Returns the approximate number of items in the queue.
139+
140+
/// \note This value is not exact in the presence of concurrent producers and
141+
/// consumer. It may temporarily overestimate the number of items due to
142+
/// in-flight enqueue operations, and may disagree with IsEmpty() or a
143+
/// subsequent Dequeue() call. It must not be used to predict whether
144+
/// Dequeue() will succeed.
145+
size_t Size() const
146+
{
147+
return m_Size.load(std::memory_order_relaxed);
148+
}
149+
125150
private:
126151
struct Node
127152
{
@@ -196,6 +221,8 @@ class MPSCQueue
196221
// Consumer Data (Hot)
197222
alignas(CacheLineSize) Node* m_Head = nullptr;
198223

224+
std::atomic<size_t> m_Size{0};
225+
199226
// Free List (Shared - Moderate Contention)
200227
// The lock protects the "Pop" side from other Producers
201228
std::mutex m_FreeHeadMtx;

Graphics/GraphicsTools/include/GPUUploadManagerImpl.hpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class GPUUploadManagerImpl final : public ObjectBase<IGPUUploadManager>
138138
Uint32 GetSize() const { return m_Size; }
139139

140140
// Returns the number of pending operations.
141-
size_t GetNumPendingOps() const { return m_NumPendingOps.load(std::memory_order_acquire); }
141+
size_t GetNumPendingOps() const { return m_PendingOps.Size(); }
142142

143143
#ifdef DILIGENT_DEBUG
144144
// Returns the number of active writers. This is used for testing and debugging purposes.
@@ -171,8 +171,7 @@ class GPUUploadManagerImpl final : public ObjectBase<IGPUUploadManager>
171171
static constexpr Uint32 WRITER_MASK = ~SEALED_BIT; // low 31 bits
172172
std::atomic<Uint32> m_State{0};
173173

174-
std::atomic<size_t> m_NumPendingOps{0};
175-
std::atomic<bool> m_Enqueued{false};
174+
std::atomic<bool> m_Enqueued{false};
176175

177176
Uint64 m_FenceValue = 0;
178177

Graphics/GraphicsTools/src/GPUUploadManagerImpl.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ bool GPUUploadManagerImpl::Page::ScheduleBufferUpdate(const ScheduleBufferUpdate
251251
Op.DstOffset = UpdateInfo.DstOffset;
252252
Op.NumBytes = UpdateInfo.NumBytes;
253253
m_PendingOps.Enqueue(std::move(Op));
254-
m_NumPendingOps.fetch_add(1, std::memory_order_acq_rel);
255254

256255
return true;
257256
}
@@ -288,7 +287,6 @@ void GPUUploadManagerImpl::Page::ExecutePendingOps(IDeviceContext* pContext, Uin
288287
}
289288
}
290289
}
291-
m_NumPendingOps.store(0);
292290
m_FenceValue = FenceValue;
293291
}
294292

@@ -300,7 +298,6 @@ void GPUUploadManagerImpl::Page::Reset(IDeviceContext* pContext)
300298
m_Offset.store(0);
301299
// Keep the page sealed until we make it current to prevent any accidental writes.
302300
m_State.store(SEALED_BIT);
303-
m_NumPendingOps.store(0);
304301
m_Enqueued.store(false);
305302
m_FenceValue = 0;
306303

Tests/DiligentCoreTest/src/Common/MPSCQueueTest.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,30 +46,37 @@ TEST(Common_MPSCQueue, EnqueueDequeue)
4646
Queue.Enqueue(84);
4747
Queue.Enqueue(126);
4848
EXPECT_FALSE(Queue.IsEmpty());
49+
EXPECT_EQ(Queue.Size(), size_t{3});
4950

5051
int Value = 0;
5152
EXPECT_TRUE(Queue.Dequeue(Value));
5253
EXPECT_EQ(Value, 42);
54+
EXPECT_EQ(Queue.Size(), size_t{2});
5355
EXPECT_TRUE(Queue.Dequeue(Value));
5456
EXPECT_EQ(Value, 84);
57+
EXPECT_EQ(Queue.Size(), size_t{1});
5558
EXPECT_TRUE(Queue.Dequeue(Value));
5659
EXPECT_EQ(Value, 126);
60+
EXPECT_EQ(Queue.Size(), size_t{0});
5761

5862
EXPECT_TRUE(Queue.IsEmpty());
5963
EXPECT_FALSE(Queue.Dequeue(Value));
6064

6165
Queue.Enqueue(168);
6266
Queue.Enqueue(210);
67+
EXPECT_EQ(Queue.Size(), size_t{2});
6368
EXPECT_TRUE(Queue.Dequeue(Value));
6469
EXPECT_EQ(Value, 168);
6570
EXPECT_TRUE(Queue.Dequeue(Value));
6671
EXPECT_EQ(Value, 210);
6772

73+
EXPECT_EQ(Queue.Size(), size_t{0});
6874
EXPECT_TRUE(Queue.IsEmpty());
6975
EXPECT_FALSE(Queue.Dequeue(Value));
7076

7177
Queue.Enqueue(252);
7278
Queue.Enqueue(294);
79+
EXPECT_EQ(Queue.Size(), size_t{2});
7380
}
7481

7582
TEST(Common_MPSCQueue, EnqueueDequeueMoveOnly)
@@ -79,6 +86,7 @@ TEST(Common_MPSCQueue, EnqueueDequeueMoveOnly)
7986
Queue.Enqueue(std::make_unique<int>(42));
8087
Queue.Enqueue(std::make_unique<int>(84));
8188
Queue.Enqueue(std::make_unique<int>(126));
89+
EXPECT_EQ(Queue.Size(), size_t{3});
8290

8391
std::unique_ptr<int> Value;
8492
EXPECT_TRUE(Queue.Dequeue(Value));
@@ -87,20 +95,24 @@ TEST(Common_MPSCQueue, EnqueueDequeueMoveOnly)
8795
EXPECT_EQ(*Value, 84);
8896
EXPECT_TRUE(Queue.Dequeue(Value));
8997
EXPECT_EQ(*Value, 126);
98+
EXPECT_EQ(Queue.Size(), size_t{0});
9099

91100
EXPECT_FALSE(Queue.Dequeue(Value));
92101

93102
Queue.Enqueue(std::make_unique<int>(168));
94103
Queue.Enqueue(std::make_unique<int>(210));
104+
EXPECT_EQ(Queue.Size(), size_t{2});
95105
EXPECT_TRUE(Queue.Dequeue(Value));
96106
EXPECT_EQ(*Value, 168);
97107
EXPECT_TRUE(Queue.Dequeue(Value));
98108
EXPECT_EQ(*Value, 210);
109+
EXPECT_EQ(Queue.Size(), size_t{0});
99110

100111
EXPECT_FALSE(Queue.Dequeue(Value));
101112

102113
Queue.Enqueue(std::make_unique<int>(252));
103114
Queue.Enqueue(std::make_unique<int>(294));
115+
EXPECT_EQ(Queue.Size(), size_t{2});
104116
}
105117

106118

@@ -157,6 +169,7 @@ TEST(Common_MPSCQueue, EnqueueDequeueParallel)
157169
Consumer.join();
158170

159171
EXPECT_TRUE(Queue.IsEmpty());
172+
EXPECT_EQ(Queue.Size(), size_t{0});
160173

161174
EXPECT_EQ(ProducedData.size(), NumProducers * NumItemsPerProducer);
162175
std::vector<Uint32> CurrProducedValue(NumProducers, 0);

0 commit comments

Comments
 (0)