Skip to content

Commit ccc6276

Browse files
committed
Optimize ASU task manager with lock-free slots
1 parent bb78acc commit ccc6276

2 files changed

Lines changed: 625 additions & 24 deletions

File tree

ucm/transport/kv/asu/common/task_manager_base.h

Lines changed: 200 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,69 +24,245 @@
2424
#pragma once
2525

2626
#include <atomic>
27+
#include <cstddef>
28+
#include <cstdint>
29+
#include <limits>
2730
#include <memory>
28-
#include <mutex>
2931
#include <string>
30-
#include <unordered_map>
3132
#include <utility>
33+
#include <vector>
34+
#include <functional>
35+
3236
#include "asu_transport/types.h"
3337

3438
namespace UC::ASU {
3539

3640
template <typename Context, typename State>
3741
class TaskManagerBase {
3842
public:
39-
TaskManagerBase(State initial_state, std::string task_name)
40-
: initial_state_(initial_state), task_name_(std::move(task_name))
43+
static constexpr std::size_t kMinSlotCount = 1024;
44+
static constexpr std::size_t kDefaultSlotCount = 8192;
45+
46+
static std::size_t RecommendSlotCount(std::size_t max_inflight_tasks)
47+
{
48+
// Keep load factor <= 0.5 for open addressing.
49+
// For example: 4096 inflight tasks -> 8192 slots.
50+
const auto required = std::max<std::size_t>(
51+
kMinSlotCount,
52+
max_inflight_tasks * 2);
53+
return NormalizeSlotCount(required);
54+
}
55+
56+
explicit TaskManagerBase(
57+
State initial_state,
58+
std::string task_name,
59+
std::size_t slot_count = kDefaultSlotCount)
60+
: initial_state_(initial_state),
61+
task_name_(std::move(task_name)),
62+
slots_(NormalizeSlotCount(slot_count)),
63+
slot_mask_(slots_.size() - 1) // Used for efficient slot index calculation: bitwise modulo
4164
{
4265
}
4366

4467
Status Submit(std::unique_ptr<Context> ctx, TaskId& task_id)
4568
{
4669
if (!ctx) {
4770
task_id = kInvalidTaskId;
48-
return Status::Error(StatusCode::INVALID_ARGUMENT,
49-
task_name_ + " task context is null");
71+
return Status::Error(
72+
StatusCode::INVALID_ARGUMENT,
73+
task_name_ + " task context is null");
5074
}
5175

5276
auto shared_ctx = std::shared_ptr<Context>(std::move(ctx));
5377
shared_ctx->state.store(initial_state_, std::memory_order_release);
5478

55-
std::lock_guard<std::mutex> lock(mu_);
79+
TaskId new_task_id = kInvalidTaskId;
5680
do {
57-
task_id = next_task_id_.fetch_add(1, std::memory_order_relaxed);
58-
} while (task_id == kInvalidTaskId || tasks_.find(task_id) != tasks_.end());
81+
new_task_id = next_task_id_.fetch_add(1, std::memory_order_relaxed);
82+
} while (new_task_id == kInvalidTaskId); // kInvalidTaskId is 0, so task id starts from 1 to avoid allocating invalid IDs
83+
84+
shared_ctx->task_id = new_task_id;
85+
86+
const auto start = Hash(new_task_id) & slot_mask_;
87+
const auto capacity = slots_.size();
88+
89+
for (std::size_t probe = 0; probe < capacity; ++probe) {
90+
auto& slot = slots_[(start + probe) & slot_mask_];
91+
92+
// CAS: Try to transition EMPTY → WRITING
93+
std::uint8_t expected = SlotState::EMPTY;
94+
if (!slot.state.compare_exchange_strong(
95+
expected,
96+
SlotState::WRITING,
97+
std::memory_order_acq_rel,
98+
std::memory_order_acquire)) {
99+
continue;
100+
}
101+
102+
AtomicStoreCtx(slot, shared_ctx, std::memory_order_release);
103+
slot.task_id.store(new_task_id, std::memory_order_release);
104+
slot.state.store(SlotState::READY, std::memory_order_release);
105+
106+
task_id = new_task_id;
107+
return Status::OK();
108+
}
109+
110+
task_id = kInvalidTaskId;
59111

60-
shared_ctx->task_id = task_id;
61-
tasks_.emplace(task_id, std::move(shared_ctx));
62-
return Status::OK();
112+
// Consider adding RESOURCE_EXHAUSTED / NO_SPACE error codes to StatusCode
113+
return Status::Error(
114+
StatusCode::INVALID_ARGUMENT,
115+
task_name_ + " task table is full");
63116
}
64117

65118
std::shared_ptr<Context> Get(TaskId task_id)
66119
{
67-
std::lock_guard<std::mutex> lock(mu_);
68-
auto iter = tasks_.find(task_id);
69-
if (iter == tasks_.end()) { return nullptr; }
70-
return iter->second;
120+
if (task_id == kInvalidTaskId) {
121+
return nullptr;
122+
}
123+
124+
const auto start = Hash(task_id) & slot_mask_;
125+
const auto capacity = slots_.size();
126+
127+
for (std::size_t probe = 0; probe < capacity; ++probe) {
128+
auto& slot = slots_[(start + probe) & slot_mask_];
129+
130+
const auto state1 = slot.state.load(std::memory_order_acquire);
131+
if (state1 != SlotState::READY) {
132+
continue;
133+
}
134+
135+
const auto id1 = slot.task_id.load(std::memory_order_acquire);
136+
if (id1 != task_id) {
137+
continue;
138+
}
139+
140+
auto ptr = AtomicLoadCtx(slot, std::memory_order_acquire);
141+
if (!ptr) {
142+
continue;
143+
}
144+
145+
// Double-check to avoid returning a ctx from a reused slot.
146+
const auto id2 = slot.task_id.load(std::memory_order_acquire);
147+
const auto state2 = slot.state.load(std::memory_order_acquire);
148+
149+
if (state2 == SlotState::READY &&
150+
id2 == task_id &&
151+
ptr->task_id == task_id) {
152+
return ptr;
153+
}
154+
}
155+
156+
return nullptr;
71157
}
72158

73159
Status Remove(TaskId task_id)
74160
{
75-
std::lock_guard<std::mutex> lock(mu_);
76-
auto erased = tasks_.erase(task_id);
77-
if (erased == 0) {
78-
return Status::Error(StatusCode::TASK_NOT_FOUND, task_name_ + " task not found");
161+
if (task_id == kInvalidTaskId) {
162+
return Status::Error(
163+
StatusCode::TASK_NOT_FOUND,
164+
task_name_ + " task not found");
165+
}
166+
167+
const auto start = Hash(task_id) & slot_mask_;
168+
const auto capacity = slots_.size();
169+
170+
for (std::size_t probe = 0; probe < capacity; ++probe) {
171+
auto& slot = slots_[(start + probe) & slot_mask_];
172+
173+
const auto state = slot.state.load(std::memory_order_acquire);
174+
if (state != SlotState::READY) {
175+
continue; // Only process slots in READY state
176+
}
177+
178+
const auto id = slot.task_id.load(std::memory_order_acquire);
179+
if (id != task_id) {
180+
continue;
181+
}
182+
183+
std::uint8_t expected = SlotState::READY;
184+
if (!slot.state.compare_exchange_strong(
185+
expected,
186+
SlotState::REMOVING,
187+
std::memory_order_acq_rel,
188+
std::memory_order_acquire)) {
189+
continue; // CAS failed, continue probing
190+
}
191+
192+
AtomicStoreCtx(slot, std::shared_ptr<Context>{}, std::memory_order_release);
193+
slot.task_id.store(kInvalidTaskId, std::memory_order_release);
194+
slot.state.store(SlotState::EMPTY, std::memory_order_release);
195+
196+
return Status::OK();
197+
}
198+
199+
return Status::Error(
200+
StatusCode::TASK_NOT_FOUND,
201+
task_name_ + " task not found");
202+
}
203+
204+
private:
205+
struct SlotState {
206+
static constexpr std::uint8_t EMPTY = 0;
207+
static constexpr std::uint8_t WRITING = 1;
208+
static constexpr std::uint8_t READY = 2;
209+
static constexpr std::uint8_t REMOVING = 3;
210+
};
211+
// Ensure each Slot is aligned to 64 bytes to avoid False Sharing
212+
struct alignas(64) Slot {
213+
std::atomic<std::uint8_t> state{SlotState::EMPTY};
214+
std::atomic<TaskId> task_id{kInvalidTaskId};
215+
216+
// Use atomic_load/atomic_store free functions for shared_ptr.
217+
// This avoids requiring C++20 std::atomic<std::shared_ptr<T>>.
218+
std::shared_ptr<Context> ctx;
219+
};
220+
221+
private:
222+
static std::size_t NormalizeSlotCount(std::size_t n)
223+
{
224+
n = std::max<std::size_t>(n, kMinSlotCount);
225+
226+
std::size_t power = 1;
227+
while (power < n) {
228+
if (power > (std::numeric_limits<std::size_t>::max() >> 1)) {
229+
return power;
230+
}
231+
power <<= 1;
79232
}
80-
return Status::OK();
233+
234+
return power;
235+
}
236+
237+
static std::size_t Hash(TaskId task_id)
238+
{
239+
return std::hash<TaskId>{}(task_id);
240+
}
241+
242+
// Atomically load shared_ptr<Context> from Slot, ensuring thread safety
243+
static std::shared_ptr<Context> AtomicLoadCtx(
244+
const Slot& slot,
245+
std::memory_order order)
246+
{
247+
return std::atomic_load_explicit(&slot.ctx, order);
248+
}
249+
250+
// Atomically store task context shared_ctx into slot
251+
static void AtomicStoreCtx(
252+
Slot& slot,
253+
std::shared_ptr<Context> ptr,
254+
std::memory_order order)
255+
{
256+
std::atomic_store_explicit(&slot.ctx, std::move(ptr), order);
81257
}
82258

83259
private:
84260
State initial_state_;
85261
std::string task_name_;
86262
std::atomic<TaskId> next_task_id_{1};
87-
// TODO: consider using a lock-free structure !
88-
std::mutex mu_;
89-
std::unordered_map<TaskId, std::shared_ptr<Context>> tasks_;
263+
264+
std::vector<Slot> slots_;
265+
std::size_t slot_mask_{0};
90266
};
91267

92268
} // namespace UC::ASU

0 commit comments

Comments
 (0)