Skip to content

Commit 1a5a156

Browse files
committed
[Feat] Implementation of BufferManager and KvProtocol.
## Purpose Implement BufferManager for slot-based memory management with IndexPool, and unify SQE packing/CQE unpacking into a single KvProtocol abstraction with comprehensive request validation and full integration testing. ## Modifications 1. Add BufferManager with IndexPool-based slot allocation, supporting HOST (malloc), HOST_PINNED (aclrtMallocHost + aclrtHostRegister), and ASCEND_DEVICE (aclrtMalloc) memory types via AscendBuffer. 2. Unify Sqe/Cqe into KvProtocol base class with PackSqe/UnpackCqe interfaces, replacing separate SqeManager and CqeManager. 3. Add ValidateRequest to check Request fields before packing, including bit-field overflow, alignment, key length, and batch number validation with actual values in error messages and [[unlikely]] hints. 4. Add ProtocolManager to manage send_buffer and flag_buffer instances, with PackRequest, UnpackResponse, and PollResponseCid interfaces. 5. Use protocol-specific max batch numbers per SSU spec: BatchStore/ BatchRetrieve=110, Delete=254, Exist=256. 6. Rename link_proto to link_protocol for naming consistency. 7. Remove old sqe.h/cpp files, replaced by kv_protocol.h/cpp. 8. Move ScatterGatherEntry from types.h to buffer_manager.h. ## Test BufferManagerTest (19 cases), KvProtocolPackTest (38 cases), and ProtocolManagerTest (13 integration cases) all passed. Total: 70 tests.
1 parent 32513bf commit 1a5a156

13 files changed

Lines changed: 2783 additions & 1280 deletions

ucm/transport/kv/asu/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ target_include_directories(asu_transport
1010
)
1111
target_link_libraries(asu_transport PUBLIC infra_logger pthread)
1212

