Skip to content

Commit 45f7687

Browse files
authored
Add article on MPSC lock-free queue implementation
本文详细介绍了一个基于 C++11 std::atomic 实现的高性能 MPSC (多生产单消费) 无锁队列,分析了内存序和位运算优化等关键技术。
1 parent 6a2b05e commit 45f7687

1 file changed

Lines changed: 255 additions & 0 deletions

File tree

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
Title: 一个“一读多写”无锁队列的实现
2+
Date: 2026-01-17 20:00
3+
Tags: C++, Concurrency, Lock-Free, MPSC, Atomic, Memory Model
4+
Slug: implementation-of-mpsc-lock-free-queue-cpp
5+
Summary: 本文详细介绍了一个基于 C++11 std::atomic 实现的高性能 MPSC (多生产单消费) 无锁队列。文章深入剖析了如何利用 Relaxed/Acquire/Release 内存序构建 Happens-Before 关系,通过位运算优化取模开销,并最终实现线程安全的无锁同步机制。
6+
7+
# 一个“一读多写”无锁队列的实现
8+
9+
标签(空格分隔): C++ 并发编程 Lock-Free 性能优化
10+
11+
---
12+
13+
## 1. 背景与设计目标
14+
15+
在高性能网络编程中,我们经常面临**“一读多写”(MPSC, Multi-Producer Single-Consumer)**的场景,例如多个工作线程需要向同一个 Socket 串行写入数据。
16+
17+
为了避免互斥锁(Mutex)带来的上下文切换开销,我们设计了一个基于 C++11 `std::atomic` 的定长 Ring Buffer。其核心目标包括:
18+
19+
* **零内存动态分配**:使用定长数组,避免频繁 `new/delete`
20+
* **高效位运算**:将队列长度限制为 2 的 N 次幂,利用位与运算(`&`)替代昂贵的取模操作(`%`)。
21+
* **细粒度内存序**:摒弃粗暴的 `seq_cst`,通过 `relaxed``acquire``release` 的精细组合,构建最小开销的同步机制。
22+
23+
---
24+
25+
## 2. 生产者逻辑:try_enqueue
26+
27+
生产者的核心挑战在于:如何在多个线程同时竞争写入时,安全地分配 Slot 并发布数据。
28+
29+
### 代码实现
30+
31+
```cpp
32+
bool try_enqueue(T* item) noexcept
33+
{
34+
if (item == nullptr) return false;
35+
36+
// 1. 获取当前尾部索引的快照(无需同步)
37+
size_t cur_tail = m_tail.load(std::memory_order_relaxed);
38+
39+
for (;;)
40+
{
41+
const size_t next_tail = (cur_tail + 1) & m_mask;
42+
43+
// 2. 获取头部索引(需要同步,以感知消费者的进度)
44+
const size_t head = m_head.load(std::memory_order_acquire);
45+
46+
// 队列满
47+
if (next_tail == head) return false;
48+
49+
// 3. 尝试抢占 Slot (CAS)
50+
if (m_tail.compare_exchange_weak(
51+
cur_tail,
52+
next_tail,
53+
std::memory_order_acq_rel, // 成功:建立同步点
54+
std::memory_order_relaxed)) // 失败:仅重试
55+
{
56+
// 4. 抢占成功,物理写入数据并发布
57+
m_slots[cur_tail].store(item, std::memory_order_release);
58+
return true;
59+
}
60+
61+
// CAS 失败:cur_tail 已被自动更新为最新值,进入下一次循环重试
62+
}
63+
}
64+
65+
```
66+
67+
### 深度拆解:生产者入队 (try_enqueue) 的三部曲
68+
69+
一个标准的无锁入队操作,在逻辑上并非单一步骤,而是一个严密的**“观察 — 抢占 — 发布”**协议。
70+
71+
#### 1. 第一阶段:观察 (Observation)
72+
73+
在动手修改任何状态前,生产者必须先看清楚局势。
74+
75+
```cpp
76+
size_t cur_tail = m_tail.load(std::memory_order_relaxed);
77+
// ...
78+
const size_t head = m_head.load(std::memory_order_acquire);
79+
80+
```
81+
82+
* **`tail` (Relaxed)**:这里读取的 `tail` 仅仅是一个“猜测值”,用于后续 CAS 的基准。即使读到了旧值,CAS 也会失败并自动重试,因此不需要同步代价高昂的内存序。
83+
* **`head` (Acquire)**:这是一个关键的同步点。必须使用 `acquire` 语义,确保生产者能**“看到”**消费者之前释放 Slot 的操作(即 `consumer-release` `producer-acquire`)。如果不加同步,生产者可能错误地认为队列未满,覆盖了尚未被消费的数据。
84+
85+
#### 2. 第二阶段:抢占 (Logical Claim) —— 核心中的核心
86+
87+
这是整个无锁队列最复杂、技术含量最高的一行代码。生产者试图将 `tail` 指针向后移动一位,从而宣告对当前 Slot 的所有权。
88+
89+
```cpp
90+
if (m_tail.compare_exchange_weak(cur_tail, next_tail,
91+
std::memory_order_acq_rel,
92+
std::memory_order_relaxed))
93+
94+
```
95+
96+
这里包含了三个深度的技术决策,我们需要逐一拆解:
97+
98+
** 为什么选 `compare_exchange_weak` 而不是 `strong`**
99+
100+
```cpp
101+
m_tail.compare_exchange_weak(cur_tail, next_tail, ...)
102+
```
103+
104+
在底层硬件(尤其是 ARM/PowerPC 架构)上,CAS 指令可能出现**“伪失败” (Spurious Failure)**——即原子变量的值虽未被修改,但由于总线微观竞争或缓存行失效,指令依然返回 `false`
105+
106+
针对这种情况,C++ 提供了两个版本:
107+
108+
* **Strong 版本(双重循环)**
109+
它承诺“只要值没变,我就帮你试到成功为止”。为此,编译器会在底层生成一个**隐式的内层循环**来屏蔽伪失败。这意味着你的代码变成了嵌套结构:外层是业务重试循环,内层是 CAS 的“原地死磕”循环。
110+
* **Weak 版本(扁平循环)**
111+
它允许伪失败,直接返回 `false`。这使得代码维持高效的**单层循环**结构。
112+
113+
**决策理由:计算成本决定了一切**
114+
115+
既然无论用哪种版本,遇到真正的并发冲突(值被改了)都必须重试,那么选择的关键在于:**当发生伪失败时,“重头再来”的代价有多大?**
116+
117+
1. **在本队列场景下(选 Weak):**
118+
CAS 之前的准备工作(计算 `next_tail`)仅涉及极低成本的位运算。如果使用 `Strong`,是为了省去一次极快的位运算而引入复杂的内层循环和分支跳转,属于“杀鸡用牛刀”。使用 `Weak`,一旦伪失败直接回到外层循环重试,这种**扁平模式**对 CPU 分支预测器更友好,性能更高。
119+
2. **反之,什么时候该用 Strong?**
120+
如果 CAS 之前的**计算逻辑非常昂贵**(例如耗时 5ms 的复杂数学运算),一旦发生伪失败,我们绝对不想回到外层去重新计算一遍数据。此时必须用 `Strong`,在指令层面上死磕,直到成功或检测到值真的发生了变化,从而避免昂贵的**“业务逻辑级重试”**
121+
122+
** 为什么内存序是 `std::memory_order_acq_rel`**
123+
124+
此处 `m_tail` 充当了同步枢纽,我们需要它同时发挥双向屏障的作用:
125+
126+
* **Acquire (向后屏障)**
127+
防止**后续**对 Slot 的写入操作(Step 3)被 CPU 或编译器重排到 CAS **之前**
128+
> **风险**:如果没有 Acquire,可能出现“还没抢到座位(CAS 未完),就开始往座位上放东西(Write Slot)”的情况,导致数据踩踏。
129+
130+
* **Release (向前屏障)**
131+
确保**此前**所有的依赖计算(如 `item` 的准备)对其他线程可见。同时,它向消费者宣告:“`tail` 已经推进,这个位置归我了”。
132+
133+
** 为什么不直接用默认的 `std::memory_order_seq_cst`**
134+
135+
很多开发者习惯一把梭使用 `seq_cst`(顺序一致性),它是 C++ 原子操作的默认选项,提供全局最强的同步保证。但在高性能场景下,它是我们极力避免的。
136+
137+
* **开销巨大**:在 x86 架构下,`seq_cst` 的 Store 操作通常需要 `MFENCE` 指令或 `LOCK` 前缀,这会强制刷新 CPU 的 Store Buffer,导致流水线停顿,开销可达几十到上百个时钟周期。
138+
* **需求过剩**:我们只需要**成对的同步 (Pairwise Synchronization)**——即生产者与消费者之间达成共识。我们并不需要整个系统的所有变量维持一个全局的全序关系。
139+
140+
> **优化结论**:使用 `acq_rel`,我们构建了刚好够用的 Happens-Before 关系,既保证了安全性,又避免了 `seq_cst` 带来的总线同步风暴。
141+
142+
#### 3. 第三阶段:发布 (Physical Publish)
143+
144+
一旦 CAS 成功,当前线程就拥有了 `cur_tail` 指向的 Slot 的独占写入权。
145+
146+
```cpp
147+
m_slots[cur_tail].store(item, std::memory_order_release);
148+
149+
```
150+
151+
* **Release 语义**:这是数据真正对外界“可见”的时刻。它保证了:当消费者随后读取到这个非空指针时,指针所指向的对象内容(`item`)一定已经初始化完毕。
152+
* **配合 Consumer**:这与消费者侧的 `load(..., acquire)` 完美配对,构成了完整的数据发布-订阅链条。
153+
154+
---
155+
156+
## 3. 消费者逻辑:try_dequeue
157+
158+
消费者的处理流程相对线性,但包含了一个非常关键的“等待”逻辑。
159+
160+
### 代码实现
161+
162+
```cpp
163+
T* try_dequeue() noexcept
164+
{
165+
// 1. 本地状态读取(relaxed)
166+
const size_t head = m_head.load(std::memory_order_relaxed);
167+
// 2. 获取尾部索引(acquire,感知生产者的发布)
168+
const size_t tail = m_tail.load(std::memory_order_acquire);
169+
170+
if (head == tail) return nullptr;
171+
172+
T* item = nullptr;
173+
174+
// 3. 关键:自旋等待数据发布完成
175+
while ((item = m_slots[head].load(std::memory_order_acquire)) == nullptr)
176+
{
177+
std::this_thread::yield();
178+
}
179+
180+
// 4. 消费完成,回收 Slot
181+
m_slots[head].store(nullptr, std::memory_order_relaxed);
182+
183+
// 5. 推进 Head,向生产者宣告资源释放
184+
m_head.store((head + 1) & m_mask, std::memory_order_release);
185+
186+
return item;
187+
}
188+
189+
```
190+
191+
### 关键设计:为什么需要 while 循环?
192+
193+
这是本队列设计的精髓所在:**“先逻辑声明,再物理发布”。**
194+
195+
生产者在 `enqueue` 时,先修改了 `tail`(宣告占位),然后才写入 `m_slots`(发布数据)。这意味着存在一个微小的时间窗口:
196+
197+
> `head != tail` (队列看起来非空),但 `m_slots[head]` 仍然是 `nullptr`
198+
199+
因此,消费者必须在读取 Slot 时进行检查。如果读到空指针,说明生产者“占了坑但还没填土”,此时通过 `yield()` 让出 CPU 等待数据就绪。
200+
201+
---
202+
203+
## 4. 深度原理:Happens-Before 关系图谱
204+
205+
判断一个无锁队列是否正确,不能靠“感觉”,必须依靠严格的 **Happens-Before (先行发生)** 关系推导。
206+
207+
在这个实现中,正确性完全由**两条对称的同步链**支撑。
208+
209+
### 链条一:数据发布链 (Producer Consumer)
210+
211+
这条链保证了**消费者读取数据时的安全性**
212+
213+
* **Producer**: `m_slots[i].store(..., release)`
214+
* **Consumer**: `m_slots[i].load(..., acquire)`
215+
216+
**推导结果**
217+
当消费者通过 `acquire` 读到非空指针时,生产者在 `release` 之前的所有内存写入(即 `item` 对象的初始化)都对消费者可见。
218+
219+
> **保证:绝不会读取到未初始化的脏数据。**
220+
221+
### 链条二:资源回收链 (Consumer Producer)
222+
223+
这条链保证了**生产者复用 Slot 时的安全性**
224+
225+
* **Consumer**: `m_head.store(..., release)`
226+
* **Producer**: `m_head.load(..., acquire)`
227+
228+
**推导结果**
229+
当生产者通过 `acquire` 观察到 `head` 前进时,消费者在 `release` 之前的所有操作(即读取并清理 Slot)都已完成。
230+
231+
> **保证:绝不会在数据未被消费前覆盖 Slot。**
232+
233+
### 为什么 Relaxed 是安全的?
234+
235+
在代码中,`relaxed` 被用于:
236+
237+
1. CAS 失败后的重试路径。
238+
2. Consumer 更新 `m_slots``nullptr`
239+
3. Consumer 读取自己的 `head`
240+
241+
这些操作的共同点是:**它们不承载跨线程的“通知”任务**。真正的同步边界(可见性保证)完全由上述两条 `Acquire-Release` 链条严密把控,`relaxed` 仅用于降低非关键路径的 CPU 开销。
242+
243+
---
244+
245+
## 5. 总结
246+
247+
实现一个生产级的高性能无锁队列,难点不在于代码行数,而在于对内存模型的精准把控:
248+
249+
1. **分离关注点**:将“位置抢占”(修改 tail)与“数据发布”(写入 slot)解耦,虽然引入了短暂的中间态,但极大降低了竞争复杂度。
250+
2. **显式同步**:利用 `Release-Acquire` 语义建立清晰的 Happens-Before 桥梁。
251+
3. **最小化开销**:在不破坏因果关系的路径上大胆使用 `Relaxed`
252+
253+
通过这种设计,我们实现了一个既安全又高效的 MPSC 缓冲队列,完美适配高并发网络 I/O 场景。
254+
255+
[Full Code](https://gist.github.com/Wizmann/57e6ec4fb326f7acb8a46fe8a4b658fc)

0 commit comments

Comments
 (0)