Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/source/user-guide/prefix-cache/pipeline_store.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ ucm_connectors:
* **io_direct** (optional, default: `false`):
Whether to enable direct I/O.

* **cache_use_hugepage** (optional, default: `false`):
Whether the CacheStore may back its direct-I/O host buffer with huge pages. This option only takes effect when `io_direct` is `true`. Set it to `true` only when the runtime environment has enough huge-page resources available.

* **stream_number** *(optional, default: 8)*
Number of threads used for data transfer between the Host and Storage.

Expand Down Expand Up @@ -262,4 +265,4 @@ This log indicates that the **Posix Store** has received a **load or dump task**
```text
[UC][D] Posix task({task_id},{operation},{subtask_number},{size}) finished, cost {time}ms. [PID,TID]
```
This log indicates that a load or dump task in the **Posix Store** has completed, along with its execution time in **ms**.
This log indicates that a load or dump task in the **Posix Store** has completed, along with its execution time in **ms**.
1 change: 1 addition & 0 deletions examples/ucm_config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ucm_connectors:
store_pipeline: "Cache|Posix"
storage_backends: "/mnt/test"
io_direct: false
# cache_use_hugepage: false
# cache_buffer_capacity_gb: 256
# posix_capacity_gb: 1024

Expand Down
62 changes: 49 additions & 13 deletions ucm/shared/trans/ascend/ascend_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,28 @@ class HostHugePages : public std::enable_shared_from_this<HostHugePages> {
UC_WARN("Mmap({}) with TLB({}) return: {}.", alignedSize, pageSize, errno);
return ptr;
}
UC_DEBUG("Mmap({}) with TLB({}) success.", alignedSize, pageSize);
size = alignedSize;
return ptr;
}
static void* MMapWithAdvice(size_t& size)
static void* MMapAnonymous(size_t& size, bool useHugePage)
{
const auto pageSize = HUGE_PAGE_SIZE;
const auto alignedSize = (size + pageSize - 1) / pageSize * pageSize;
const auto prot = PROT_WRITE | PROT_READ;
const auto flags = MAP_PRIVATE | MAP_ANONYMOUS;
void* ptr = mmap(nullptr, alignedSize, prot, flags, -1, 0);
if (ptr == MAP_FAILED) {
UC_WARN("Mmap({}) with advice({}) return: {}.", alignedSize, pageSize, errno);
UC_WARN("Mmap({}) anonymous return: {}.", alignedSize, errno);
return ptr;
}
madvise(ptr, alignedSize, MADV_HUGEPAGE);
auto ret = madvise(ptr, alignedSize, useHugePage ? MADV_HUGEPAGE : MADV_NOHUGEPAGE);
if (ret != 0) {
UC_WARN("Madvise {} on mmap({}) return: {}.",
useHugePage ? "hugepage" : "nohugepage", alignedSize, errno);
} else {
UC_DEBUG("Mmap({}) anonymous success, hugepage advice: {}.", alignedSize, useHugePage);
}
size = alignedSize;
return ptr;
}
Expand All @@ -81,29 +88,55 @@ class HostHugePages : public std::enable_shared_from_this<HostHugePages> {
munlock(buffer_, size_);
munmap(buffer_, size_);
}
std::shared_ptr<void> Data()
std::shared_ptr<void> Data(bool useHugePage)
{
if (buffer_ != MAP_FAILED) {
return std::shared_ptr<void>(buffer_, [self = shared_from_this()](auto) {});
}
const auto useGiganticPages = size_ >= GIGANTIC_PAGE_SIZE;
buffer_ = MMapWithTLB(size_, useGiganticPages);
if (buffer_ == MAP_FAILED && useGiganticPages) { buffer_ = MMapWithTLB(size_, false); }
if (buffer_ == MAP_FAILED) { buffer_ = MMapWithAdvice(size_); }
const auto requestedSize = size_;
const auto useGiganticPages = useHugePage && size_ >= GIGANTIC_PAGE_SIZE;
UC_DEBUG("Make direct-io host buffer start, requested size: {}, use hugepage: {}, use 1GiB hugepage: {}.",
requestedSize, useHugePage, useGiganticPages);
if (useHugePage) {
buffer_ = MMapWithTLB(size_, useGiganticPages);
if (buffer_ == MAP_FAILED && useGiganticPages) {
UC_DEBUG("Fallback direct-io host buffer mmap from 1GiB hugepage to 2MiB hugepage, requested size: {}.",
requestedSize);
buffer_ = MMapWithTLB(size_, false);
}
}
if (buffer_ == MAP_FAILED) {
UC_ERROR("Failed to make host buffer({}).", size_);
UC_DEBUG("Make direct-io host buffer mmap with anonymous mmap, requested size: {}, use hugepage advice: {}.",
requestedSize, useHugePage);
buffer_ = MMapAnonymous(size_, useHugePage);
}
if (buffer_ == MAP_FAILED) {
UC_ERROR("Failed to make direct-io host buffer, requested size: {}, aligned size: {}.",
requestedSize, size_);
return nullptr;
}
UC_DEBUG("Make direct-io host buffer mmap success, requested size: {}, actual size: {}, addr: {}.",
requestedSize, size_, buffer_);
UC_DEBUG("Zero direct-io host buffer start, size: {}, addr: {}.", size_, buffer_);
std::memset(buffer_, 0, size_);
mlock(buffer_, size_);
UC_DEBUG("Zero direct-io host buffer success, size: {}, addr: {}.", size_, buffer_);
auto ret = mlock(buffer_, size_);
if (ret != 0) {
UC_WARN("Mlock direct-io host buffer failed, size: {}, errno: {}.", size_, errno);
} else {
UC_DEBUG("Mlock direct-io host buffer success, size: {}.", size_);
}
UC_DEBUG("Register direct-io host buffer start, size: {}, addr: {}.", size_, buffer_);
auto s = Buffer::RegisterHostBuffer(buffer_, size_);
if (s.Failure()) {
UC_ERROR("Failed({}) to register buffer({}).", s, size_);
UC_ERROR("Failed({}) to register direct-io host buffer, requested size: {}, actual size: {}, addr: {}.",
s, requestedSize, size_, buffer_);
munlock(buffer_, size_);
munmap(buffer_, size_);
buffer_ = MAP_FAILED;
return nullptr;
}
UC_DEBUG("Register direct-io host buffer success, size: {}, addr: {}.", size_, buffer_);
return std::shared_ptr<void>(buffer_, [self = shared_from_this()](auto) {});
}
};
Expand All @@ -113,6 +146,7 @@ std::shared_ptr<void> Trans::AscendBuffer::MakeDeviceBuffer(size_t size)
void* device = nullptr;
auto ret = aclrtMalloc(&device, size, ACL_MEM_TYPE_HIGH_BAND_WIDTH);
if (ret == ACL_SUCCESS) { return std::shared_ptr<void>(device, aclrtFree); }
UC_ERROR("Aclrt malloc device buffer failed, size: {}, ret: {}.", size, ret);
return nullptr;
}