13+
if(RUNTIME_ENVIRONMENT STREQUAL "ascend")
14+
target_compile_definitions(asu_transport PRIVATE ASCEND_ENABLED)
15+
target_link_libraries(asu_transport PRIVATE trans)
16+
endif()
17+
1318
file(GLOB ASU_CLIENT_SOURCES CONFIGURE_DEPENDS client/src/*.cpp)
1419
add_library(asu_client SHARED ${ASU_CLIENT_SOURCES})
1520
target_include_directories(asu_client
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2026 Huawei Technologies Co., Ltd. All rights reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
* */
24+
#include "buffer_manager.h"
25+
#include <cstring>
26+
#include <gtest/gtest.h>
27+
#include <thread>
28+
#include <vector>
29+
30+
namespace UC::ASU {
31+
namespace {
32+
33+
class BufferManagerTest : public ::testing::Test {
34+
protected:
35+
void SetUp() override {}
36+
void TearDown() override {}
37+
};
38+
39+
TEST_F(BufferManagerTest, InitAndDestroy)
40+
{
41+
BufferManager mgr;
42+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
43+
ASSERT_TRUE(status.ok()) << status.message;
44+
}
45+
46+
TEST_F(BufferManagerTest, InitWithZeroSlotSize)
47+
{
48+
BufferManager mgr;
49+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 0, 100);
50+
ASSERT_FALSE(status.ok());
51+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
52+
}
53+
54+
TEST_F(BufferManagerTest, InitWithZeroSlotNum)
55+
{
56+
BufferManager mgr;
57+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 0);
58+
ASSERT_FALSE(status.ok());
59+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
60+
}
61+
62+
TEST_F(BufferManagerTest, DoubleInit)
63+
{
64+
BufferManager mgr;
65+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
66+
ASSERT_TRUE(status.ok());
67+
status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
68+
ASSERT_FALSE(status.ok());
69+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
70+
}
71+
72+
TEST_F(BufferManagerTest, AllocateWithoutInit)
73+
{
74+
BufferManager mgr;
75+
ScatterGatherEntry sge;
76+
auto status = mgr.Allocate(64, sge);
77+
ASSERT_FALSE(status.ok());
78+
ASSERT_EQ(status.code, StatusCode::NOT_INITIALIZED);
79+
}
80+
81+
TEST_F(BufferManagerTest, AllocateZeroSize)
82+
{
83+
BufferManager mgr;
84+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
85+
ASSERT_TRUE(status.ok());
86+
87+
ScatterGatherEntry sge;
88+
status = mgr.Allocate(0, sge);
89+
ASSERT_FALSE(status.ok());
90+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
91+
}
92+
93+
TEST_F(BufferManagerTest, AllocateExceedsSlotSize)
94+
{
95+
BufferManager mgr;
96+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
97+
ASSERT_TRUE(status.ok());
98+
99+
ScatterGatherEntry sge;
100+
status = mgr.Allocate(2048, sge);
101+
ASSERT_FALSE(status.ok());
102+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
103+
}
104+
105+
TEST_F(BufferManagerTest, SingleAllocateAndFree)
106+
{
107+
BufferManager mgr;
108+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
109+
ASSERT_TRUE(status.ok());
110+
111+
ScatterGatherEntry sge;
112+
status = mgr.Allocate(64, sge);
113+
ASSERT_TRUE(status.ok()) << status.message;
114+
ASSERT_NE(sge.addr, 0);
115+
ASSERT_EQ(sge.length, 64);
116+
ASSERT_EQ(sge.lkey, 0);
117+
118+
auto* ptr = reinterpret_cast<void*>(sge.addr);
119+
std::memset(ptr, 0xAB, 64);
120+
121+
status = mgr.Free(ptr);
122+
ASSERT_TRUE(status.ok()) << status.message;
123+
}
124+
125+
TEST_F(BufferManagerTest, MultipleAllocatesAndFrees)
126+
{
127+
BufferManager mgr;
128+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
129+
ASSERT_TRUE(status.ok());
130+
131+
constexpr int kCount = 50;
132+
std::vector<ScatterGatherEntry> sges(kCount);
133+
134+
for (int i = 0; i < kCount; ++i) {
135+
status = mgr.Allocate(128, sges[i]);
136+
ASSERT_TRUE(status.ok()) << "Failed at i=" << i << ": " << status.message;
137+
ASSERT_NE(sges[i].addr, 0);
138+
std::memset(reinterpret_cast<void*>(sges[i].addr), i, 128);
139+
}
140+
141+
for (int i = 0; i < kCount; ++i) {
142+
auto* data = reinterpret_cast<unsigned char*>(sges[i].addr);
143+
for (int j = 0; j < 128; ++j) {
144+
ASSERT_EQ(data[j], static_cast<unsigned char>(i));
145+
}
146+
}
147+
148+
for (int i = 0; i < kCount; ++i) {
149+
status = mgr.Free(reinterpret_cast<void*>(sges[i].addr));
150+
ASSERT_TRUE(status.ok()) << status.message;
151+
}
152+
}
153+
154+
TEST_F(BufferManagerTest, FreeNullPointer)
155+
{
156+
BufferManager mgr;
157+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
158+
ASSERT_TRUE(status.ok());
159+
160+
status = mgr.Free(nullptr);
161+
ASSERT_FALSE(status.ok());
162+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
163+
}
164+
165+
TEST_F(BufferManagerTest, FreeWithoutInit)
166+
{
167+
BufferManager mgr;
168+
int dummy = 0;
169+
auto status = mgr.Free(&dummy);
170+
ASSERT_FALSE(status.ok());
171+
ASSERT_EQ(status.code, StatusCode::NOT_INITIALIZED);
172+
}
173+
174+
TEST_F(BufferManagerTest, FreeOutOfRangePointer)
175+
{
176+
BufferManager mgr;
177+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
178+
ASSERT_TRUE(status.ok());
179+
180+
int dummy = 0;
181+
status = mgr.Free(&dummy);
182+
ASSERT_FALSE(status.ok());
183+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
184+
}
185+
186+
TEST_F(BufferManagerTest, FreeUnalignedPointer)
187+
{
188+
BufferManager mgr;
189+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
190+
ASSERT_TRUE(status.ok());
191+
192+
ScatterGatherEntry sge;
193+
status = mgr.Allocate(64, sge);
194+
ASSERT_TRUE(status.ok());
195+
196+
auto* ptr = reinterpret_cast<char*>(sge.addr) + 1;
197+
status = mgr.Free(ptr);
198+
ASSERT_FALSE(status.ok());
199+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
200+
201+
mgr.Free(reinterpret_cast<void*>(sge.addr));
202+
}
203+
204+
TEST_F(BufferManagerTest, AllocateFullSlotSize)
205+
{
206+
BufferManager mgr;
207+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 10);
208+
ASSERT_TRUE(status.ok());
209+
210+
ScatterGatherEntry sge;
211+
status = mgr.Allocate(1024, sge);
212+
ASSERT_TRUE(status.ok()) << status.message;
213+
ASSERT_EQ(sge.length, 1024);
214+
215+
std::memset(reinterpret_cast<void*>(sge.addr), 0xFF, 1024);
216+
217+
mgr.Free(reinterpret_cast<void*>(sge.addr));
218+
}
219+
220+
TEST_F(BufferManagerTest, ReuseAfterFree)
221+
{
222+
BufferManager mgr;
223+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 1);
224+
ASSERT_TRUE(status.ok());
225+
226+
ScatterGatherEntry sge1;
227+
status = mgr.Allocate(64, sge1);
228+
ASSERT_TRUE(status.ok());
229+
230+
auto* ptr = reinterpret_cast<void*>(sge1.addr);
231+
mgr.Free(ptr);
232+
233+
ScatterGatherEntry sge2;
234+
status = mgr.Allocate(64, sge2);
235+
ASSERT_TRUE(status.ok());
236+
ASSERT_EQ(sge2.addr, sge1.addr);
237+
238+
mgr.Free(reinterpret_cast<void*>(sge2.addr));
239+
}
240+
241+
TEST_F(BufferManagerTest, ConcurrentAllocateAndFree)
242+
{
243+
BufferManager mgr;
244+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 1024, 100);
245+
ASSERT_TRUE(status.ok());
246+
247+
constexpr int kThreadCount = 4;
248+
constexpr int kOpsPerThread = 500;
249+
250+
auto worker = [&mgr](int thread_id) {
251+
for (int i = 0; i < kOpsPerThread; ++i) {
252+
ScatterGatherEntry sge;
253+
auto s = mgr.Allocate(64, sge);
254+
ASSERT_TRUE(s.ok()) << "Thread " << thread_id << " op " << i << ": " << s.message;
255+
256+
std::memset(reinterpret_cast<void*>(sge.addr), thread_id, 64);
257+
258+
s = mgr.Free(reinterpret_cast<void*>(sge.addr));
259+
ASSERT_TRUE(s.ok()) << s.message;
260+
}
261+
};
262+
263+
std::vector<std::thread> threads;
264+
for (int i = 0; i < kThreadCount; ++i) { threads.emplace_back(worker, i); }
265+
for (auto& t : threads) { t.join(); }
266+
}
267+
268+
TEST_F(BufferManagerTest, ConcurrentStressTest)
269+
{
270+
BufferManager mgr;
271+
auto status = mgr.Init("test_buffer", MemoryType::HOST, 256, 16);
272+
ASSERT_TRUE(status.ok());
273+
274+
constexpr int kThreadCount = 4;
275+
constexpr int kOpsPerThread = 1000;
276+
277+
auto worker = [&mgr](int thread_id) {
278+
for (int i = 0; i < kOpsPerThread; ++i) {
279+
ScatterGatherEntry sge;
280+
auto s = mgr.Allocate(128, sge);
281+
ASSERT_TRUE(s.ok());
282+
283+
std::memset(reinterpret_cast<void*>(sge.addr), thread_id, 128);
284+
285+
for (int j = 0; j < 128; ++j) {
286+
ASSERT_EQ(reinterpret_cast<unsigned char*>(sge.addr)[j], thread_id);
287+
}
288+
289+
s = mgr.Free(reinterpret_cast<void*>(sge.addr));
290+
ASSERT_TRUE(s.ok());
291+
}
292+
};
293+
294+
std::vector<std::thread> threads;
295+
for (int i = 0; i < kThreadCount; ++i) { threads.emplace_back(worker, i); }
296+
for (auto& t : threads) { t.join(); }
297+
}
298+
299+
TEST_F(BufferManagerTest, AllocateZeroedReturnsZeroedMemory)
300+
{
301+
BufferManager mgr;
302+
auto status = mgr.Init("test", MemoryType::HOST, 1024, 10);
303+
ASSERT_TRUE(status.ok());
304+
305+
ScatterGatherEntry sge;
306+
status = mgr.AllocateZeroed(512, sge);
307+
ASSERT_TRUE(status.ok());
308+
309+
auto* ptr = reinterpret_cast<uint8_t*>(sge.addr);
310+
for (size_t i = 0; i < 1024; ++i) {
311+
ASSERT_EQ(ptr[i], 0) << "byte " << i << " not zeroed";
312+
}
313+
314+
mgr.Free(reinterpret_cast<void*>(sge.addr));
315+
}
316+
317+
TEST_F(BufferManagerTest, AllocateZeroedRejectsZeroSize)
318+
{
319+
BufferManager mgr;
320+
auto status = mgr.Init("test", MemoryType::HOST, 1024, 10);
321+
ASSERT_TRUE(status.ok());
322+
323+
ScatterGatherEntry sge;
324+
status = mgr.AllocateZeroed(0, sge);
325+
ASSERT_FALSE(status.ok());
326+
ASSERT_EQ(status.code, StatusCode::INVALID_ARGUMENT);
327+
}
328+
329+
} // namespace
330+
} // namespace UC::ASU

0 commit comments

Comments
 (0)