Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool CacheAwareRouting::select_instances_pair(
if (!request->token_ids.empty()) {
Slice<int32_t> token_ids(request->token_ids.data(),
request->token_ids.size());
global_kvcache_mgr_->match(token_ids, &lb_infos.overlap_scores);
instance_mgr_->kvcache_match(token_ids, &lb_infos.overlap_scores);
DLOG(INFO) << lb_infos.debug_string();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ limitations under the License.

#include "common/macros.h"
#include "loadbalance_policy.h"
#include "scheduler/managers/global_kvcache_mgr.h"

namespace xllm_service {

class CacheAwareRouting final : public LoadBalancePolicy {
public:
CacheAwareRouting(std::shared_ptr<InstanceMgr> instance_mgr,
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
: global_kvcache_mgr_(global_kvcache_mgr),
LoadBalancePolicy(instance_mgr) {};
explicit CacheAwareRouting(std::shared_ptr<InstanceMgr> instance_mgr)
: LoadBalancePolicy(instance_mgr) {}

virtual ~CacheAwareRouting() = default;

Expand All @@ -41,8 +38,6 @@ class CacheAwareRouting final : public LoadBalancePolicy {
const std::unordered_map<std::string, LoadMetrics>& load_metrics,
const int64_t& max_waiting_requests_num,
std::string* best_choice);

std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr_;
};

} // namespace xllm_service
37 changes: 36 additions & 1 deletion xllm_service/scheduler/loadbalance_policy/round_robin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,45 @@ limitations under the License.

#include "round_robin.h"

#include <glog/logging.h>

#include <vector>

namespace xllm_service {

bool SelectRoutingRoundRobin(const std::shared_ptr<InstanceMgr>& instance_mgr,
uint64_t* next_prefill_index,
uint64_t* next_decode_index,
Routing* routing) {
const std::vector<std::string> prefill =
instance_mgr->get_schedulable_prefill_instances();
const std::vector<std::string> decode =
instance_mgr->get_schedulable_decode_instances();

if (prefill.empty()) {
LOG(ERROR) << "No prefill or default instance found!";
return false;
}

if (decode.empty()) {
LOG(ERROR) << "No decode or default instance found!";
return false;
}

*next_prefill_index = *next_prefill_index % prefill.size();
routing->prefill_name = prefill[*next_prefill_index];
(*next_prefill_index)++;
*next_decode_index = *next_decode_index % decode.size();
routing->decode_name = decode[*next_decode_index];
(*next_decode_index)++;
return true;
Comment thread
magicheng0816 marked this conversation as resolved.
}

// Chooses the prefill/decode instance pair for `request` by round-robin.
//
// Delegates to SelectRoutingRoundRobin using this policy's own cursors
// (next_prefill_index_ / next_decode_index_) and writes the result into
// request->routing. Returns whatever SelectRoutingRoundRobin returns.
//
// The earlier `instance_mgr_->get_next_instance_pair(...)` return is removed:
// it preceded this call and made it unreachable, so the cursors were never
// used.
bool RoundRobin::select_instances_pair(std::shared_ptr<Request> request) {
  return SelectRoutingRoundRobin(instance_mgr_,
                                 &next_prefill_index_,
                                 &next_decode_index_,
                                 &request->routing);
}

} // namespace xllm_service
14 changes: 14 additions & 0 deletions xllm_service/scheduler/loadbalance_policy/round_robin.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ limitations under the License.

#pragma once

#include <cstdint>
#include <memory>

#include "common/macros.h"
#include "common/types.h"
#include "loadbalance_policy.h"

namespace xllm_service {
Expand All @@ -31,6 +35,16 @@ class RoundRobin final : public LoadBalancePolicy {

private:
DISALLOW_COPY_AND_ASSIGN(RoundRobin);

uint64_t next_prefill_index_ = 0;
uint64_t next_decode_index_ = 0;
};

// Shared round-robin selection over the get_schedulable_* instance lists
// (used by RoundRobin, and by SloAwarePolicy when token_ids is empty).
//
// Picks one prefill and one decode instance name from the lists returned by
// `instance_mgr`, stores them in routing->prefill_name / routing->decode_name,
// and advances the two caller-owned cursors.
//
//   instance_mgr:       provides the schedulable prefill/decode name lists.
//   next_prefill_index: in/out cursor into the prefill list; wrapped modulo
//                       the list size before use, then incremented.
//   next_decode_index:  in/out cursor into the decode list; same handling.
//   routing:            output; receives the selected instance names.
//
// Returns false (after logging an error) when either list is empty, true
// otherwise.
bool SelectRoutingRoundRobin(const std::shared_ptr<InstanceMgr>& instance_mgr,
uint64_t* next_prefill_index,
uint64_t* next_decode_index,
Routing* routing);

} // namespace xllm_service
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ limitations under the License.

#include "slo_aware_policy.h"

#include <glog/logging.h>

#include "common/global_gflags.h"
#include "round_robin.h"

namespace xllm_service {

Expand All @@ -25,7 +28,10 @@ SloAwarePolicy::SloAwarePolicy(const Options& options,

bool SloAwarePolicy::select_instances_pair(std::shared_ptr<Request> request) {
if (request->token_ids.empty()) {
return instance_mgr_->get_next_instance_pair(&request->routing);
return SelectRoutingRoundRobin(instance_mgr_,
&round_robin_next_prefill_index_,
&round_robin_next_decode_index_,
&request->routing);
}

// select instances pair based on slo
Expand Down
4 changes: 4 additions & 0 deletions xllm_service/scheduler/loadbalance_policy/slo_aware_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ limitations under the License.

#pragma once

#include <cstdint>

#include "common/options.h"
#include "common/types.h"
#include "loadbalance_policy.h"
Expand All @@ -34,6 +36,8 @@ class SloAwarePolicy final : public LoadBalancePolicy {
DISALLOW_COPY_AND_ASSIGN(SloAwarePolicy);

Options options_;
uint64_t round_robin_next_prefill_index_ = 0;
uint64_t round_robin_next_decode_index_ = 0;
};

} // namespace xllm_service
8 changes: 6 additions & 2 deletions xllm_service/scheduler/managers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ cc_library(
managers
HDRS
instance_mgr.h
global_kvcache_mgr.h
instance_metrics.h
instance_topology.h
instance_kvcache.h
SRCS
instance_mgr.cpp
global_kvcache_mgr.cpp
instance_metrics.cpp
instance_topology.cpp
instance_kvcache.cpp
DEPS
:chat_template
:common
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "global_kvcache_mgr.h"
#include "instance_kvcache.h"

#include <glog/logging.h>
#include <nlohmann/json.hpp>

#include "common/hash_util.h"
Expand All @@ -29,15 +30,14 @@ std::string ETCD_CACHE_PREFIX = "XLLM:CACHE:";

namespace xllm_service {

GlobalKVCacheMgr::GlobalKVCacheMgr(
const Options& options,
const std::shared_ptr<EtcdClient>& etcd_client,
const bool is_master_service)
InstanceKVCache::InstanceKVCache(const Options& options,
const std::shared_ptr<EtcdClient>& etcd_client,
const bool is_master_service)
: options_(options),
is_master_service_(is_master_service),
etcd_client_(etcd_client) {
if (!is_master_service_) {
auto handle_kvcache = std::bind(&GlobalKVCacheMgr::update_kvcache,
auto handle_kvcache = std::bind(&InstanceKVCache::update_kvcache,
this,
std::placeholders::_1,
std::placeholders::_2);
Expand All @@ -51,7 +51,7 @@ GlobalKVCacheMgr::GlobalKVCacheMgr(
}
}

GlobalKVCacheMgr::~GlobalKVCacheMgr() {
InstanceKVCache::~InstanceKVCache() {
exited_ = true;
etcd_client_->remove_watch(ETCD_CACHE_PREFIX);
}
Expand All @@ -70,9 +70,8 @@ void set_score(const std::unordered_set<std::string>& instance_names,
}
}

void GlobalKVCacheMgr::match(const Slice<int32_t>& token_ids,
OverlapScores* overlap_scores) {
// align tokens to block boundary
void InstanceKVCache::match(const Slice<int32_t>& token_ids,
OverlapScores* overlap_scores) {
const size_t n_tokens = round_down(token_ids.size(), options_.block_size());
if (n_tokens == 0) {
return;
Expand Down Expand Up @@ -130,8 +129,8 @@ void GlobalKVCacheMgr::match(const Slice<int32_t>& token_ids,
}
}

void GlobalKVCacheMgr::update_kvcache(const etcd::Response& response,
const uint64_t prefix_len) {
void InstanceKVCache::update_kvcache(const etcd::Response& response,
const uint64_t prefix_len) {
if (response.events().empty() || exited_) {
return;
}
Expand Down Expand Up @@ -174,7 +173,7 @@ void GlobalKVCacheMgr::update_kvcache(const etcd::Response& response,
});
}

void GlobalKVCacheMgr::record_updated_kvcaches(
void InstanceKVCache::record_updated_kvcaches(
const std::string& instance_name,
const proto::KvCacheEvent& kvcache_event) {
std::lock_guard<std::mutex> update_lock(update_mutex_);
Expand Down Expand Up @@ -224,7 +223,7 @@ void GlobalKVCacheMgr::record_updated_kvcaches(
}
}

bool GlobalKVCacheMgr::upload_kvcache() {
bool InstanceKVCache::upload_kvcache() {
std::lock_guard<std::mutex> update_lock(update_mutex_);
if (updated_kvcaches_.empty()) {
return true;
Expand All @@ -246,7 +245,7 @@ bool GlobalKVCacheMgr::upload_kvcache() {
return rt;
}

void GlobalKVCacheMgr::set_as_master() {
void InstanceKVCache::set_as_master() {
is_master_service_ = true;
etcd_client_->remove_watch(ETCD_CACHE_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@ limitations under the License.
#pragma once

#include <shared_mutex>
#include <thread>

#include "../etcd_client/etcd_client.h"
#include "common/hash_util.h"
#include "common/macros.h"
#include "common/options.h"
#include "common/slice.h"
#include "common/threadpool.h"
#include "common/types.h"
#include "scheduler/etcd_client/etcd_client.h"
#include "xllm_rpc_service.pb.h"

namespace xllm_service {

class GlobalKVCacheMgr final {
// Per-instance KV cache block locations aggregated via etcd (internal to
// InstanceMgr; do not use directly from Scheduler).
class InstanceKVCache final {
public:
explicit GlobalKVCacheMgr(const Options& options,
const std::shared_ptr<EtcdClient>& etcd_client,
const bool is_master_service);
~GlobalKVCacheMgr();
explicit InstanceKVCache(const Options& options,
const std::shared_ptr<EtcdClient>& etcd_client,
const bool is_master_service);
~InstanceKVCache();

void match(const Slice<int32_t>& token_ids, OverlapScores* overlap_scores);

Expand All @@ -45,18 +46,17 @@ class GlobalKVCacheMgr final {
void set_as_master();

private:
DISALLOW_COPY_AND_ASSIGN(GlobalKVCacheMgr);
DISALLOW_COPY_AND_ASSIGN(InstanceKVCache);

void update_kvcache(const etcd::Response& response,
const uint64_t prefix_len);

private:
Options options_;
std::atomic_bool is_master_service_ = false;
bool exited_ = false;
std::shared_mutex kvcache_mutex_;
XXH3KeyCacheMap kvcache_infos_;
std::shared_ptr<EtcdClient> etcd_client_; // not own
std::shared_ptr<EtcdClient> etcd_client_;

std::mutex update_mutex_;
XXH3KeyCacheMap updated_kvcaches_;
Expand Down
Loading
Loading