Expand All @@ -121,14 +155,16 @@ std::shared_ptr<void> Trans::AscendBuffer::MakeHostBuffer(size_t size)
void* host = nullptr;
auto ret = aclrtMallocHost(&host, size);
if (ret == ACL_SUCCESS) { return std::shared_ptr<void>(host, aclrtFreeHost); }
UC_ERROR("Aclrt malloc host buffer failed, size: {}, ret: {}.", size, ret);
return nullptr;
}

std::shared_ptr<void> Trans::AscendBuffer::MakeHostBuffer4DirectIo(size_t size)
std::shared_ptr<void> Trans::AscendBuffer::MakeHostBuffer4DirectIo(size_t size, bool useHugePage)
{
try {
return HostHugePages::Create(size)->Data();
return HostHugePages::Create(size)->Data(useHugePage);
} catch (...) {
UC_ERROR("Make direct-io host buffer failed with exception, size: {}.", size);
return nullptr;
}
}
Expand Down
2 changes: 1 addition & 1 deletion ucm/shared/trans/ascend/ascend_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class AscendBuffer : public ReservedBuffer {
public:
std::shared_ptr<void> MakeDeviceBuffer(size_t size) override;
std::shared_ptr<void> MakeHostBuffer(size_t size) override;
std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size) override;
std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size, bool useHugePage = false) override;
};

} // namespace UC::Trans
Expand Down
2 changes: 1 addition & 1 deletion ucm/shared/trans/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Buffer {
virtual std::shared_ptr<void> GetDeviceBuffer(size_t size) = 0;

virtual std::shared_ptr<void> MakeHostBuffer(size_t size) = 0;
virtual std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size) = 0;
virtual std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size, bool useHugePage = false) = 0;
virtual Status MakeHostBuffers(size_t size, size_t number) = 0;
virtual std::shared_ptr<void> GetHostBuffer(size_t size) = 0;

Expand Down
2 changes: 1 addition & 1 deletion ucm/shared/trans/detail/reserved_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ReservedBuffer : public Buffer {
return this->MakeDeviceBuffer(size);
}

