Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion third_party/Mooncake
Submodule Mooncake updated from 8c31e7 to 1adcb2
2 changes: 1 addition & 1 deletion xllm/core/distributed_runtime/llm_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ bool LLMEngine::allocate_kv_cache(const Engine::KVCacheCapacity& kv_cache_cap) {

CHECK_GT(kv_cache_cap.n_blocks, 0) << "no memory for kv cache";
const int32_t block_size = options_.block_size();
bool enable_lighting_indexer = args_.index_n_heads() > 1;
bool enable_lighting_indexer = args_.index_n_heads() > 0;
bool enable_gdn_attention = has_linear_attention_layers(args_);

// init kv cache for each worker
Expand Down
22 changes: 17 additions & 5 deletions xllm/core/framework/kv_cache/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ cc_library(
$<$<BOOL:${USE_NPU}>:spec_kv_cache_transfer.h>
kv_cache_store.h
hierarchy_kv_cache_transfer.h
$<$<BOOL:${USE_NPU}>:mooncake_transfer_engine.h>
$<$<BOOL:${USE_NPU}>:mooncake_kv_cache_transfer.h>
$<$<OR:$<BOOL:${USE_NPU}>,$<BOOL:${USE_MLU}>>:mooncake_transfer_engine.h>
$<$<OR:$<BOOL:${USE_NPU}>,$<BOOL:${USE_MLU}>>:mooncake_kv_cache_transfer.h>
$<$<BOOL:${USE_NPU}>:mooncake_weight_transfer.h>
SRCS
embedding_cache.cpp
Expand All @@ -26,8 +26,8 @@ cc_library(
$<$<BOOL:${USE_NPU}>:spec_kv_cache_transfer.cpp>
kv_cache_store.cpp
hierarchy_kv_cache_transfer.cpp
$<$<BOOL:${USE_NPU}>:mooncake_transfer_engine.cpp>
$<$<BOOL:${USE_NPU}>:mooncake_kv_cache_transfer.cpp>
$<$<OR:$<BOOL:${USE_NPU}>,$<BOOL:${USE_MLU}>>:mooncake_transfer_engine.cpp>
$<$<OR:$<BOOL:${USE_NPU}>,$<BOOL:${USE_MLU}>>:mooncake_kv_cache_transfer.cpp>
$<$<BOOL:${USE_NPU}>:mooncake_weight_transfer.cpp>
DEPS
:common
Expand Down Expand Up @@ -58,4 +58,16 @@ target_link_libraries(embedding_cache_test
$<$<BOOL:${USE_NPU}>:ascendcl>
$<$<BOOL:${USE_NPU}>:hccl>
$<$<BOOL:${USE_NPU}>:c_sec>
$<$<BOOL:${USE_NPU}>:nnopbase>)
$<$<BOOL:${USE_NPU}>:nnopbase>)

if(USE_NPU OR USE_MLU)
cc_test(
NAME
mooncake_transfer_engine_test
SRCS
mooncake_transfer_engine_test.cpp
DEPS
:kv_cache
GTest::gtest_main
)
endif()
20 changes: 17 additions & 3 deletions xllm/core/framework/kv_cache/kv_cache_transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ limitations under the License.

#if defined(USE_NPU)
#include <torch_npu/csrc/core/npu/NPUFormat.h>
#endif

#if defined(USE_NPU)
#include "llm_data_dist_transfer.h"
#endif

#if defined(USE_NPU) || defined(USE_MLU)
#include "mooncake_kv_cache_transfer.h"
#endif

Expand Down Expand Up @@ -56,11 +61,11 @@ folly::SemiFuture<bool> KVCacheTransfer::pull_kv_blocks_async(
return future;
}

#if defined(USE_NPU)
#if defined(USE_NPU) || defined(USE_MLU)
folly::SemiFuture<bool> KVCacheTransfer::push_kv_blocks_async(
const std::vector<TransferKVInfo>& transfer_kv_infos,
const ParallelArgs& parallel_args,
std::shared_ptr<NPULayerSynchronizerImpl> layer_synchronizer,
std::shared_ptr<KVPushSynchronizerImpl> layer_synchronizer,
bool is_spec_draft) {
folly::Promise<bool> promise;
auto future = promise.getSemiFuture();
Expand Down Expand Up @@ -245,10 +250,11 @@ std::shared_ptr<KVCacheTransfer> KVCacheTransferFactory::create(

int32_t device_id = device.index();

#if defined(USE_NPU)
#if defined(USE_NPU) || defined(USE_MLU)
LOG(INFO) << "Create KVCacheTransfer for " << transfer_type << "flag"
<< FLAGS_kv_cache_transfer_type;
if (transfer_type == "LlmDataDist") {
#if defined(USE_NPU)
transfer = std::make_shared<LlmDataDistTransfer>(device_ip,
transfer_listen_port,
instance_role,
Expand All @@ -259,8 +265,12 @@ std::shared_ptr<KVCacheTransfer> KVCacheTransferFactory::create(

transfer->initialize(device_id);
transfer->allocate_kv_cache(kv_caches, num_layers, kv_cache_shape, dtype);
#else
LOG(FATAL) << "LlmDataDist is not supported on MLU backend.";
#endif
} else if (transfer_type == "Mooncake") {
std::shared_ptr<MooncakeKVCacheTransferBase> mooncake_transfer;
#if defined(USE_NPU)
if (FLAGS_enable_xtensor) {
auto xtensor_transfer = std::make_shared<MooncakeKVCacheTransferXTensor>(
device_id, transfer_listen_port, device);
Expand All @@ -275,6 +285,10 @@ std::shared_ptr<KVCacheTransfer> KVCacheTransferFactory::create(
mooncake_transfer = std::make_shared<MooncakeKVCacheTransferDefault>(
device_id, transfer_listen_port, device, model_type);
}
#else
mooncake_transfer = std::make_shared<MooncakeKVCacheTransferDefault>(
device_id, transfer_listen_port, device, model_type);
#endif

mooncake_transfer->initialize(device_id);
mooncake_transfer->allocate_kv_cache(
Expand Down
18 changes: 14 additions & 4 deletions xllm/core/framework/kv_cache/kv_cache_transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ limitations under the License.
#if defined(USE_NPU)
#include "platform/npu/npu_layer_synchronizer.h"
#endif
#if defined(USE_MLU)
#include "platform/mlu/mlu_layer_synchronizer.h"
#endif
#include "framework/parallel_state/parallel_args.h"
#include "platform/device.h"
#include "util/threadpool.h"

namespace xllm {

#if defined(USE_NPU)
using KVPushSynchronizerImpl = NPULayerSynchronizerImpl;
#elif defined(USE_MLU)
using KVPushSynchronizerImpl = MLULayerSynchronizerImpl;
#endif

class KVCacheTransfer {
public:
struct KVCacheInfo {
Expand Down Expand Up @@ -101,11 +111,11 @@ class KVCacheTransfer {
const std::vector<uint64_t>& src_blocks,
const std::vector<uint64_t>& dst_blocks);

#if defined(USE_NPU)
#if defined(USE_NPU) || defined(USE_MLU)
virtual folly::SemiFuture<bool> push_kv_blocks_async(
const std::vector<TransferKVInfo>& transfer_kv_infos,
const ParallelArgs& parallel_args,
std::shared_ptr<NPULayerSynchronizerImpl> layer_synchronizer,
std::shared_ptr<KVPushSynchronizerImpl> layer_synchronizer,
bool is_spec_draft);
#endif

Expand All @@ -114,10 +124,10 @@ class KVCacheTransfer {
const std::vector<TransferKVInfo>& transfer_kv_infos,
const ParallelArgs& parallel_args);

#if defined(USE_NPU)
#if defined(USE_NPU) || defined(USE_MLU)
virtual bool push_kv_blocks(
std::unordered_map<std::string, KVCacheInfo>& merged_kv_infos,
std::shared_ptr<NPULayerSynchronizerImpl>& layer_synchronizer,
std::shared_ptr<KVPushSynchronizerImpl>& layer_synchronizer,
bool is_spec_draft) = 0;
#endif

Expand Down
124 changes: 100 additions & 24 deletions xllm/core/framework/kv_cache/mooncake_kv_cache_transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

#include <glog/logging.h>

#include <numeric>

#if defined(USE_NPU)
#ifdef TORCH_HIGHER_THAN_PTA6
#include <torch_npu/csrc/core/npu/NPUFormat.h>
Expand Down Expand Up @@ -44,6 +46,7 @@ MooncakeKVCacheTransferBase::MooncakeKVCacheTransferBase(
const torch::Device& device,
std::unique_ptr<MooncakeTransferEngine> engine)
: device_id_(device_id),
device_(device),
listen_port_(listen_port),
mooncake_te_(std::move(engine)) {
std::string instance_ip = net::get_local_ip_addr();
Expand Down Expand Up @@ -119,10 +122,18 @@ void MooncakeKVCacheTransferDefault::register_kv_cache(
const std::vector<std::vector<int64_t>>& kv_cache_shape,
torch::ScalarType dtype) {
num_layers_ = kv_caches.size();
if (!kv_caches.empty()) {
torch::Tensor value_cache = kv_caches[0].get_v_cache();
torch::Tensor index_cache = kv_caches[0].get_index_cache();
has_v_cache_ = value_cache.defined() && value_cache.numel() > 0;
has_index_cache_ = index_cache.defined() && index_cache.numel() > 0;
}
buf_cnt_per_layer_ = 1 + static_cast<int64_t>(has_v_cache_) +
static_cast<int64_t>(has_index_cache_);

int64_t data_size = torch::scalarTypeToTypeMeta(dtype).itemsize();
int64_t count_per_block = 1;
for (int32_t i = 1; i < kv_cache_shape[0].size(); ++i) {
for (size_t i = 1; i < kv_cache_shape[0].size(); ++i) {
count_per_block *= kv_cache_shape[0][i];
}
size_per_block_ = count_per_block * data_size;
Expand All @@ -135,6 +146,26 @@ void MooncakeKVCacheTransferDefault::allocate_kv_cache_impl(
int64_t num_layers,
const std::vector<std::vector<int64_t>>& kv_cache_shape,
torch::ScalarType dtype) {
#if defined(USE_MLU)
torch::TensorOptions options =
torch::TensorOptions().dtype(dtype).device(device_);
for (int64_t i = 0; i < num_layers; ++i) {
torch::Tensor key_cache = torch::zeros(kv_cache_shape[0], options);
torch::Tensor value_cache;
torch::Tensor index_cache;
if (kv_cache_shape.size() > 1 && !kv_cache_shape[1].empty()) {
value_cache = torch::zeros(kv_cache_shape[1], options);
}
if (kv_cache_shape.size() > 2 && !kv_cache_shape[2].empty()) {
index_cache = torch::zeros(kv_cache_shape[2], options);
}
if (index_cache.defined()) {
kv_caches.emplace_back(key_cache, value_cache, index_cache);
} else {
kv_caches.emplace_back(key_cache, value_cache);
}
}
#else
// Original mode: allocate device memory using aclrtMalloc
// calculate the size of kv cache for each layer
auto data_size = torch::elementSize(dtype);
Expand Down Expand Up @@ -190,36 +221,78 @@ void MooncakeKVCacheTransferDefault::allocate_kv_cache_impl(
value_cache = v_torch_tensors[i];
kv_caches.emplace_back(key_cache, value_cache);
}
#endif
}

void MooncakeKVCacheTransferDefault::register_kv_cache_impl(
std::vector<xllm::KVCache>& kv_caches) {
// Default mode registers each layer's K/V cache tensor buffers directly.
int64_t num_cache = num_layers_ * 2;
void MooncakeKVCacheTransferDefault::add_buf(
const torch::Tensor& tensor,
std::vector<void*>& addrs,
std::vector<size_t>& lens,
std::vector<uint64_t>& buf_bytes) const {
if (!tensor.defined() || tensor.numel() == 0) {
return;
}

CHECK_GT(tensor.dim(), 0) << "cache tensor dim must be positive";
int64_t block_cnt = tensor.size(0);
CHECK_GT(block_cnt, 0) << "cache tensor block dim must be positive";

std::vector<void*> cache_addrs;
std::vector<size_t> cache_lens;
cache_addrs.reserve(num_cache);
cache_lens.reserve(num_cache);
addrs.emplace_back(tensor.data_ptr());
lens.emplace_back(static_cast<size_t>(tensor.nbytes()));
buf_bytes.emplace_back(static_cast<uint64_t>(tensor.nbytes() / block_cnt));
}

std::vector<int64_t> MooncakeKVCacheTransferDefault::get_buf_ids(
const std::vector<int64_t>& layer_ids) const {
std::vector<int64_t> active_layer_ids;
if (layer_ids.empty()) {
active_layer_ids.resize(static_cast<size_t>(num_layers_));
std::iota(active_layer_ids.begin(), active_layer_ids.end(), 0);
} else {
active_layer_ids = layer_ids;
}

for (int32_t i = 0; i < num_layers_; ++i) {
cache_addrs.emplace_back(kv_caches[i].get_k_cache().data_ptr());
cache_lens.emplace_back(kv_caches[i].get_k_cache().nbytes());
std::vector<int64_t> buf_ids;
buf_ids.reserve(active_layer_ids.size() *
static_cast<size_t>(buf_cnt_per_layer_));
for (int64_t layer_id : active_layer_ids) {
CHECK_GE(layer_id, 0) << "layer_id must be non-negative";
CHECK_LT(layer_id, num_layers_) << "layer_id out of range";

int64_t buf_id = layer_id * buf_cnt_per_layer_;
buf_ids.emplace_back(buf_id++);
if (has_v_cache_) {
buf_ids.emplace_back(buf_id++);
}
if (has_index_cache_) {
buf_ids.emplace_back(buf_id);
}
}
return buf_ids;
}

for (int32_t i = 0; i < num_layers_; ++i) {
cache_addrs.emplace_back(kv_caches[i].get_v_cache().data_ptr());
cache_lens.emplace_back(kv_caches[i].get_v_cache().nbytes());
void MooncakeKVCacheTransferDefault::register_kv_cache_impl(
std::vector<xllm::KVCache>& kv_caches) {
std::vector<void*> addrs;
std::vector<size_t> lens;
std::vector<uint64_t> buf_bytes;
addrs.reserve(static_cast<size_t>(num_layers_) * 3);
lens.reserve(static_cast<size_t>(num_layers_) * 3);
buf_bytes.reserve(static_cast<size_t>(num_layers_) * 3);

for (int64_t i = 0; i < num_layers_; ++i) {
add_buf(kv_caches[i].get_k_cache(), addrs, lens, buf_bytes);
add_buf(kv_caches[i].get_v_cache(), addrs, lens, buf_bytes);
add_buf(kv_caches[i].get_index_cache(), addrs, lens, buf_bytes);
}

if (!mooncake_te_->register_memory(
cache_addrs, cache_lens, size_per_block_)) {
if (!mooncake_te_->register_memory(addrs, lens, buf_bytes)) {
LOG(ERROR) << "register_kv_cache_impl failed";
return;
}

LOG(INFO) << "register_kv_cache_impl success, num_layers=" << num_layers_
<< ", size_per_block=" << size_per_block_;
<< ", buffers=" << buf_bytes.size();
}

bool MooncakeKVCacheTransferDefault::pull_kv_blocks(
Expand All @@ -233,8 +306,9 @@ bool MooncakeKVCacheTransferDefault::pull_kv_blocks(
(void)src_k_cache_id;
(void)src_v_cache_id;
std::vector<int64_t> layer_ids;
std::vector<int64_t> buf_ids = get_buf_ids(layer_ids);
auto ret = mooncake_te_->pull_memory_blocks(
src_addr, src_blocks, dst_blocks, layer_ids);
src_addr, src_blocks, dst_blocks, buf_ids);
if (!ret) {
LOG(ERROR) << "Pull kv cache blocks failed, ret = " << ret;
return false;
Expand All @@ -244,16 +318,17 @@ bool MooncakeKVCacheTransferDefault::pull_kv_blocks(

bool MooncakeKVCacheTransferDefault::push_kv_blocks(
std::unordered_map<std::string, KVCacheInfo>& merged_kv_infos,
std::shared_ptr<NPULayerSynchronizerImpl>& layer_synchronizer,
std::shared_ptr<KVPushSynchronizerImpl>& layer_synchronizer,
bool is_spec_draft) {
(void)is_spec_draft;
for (int64_t layer_index = 0; layer_index < num_layers_; ++layer_index) {
layer_synchronizer->synchronize_layer(layer_index);
for (const auto& pair : merged_kv_infos) {
std::vector<int64_t> layer_ids = {layer_index};
std::vector<int64_t> buf_ids = get_buf_ids(layer_ids);
const KVCacheInfo& kv_info = pair.second;
auto ret = mooncake_te_->push_memory_blocks(
kv_info.dst_addr, kv_info.src_blocks, kv_info.dst_blocks, layer_ids);
kv_info.dst_addr, kv_info.src_blocks, kv_info.dst_blocks, buf_ids);
if (!ret) {
LOG(ERROR) << "Push kv blocks failed, layer = " << layer_index
<< ", ret = " << ret;
Expand Down Expand Up @@ -347,8 +422,9 @@ void MooncakeKVCacheTransferXTensor::register_kv_cache_impl() {

std::vector<void*> addrs = {global_xtensor.base_vaddr()};
std::vector<size_t> lens = {global_xtensor.total_size()};
std::vector<uint64_t> buf_bytes = {static_cast<uint64_t>(size_per_block_)};

if (!mooncake_te_->register_memory(addrs, lens, size_per_block_)) {
if (!mooncake_te_->register_memory(addrs, lens, buf_bytes)) {
LOG(ERROR) << "register GlobalXTensor failed";
return;
}
Expand All @@ -375,7 +451,7 @@ bool MooncakeKVCacheTransferXTensor::pull_kv_blocks(

bool MooncakeKVCacheTransferXTensor::push_kv_blocks(
std::unordered_map<std::string, KVCacheInfo>& merged_kv_infos,
std::shared_ptr<NPULayerSynchronizerImpl>& layer_synchronizer,
std::shared_ptr<KVPushSynchronizerImpl>& layer_synchronizer,
bool is_spec_draft) {
(void)is_spec_draft;
return push_kv_blocks_impl(merged_kv_infos, layer_synchronizer);
Expand Down Expand Up @@ -446,7 +522,7 @@ bool MooncakeKVCacheTransferXTensor::pull_kv_blocks_impl(

bool MooncakeKVCacheTransferXTensor::push_kv_blocks_impl(
std::unordered_map<std::string, KVCacheInfo>& merged_kv_infos,
std::shared_ptr<NPULayerSynchronizerImpl>& layer_synchronizer) {
std::shared_ptr<KVPushSynchronizerImpl>& layer_synchronizer) {
if (model_id_.empty()) {
LOG(ERROR) << "model_id not set for XTensor mode push";
return false;
Expand Down
Loading
Loading