std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size) override
std::shared_ptr<void> MakeHostBuffer4DirectIo(size_t size, bool = false) override
{
return this->MakeHostBuffer(size);
}
Expand Down
2 changes: 2 additions & 0 deletions ucm/store/cache/cc/cache_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class CacheStore : public StoreV1 {
config.Get("share_buffer_enable", param.shareBufferEnable);
if (!param.shareBufferEnable) { param.bufferCapacity /= 8; }
config.Get("io_direct", param.ioDirect);
config.Get("cache_use_hugepage", param.cacheUseHugePage);
size_t bufferCapacityGb = 0;
config.GetNumber("cache_buffer_capacity_gb", bufferCapacityGb);
if (bufferCapacityGb != 0) { param.bufferCapacity = bufferCapacityGb << 30; }
Expand Down Expand Up @@ -196,6 +197,7 @@ class CacheStore : public StoreV1 {
UC_INFO("Set {}::ShardSize to {}.", ns, config.shardSize);
UC_INFO("Set {}::BlockSize to {}.", ns, config.blockSize);
UC_INFO("Set {}::IoDirect to {}.", ns, config.ioDirect);
UC_INFO("Set {}::CacheUseHugePage to {}.", ns, config.cacheUseHugePage);
UC_INFO("Set {}::CpuAffinityCores to {}.", ns, config.cpuAffinityCores);
UC_INFO("Set {}::BufferCapacity to {}GB.", ns, config.bufferCapacity >> 30);
UC_INFO("Set {}::ShareBufferEnable to {}.", ns, config.shareBufferEnable);
Expand Down
1 change: 1 addition & 0 deletions ucm/store/cache/cc/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct Config {
size_t shardSize{0};
size_t blockSize{0};
bool ioDirect{false};
bool cacheUseHugePage{false};
std::vector<ssize_t> cpuAffinityCores{};
size_t bufferCapacity{256ULL << 30};
size_t loadExclusiveBufferNumber{1024};
Expand Down
17 changes: 12 additions & 5 deletions ucm/store/cache/cc/trans_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class LocalBufferStrategy : public BufferStrategy {
};

bool ioDirect_{false};
bool useHugePage_{false};
BufferHeader header_;
LocalMutex bucketLocks_[nHashTableBucket];
std::unique_ptr<LocalLock[]> nodeLocks_;
Expand All @@ -134,8 +135,9 @@ class LocalBufferStrategy : public BufferStrategy {

public:
LocalBufferStrategy(int32_t deviceId, size_t nodeSize, size_t totalSize, size_t reservedNumber,
bool ioDirect)
: BufferStrategy(deviceId, nodeSize, totalSize, reservedNumber), ioDirect_(ioDirect)
bool ioDirect, bool useHugePage)
: BufferStrategy(deviceId, nodeSize, totalSize, reservedNumber), ioDirect_(ioDirect),
useHugePage_(useHugePage)
{
}
Status Setup() override
Expand Down Expand Up @@ -164,12 +166,17 @@ class LocalBufferStrategy : public BufferStrategy {
UC_ERROR("Failed to make buffer on device({}).", deviceId);
return Status::Error();
}
data_ = ioDirect_ ? buffer->MakeHostBuffer4DirectIo(nodeSize * nNode)
UC_DEBUG("Make cache local buffer start, device: {}, ioDirect: {}, useHugePage: {}, nodeSize: {}, nNode: {}, totalSize: {}.",
deviceId, ioDirect_, useHugePage_, nodeSize, nNode, nodeSize * nNode);
data_ = ioDirect_ ? buffer->MakeHostBuffer4DirectIo(nodeSize * nNode, useHugePage_)
: buffer->MakeHostBuffer(nodeSize * nNode);
if (!data_) [[unlikely]] {
UC_ERROR("Failed to make pinned({}) for device({}).", nodeSize * nNode, deviceId);
UC_ERROR("Failed to make pinned({}) for device({}), ioDirect: {}, useHugePage: {}, nodeSize: {}, nNode: {}.",
nodeSize * nNode, deviceId, ioDirect_, useHugePage_, nodeSize, nNode);
return Status::OutOfMemory();
}
UC_DEBUG("Make cache local buffer success, device: {}, ioDirect: {}, useHugePage: {}, totalSize: {}.",
deviceId, ioDirect_, useHugePage_, nodeSize * nNode);
for (size_t i = 0; i < nHashTableBucket; i++) { header_.buckets[i] = invalidIndex; }
for (size_t i = 0; i < nNode; i++) { meta_[i].Init(); }
header_.freeHead = 0;
Expand Down Expand Up @@ -469,7 +476,7 @@ Status TransBuffer::Setup(const Config& config)
if (!config.shareBufferEnable) {
strategy_ = std::make_shared<LocalBufferStrategy>(
config.deviceId, config.shardSize, config.bufferCapacity,
config.loadExclusiveBufferNumber, config.ioDirect);
config.loadExclusiveBufferNumber, config.ioDirect, config.cacheUseHugePage);
} else if (config.deviceId >= 0) {
strategy_ = std::make_shared<SharedBufferStrategy>(
config.uniqueId, config.deviceId, config.shardSize, config.bufferCapacity,
Expand Down
6 changes: 6 additions & 0 deletions ucm/store/test/case/cache/cache_trans_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class UCCacheTransBufferTest : public testing::TestWithParam<bool> {

INSTANTIATE_TEST_CASE_P(SharedCondition, UCCacheTransBufferTest, ::testing::Values(false, true));

// A CacheStore config that nobody has touched must leave huge-page usage off.
TEST(UCCacheConfigTest, CacheUseHugePageDefaultDisabled)
{
    const UC::CacheStore::Config defaults{};
    ASSERT_FALSE(defaults.cacheUseHugePage);
}

TEST_P(UCCacheTransBufferTest, GetFirstNode)
{
UC::CacheStore::TransBuffer transBuffer;
Expand Down
Loading