diff --git a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.cpp b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.cpp index c7d8e5e..b429d51 100644 --- a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.cpp +++ b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.cpp @@ -25,7 +25,7 @@ bool CacheAwareRouting::select_instances_pair( if (!request->token_ids.empty()) { Slice token_ids(request->token_ids.data(), request->token_ids.size()); - global_kvcache_mgr_->match(token_ids, &lb_infos.overlap_scores); + instance_mgr_->kvcache_match(token_ids, &lb_infos.overlap_scores); DLOG(INFO) << lb_infos.debug_string(); } diff --git a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h index 5eb9999..2200efa 100644 --- a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h +++ b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h @@ -17,16 +17,13 @@ limitations under the License. #include "common/macros.h" #include "loadbalance_policy.h" -#include "scheduler/managers/global_kvcache_mgr.h" namespace xllm_service { class CacheAwareRouting final : public LoadBalancePolicy { public: - CacheAwareRouting(std::shared_ptr instance_mgr, - std::shared_ptr global_kvcache_mgr) - : global_kvcache_mgr_(global_kvcache_mgr), - LoadBalancePolicy(instance_mgr) {}; + explicit CacheAwareRouting(std::shared_ptr instance_mgr) + : LoadBalancePolicy(instance_mgr) {} virtual ~CacheAwareRouting() = default; @@ -41,8 +38,6 @@ class CacheAwareRouting final : public LoadBalancePolicy { const std::unordered_map& load_metrics, const int64_t& max_waiting_requests_num, std::string* best_choice); - - std::shared_ptr global_kvcache_mgr_; }; } // namespace xllm_service diff --git a/xllm_service/scheduler/loadbalance_policy/round_robin.cpp b/xllm_service/scheduler/loadbalance_policy/round_robin.cpp index e667d45..b78b23e 100644 --- a/xllm_service/scheduler/loadbalance_policy/round_robin.cpp +++ b/xllm_service/scheduler/loadbalance_policy/round_robin.cpp @@ -15,10 +15,45 @@ limitations under the License. #include "round_robin.h" +#include + +#include + namespace xllm_service { +bool SelectRoutingRoundRobin(const std::shared_ptr& instance_mgr, + uint64_t* next_prefill_index, + uint64_t* next_decode_index, + Routing* routing) { + const std::vector prefill = + instance_mgr->get_schedulable_prefill_instances(); + const std::vector decode = + instance_mgr->get_schedulable_decode_instances(); + + if (prefill.empty()) { + LOG(ERROR) << "No prefill or default instance found!"; + return false; + } + + if (decode.empty()) { + LOG(ERROR) << "No decode or default instance found!"; + return false; + } + + *next_prefill_index = *next_prefill_index % prefill.size(); + routing->prefill_name = prefill[*next_prefill_index]; + (*next_prefill_index)++; + *next_decode_index = *next_decode_index % decode.size(); + routing->decode_name = decode[*next_decode_index]; + (*next_decode_index)++; + return true; +} + bool RoundRobin::select_instances_pair(std::shared_ptr request) { - return instance_mgr_->get_next_instance_pair(&request->routing); + return SelectRoutingRoundRobin(instance_mgr_, + &next_prefill_index_, + &next_decode_index_, + &request->routing); } } // namespace xllm_service diff --git a/xllm_service/scheduler/loadbalance_policy/round_robin.h b/xllm_service/scheduler/loadbalance_policy/round_robin.h index bcbcc43..488a035 100644 --- a/xllm_service/scheduler/loadbalance_policy/round_robin.h +++ b/xllm_service/scheduler/loadbalance_policy/round_robin.h @@ -15,7 +15,11 @@ limitations under the License. #pragma once +#include +#include + #include "common/macros.h" +#include "common/types.h" #include "loadbalance_policy.h" namespace xllm_service { @@ -31,6 +35,16 @@ class RoundRobin final : public LoadBalancePolicy { private: DISALLOW_COPY_AND_ASSIGN(RoundRobin); + + uint64_t next_prefill_index_ = 0; + uint64_t next_decode_index_ = 0; }; +// Shared round-robin selection over get_schedulable_* lists (used by RoundRobin +// and SloAwarePolicy when token_ids is empty). +bool SelectRoutingRoundRobin(const std::shared_ptr& instance_mgr, + uint64_t* next_prefill_index, + uint64_t* next_decode_index, + Routing* routing); + } // namespace xllm_service diff --git a/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.cpp b/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.cpp index 3af3ceb..e57cd75 100644 --- a/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.cpp +++ b/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.cpp @@ -15,7 +15,10 @@ limitations under the License. #include "slo_aware_policy.h" +#include + #include "common/global_gflags.h" +#include "round_robin.h" namespace xllm_service { @@ -25,7 +28,10 @@ SloAwarePolicy::SloAwarePolicy(const Options& options, bool SloAwarePolicy::select_instances_pair(std::shared_ptr request) { if (request->token_ids.empty()) { - return instance_mgr_->get_next_instance_pair(&request->routing); + return SelectRoutingRoundRobin(instance_mgr_, + &round_robin_next_prefill_index_, + &round_robin_next_decode_index_, + &request->routing); } // select instances pair based on slo diff --git a/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.h b/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.h index f15c918..4ad80aa 100644 --- a/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.h +++ b/xllm_service/scheduler/loadbalance_policy/slo_aware_policy.h @@ -15,6 +15,8 @@ limitations under the License. #pragma once +#include + #include "common/options.h" #include "common/types.h" #include "loadbalance_policy.h" @@ -34,6 +36,8 @@ class SloAwarePolicy final : public LoadBalancePolicy { DISALLOW_COPY_AND_ASSIGN(SloAwarePolicy); Options options_; + uint64_t round_robin_next_prefill_index_ = 0; + uint64_t round_robin_next_decode_index_ = 0; }; } // namespace xllm_service \ No newline at end of file diff --git a/xllm_service/scheduler/managers/CMakeLists.txt b/xllm_service/scheduler/managers/CMakeLists.txt index e6a77f7..32b3876 100644 --- a/xllm_service/scheduler/managers/CMakeLists.txt +++ b/xllm_service/scheduler/managers/CMakeLists.txt @@ -6,10 +6,14 @@ cc_library( managers HDRS instance_mgr.h - global_kvcache_mgr.h + instance_metrics.h + instance_topology.h + instance_kvcache.h SRCS instance_mgr.cpp - global_kvcache_mgr.cpp + instance_metrics.cpp + instance_topology.cpp + instance_kvcache.cpp DEPS :chat_template :common diff --git a/xllm_service/scheduler/managers/global_kvcache_mgr.cpp b/xllm_service/scheduler/managers/instance_kvcache.cpp similarity index 91% rename from xllm_service/scheduler/managers/global_kvcache_mgr.cpp rename to xllm_service/scheduler/managers/instance_kvcache.cpp index c0e4493..7cf9b27 100644 --- a/xllm_service/scheduler/managers/global_kvcache_mgr.cpp +++ b/xllm_service/scheduler/managers/instance_kvcache.cpp @@ -13,8 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "global_kvcache_mgr.h" +#include "instance_kvcache.h" +#include #include #include "common/hash_util.h" @@ -29,15 +30,14 @@ std::string ETCD_CACHE_PREFIX = "XLLM:CACHE:"; namespace xllm_service { -GlobalKVCacheMgr::GlobalKVCacheMgr( - const Options& options, - const std::shared_ptr& etcd_client, - const bool is_master_service) +InstanceKVCache::InstanceKVCache(const Options& options, + const std::shared_ptr& etcd_client, + const bool is_master_service) : options_(options), is_master_service_(is_master_service), etcd_client_(etcd_client) { if (!is_master_service_) { - auto handle_kvcache = std::bind(&GlobalKVCacheMgr::update_kvcache, + auto handle_kvcache = std::bind(&InstanceKVCache::update_kvcache, this, std::placeholders::_1, std::placeholders::_2); @@ -51,7 +51,7 @@ GlobalKVCacheMgr::GlobalKVCacheMgr( } } -GlobalKVCacheMgr::~GlobalKVCacheMgr() { +InstanceKVCache::~InstanceKVCache() { exited_ = true; etcd_client_->remove_watch(ETCD_CACHE_PREFIX); } @@ -70,9 +70,8 @@ void set_score(const std::unordered_set& instance_names, } } -void GlobalKVCacheMgr::match(const Slice& token_ids, - OverlapScores* overlap_scores) { - // allign tokens to block boundary +void InstanceKVCache::match(const Slice& token_ids, + OverlapScores* overlap_scores) { const size_t n_tokens = round_down(token_ids.size(), options_.block_size()); if (n_tokens == 0) { return; @@ -130,8 +129,8 @@ void GlobalKVCacheMgr::match(const Slice& token_ids, } } -void GlobalKVCacheMgr::update_kvcache(const etcd::Response& response, - const uint64_t prefix_len) { +void InstanceKVCache::update_kvcache(const etcd::Response& response, + const uint64_t prefix_len) { if (response.events().empty() || exited_) { return; } @@ -174,7 +173,7 @@ void GlobalKVCacheMgr::update_kvcache(const etcd::Response& response, }); } -void GlobalKVCacheMgr::record_updated_kvcaches( +void InstanceKVCache::record_updated_kvcaches( const std::string& instance_name, const proto::KvCacheEvent& kvcache_event) { std::lock_guard update_lock(update_mutex_); @@ -224,7 +223,7 @@ void GlobalKVCacheMgr::record_updated_kvcaches( } } -bool GlobalKVCacheMgr::upload_kvcache() { +bool InstanceKVCache::upload_kvcache() { std::lock_guard update_lock(update_mutex_); if (updated_kvcaches_.empty()) { return true; @@ -246,7 +245,7 @@ bool GlobalKVCacheMgr::upload_kvcache() { return rt; } -void GlobalKVCacheMgr::set_as_master() { +void InstanceKVCache::set_as_master() { is_master_service_ = true; etcd_client_->remove_watch(ETCD_CACHE_PREFIX); } diff --git a/xllm_service/scheduler/managers/global_kvcache_mgr.h b/xllm_service/scheduler/managers/instance_kvcache.h similarity index 76% rename from xllm_service/scheduler/managers/global_kvcache_mgr.h rename to xllm_service/scheduler/managers/instance_kvcache.h index 5241fb8..aec49d4 100644 --- a/xllm_service/scheduler/managers/global_kvcache_mgr.h +++ b/xllm_service/scheduler/managers/instance_kvcache.h @@ -16,25 +16,26 @@ limitations under the License. #pragma once #include -#include -#include "../etcd_client/etcd_client.h" #include "common/hash_util.h" #include "common/macros.h" #include "common/options.h" #include "common/slice.h" #include "common/threadpool.h" #include "common/types.h" +#include "scheduler/etcd_client/etcd_client.h" #include "xllm_rpc_service.pb.h" namespace xllm_service { -class GlobalKVCacheMgr final { +// Per-instance KV cache block locations aggregated via etcd (internal to +// InstanceMgr; do not use directly from Scheduler). +class InstanceKVCache final { public: - explicit GlobalKVCacheMgr(const Options& options, - const std::shared_ptr& etcd_client, - const bool is_master_service); - ~GlobalKVCacheMgr(); + explicit InstanceKVCache(const Options& options, + const std::shared_ptr& etcd_client, + const bool is_master_service); + ~InstanceKVCache(); void match(const Slice& token_ids, OverlapScores* overlap_scores); @@ -45,18 +46,17 @@ class GlobalKVCacheMgr final { void set_as_master(); private: - DISALLOW_COPY_AND_ASSIGN(GlobalKVCacheMgr); + DISALLOW_COPY_AND_ASSIGN(InstanceKVCache); void update_kvcache(const etcd::Response& response, const uint64_t prefix_len); - private: Options options_; std::atomic_bool is_master_service_ = false; bool exited_ = false; std::shared_mutex kvcache_mutex_; XXH3KeyCacheMap kvcache_infos_; - std::shared_ptr etcd_client_; // not own + std::shared_ptr etcd_client_; std::mutex update_mutex_; XXH3KeyCacheMap updated_kvcaches_; diff --git a/xllm_service/scheduler/managers/instance_metrics.cpp b/xllm_service/scheduler/managers/instance_metrics.cpp new file mode 100644 index 0000000..4d888a0 --- /dev/null +++ b/xllm_service/scheduler/managers/instance_metrics.cpp @@ -0,0 +1,382 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm-service/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "instance_metrics.h" + +#include + +#include +#include +#include +#include +#include + +#include "scheduler/managers/instance_topology.h" + +namespace { +constexpr const char* kEtcdLoadMetricsPrefix = "XLLM:LOADMETRICS:"; + +bool is_instance_schedulable(const xllm_service::InstanceMetaInfo& info) { + return info.runtime_state != xllm_service::InstanceRuntimeState::SUSPECT; +} +} // namespace + +namespace xllm_service { + +InstanceMetricsImpl::InstanceMetricsImpl( + const Options& options, + const std::shared_ptr& etcd_client, + bool is_master_service) + : options_(options), + etcd_client_(etcd_client), + is_master_service_(is_master_service) { + if (!is_master_service_) { + auto handle_load_metrics = + std::bind(&InstanceMetricsImpl::update_load_metrics, + this, + std::placeholders::_1, + std::placeholders::_2); + etcd_client_->add_watch(kEtcdLoadMetricsPrefix, handle_load_metrics); + } +} + +InstanceMetricsImpl::~InstanceMetricsImpl() { shutdown(); } + +void InstanceMetricsImpl::set_topology(InstanceTopologyImpl* topology) { + topology_impl_ = topology; +} + +void InstanceMetricsImpl::load_initial_load_metrics_from_etcd() { + std::unordered_map loaded_metrics; + etcd_client_->get_prefix(kEtcdLoadMetricsPrefix, &loaded_metrics); + std::unique_lock lock(metrics_mutex_); + load_metrics_ = std::move(loaded_metrics); +} + +void InstanceMetricsImpl::shutdown() { + exited_.store(true, std::memory_order_release); +} + +void InstanceMetricsImpl::get_load_metrics(LoadBalanceInfos* infos, + const TopologySnapshot& topology) { + std::shared_lock metric_lock(metrics_mutex_); + + const auto& instances = topology.instances; + + for (auto name : infos->overlap_scores.instances) { + auto it = load_metrics_.find(name); + if (it == load_metrics_.end()) { + continue; + } + auto instance_it = instances.find(name); + if (instance_it == instances.end() || + !is_instance_schedulable(instance_it->second)) { + continue; + } + + if (instance_it->second.type == InstanceType::DECODE) { + infos->decode_load_metrics.insert(std::make_pair(name, it->second)); + infos->decode_max_waiting_requests_num = + std::max(infos->decode_max_waiting_requests_num, + it->second.waiting_requests_num); + } else { + infos->prefill_load_metrics.insert(std::make_pair(name, it->second)); + infos->prefill_max_waiting_requests_num = + std::max(infos->prefill_max_waiting_requests_num, + it->second.waiting_requests_num); + } + } + + std::string least_loaded_prefill_instance; + float least_loaded_prefill_gpu_cache_usage_perc = 1; + std::string least_loaded_decode_instance; + float least_loaded_decode_gpu_cache_usage_perc = 1; + + if (infos->prefill_load_metrics.size() == 0 || + infos->decode_load_metrics.size() == 0) { + for (const auto& metric : load_metrics_) { + auto instance_it = instances.find(metric.first); + if (instance_it == instances.end() || + !is_instance_schedulable(instance_it->second)) { + continue; + } + if (instance_it->second.type != InstanceType::DECODE) { + if (metric.second.gpu_cache_usage_perc < + least_loaded_prefill_gpu_cache_usage_perc) { + least_loaded_prefill_gpu_cache_usage_perc = + metric.second.gpu_cache_usage_perc; + least_loaded_prefill_instance = metric.first; + } + } else { + if (metric.second.gpu_cache_usage_perc < + least_loaded_decode_gpu_cache_usage_perc) { + least_loaded_decode_gpu_cache_usage_perc = + metric.second.gpu_cache_usage_perc; + least_loaded_decode_instance = metric.first; + } + } + } + } + + if (infos->prefill_load_metrics.size() == 0 && + !least_loaded_prefill_instance.empty()) { + infos->prefill_load_metrics.insert( + std::make_pair(least_loaded_prefill_instance, + load_metrics_.at(least_loaded_prefill_instance))); + } + + if (infos->decode_load_metrics.size() == 0 && + !least_loaded_decode_instance.empty()) { + infos->decode_load_metrics.insert( + std::make_pair(least_loaded_decode_instance, + load_metrics_.at(least_loaded_decode_instance))); + } +} + +void InstanceMetricsImpl::record_load_metrics_update( + const std::string& instance_name, + const proto::LoadMetrics& load_metrics) { + std::unique_lock lock(metrics_mutex_); + + updated_metrics_.insert_or_assign( + instance_name, + LoadMetrics(load_metrics.waiting_requests_num(), + load_metrics.gpu_cache_usage_perc())); +} + +bool InstanceMetricsImpl::upload_load_metrics() { + std::unordered_map upload_snapshot; + std::unordered_set remove_snapshot; + { + std::unique_lock lk(metrics_mutex_); + for (auto& iter : updated_metrics_) { + load_metrics_.insert_or_assign(iter.first, iter.second); + } + for (auto& iter : removed_instance_) { + load_metrics_.erase(iter); + } + upload_snapshot = updated_metrics_; + remove_snapshot = removed_instance_; + updated_metrics_.clear(); + removed_instance_.clear(); + } + bool status = etcd_client_->set(kEtcdLoadMetricsPrefix, upload_snapshot); + status = status && etcd_client_->rm(kEtcdLoadMetricsPrefix, remove_snapshot); + return status; +} + +void InstanceMetricsImpl::update_latency_metrics( + const std::string& instance_name, + const proto::LatencyMetrics& latency_metrics) { + std::unique_lock lock(metrics_mutex_); + + latency_metrics_.insert_or_assign( + instance_name, + LatencyMetrics(latency_metrics.recent_max_ttft(), + latency_metrics.recent_max_tbt())); +} + +void InstanceMetricsImpl::update_request_metrics( + std::shared_ptr request, + RequestAction action) { + if (options_.load_balance_policy() != "SLO_AWARE") { + return; + } + if (topology_impl_ == nullptr) { + LOG(ERROR) << "update_request_metrics: topology not set"; + return; + } + + std::string flip_decode_name; + { + std::unique_lock lock(metrics_mutex_); + + auto prefill_it = request_metrics_.find(request->routing.prefill_name); + if (prefill_it == request_metrics_.end()) { + LOG(ERROR) << "Failed to find instance request metrics, instance name : " + << request->routing.prefill_name; + return; + } + + auto decode_it = request_metrics_.find(request->routing.decode_name); + if (decode_it == request_metrics_.end()) { + LOG(ERROR) << "Failed to find instance request metrics, instance name : " + << request->routing.decode_name; + return; + } + + int64_t num_prompt_tokens = request->token_ids.size(); + int64_t num_generated_tokens = request->num_generated_tokens; + switch (action) { + case RequestAction::SCHEDULE: + prefill_it->second.prefill_request_num += 1; + prefill_it->second.prefill_token_num += num_prompt_tokens; + + decode_it->second.decode_request_num += 1; + decode_it->second.decode_token_num += num_prompt_tokens; + break; + case RequestAction::FINISH_PREFILL: + prefill_it->second.prefill_request_num -= 1; + prefill_it->second.prefill_token_num -= num_prompt_tokens; + prefill_it->second.estimated_prefill_time -= request->estimated_ttft; + + decode_it->second.decode_token_num += 1; + break; + case RequestAction::GENERATE: + decode_it->second.decode_token_num += 1; + break; + case RequestAction::FINISH_DECODE: + decode_it->second.decode_request_num -= 1; + decode_it->second.decode_token_num -= + (num_prompt_tokens + num_generated_tokens); + + break; + case RequestAction::CANCEL: + prefill_it->second.prefill_request_num -= 1; + prefill_it->second.prefill_token_num -= num_prompt_tokens; + prefill_it->second.estimated_prefill_time -= request->estimated_ttft; + + decode_it->second.decode_request_num -= 1; + decode_it->second.decode_token_num -= + (num_prompt_tokens + num_generated_tokens); + + break; + default: + LOG(ERROR) << "Unknown RequestAction: " << static_cast(action); + break; + } + + if (decode_it->second.decode_request_num == 0) { + flip_decode_name = request->routing.decode_name; + } + } + + if (!flip_decode_name.empty()) { + topology_impl_->flip_decode_to_prefill(flip_decode_name); + } +} + +MetricsSnapshot InstanceMetricsImpl::snapshot() const { + std::shared_lock lock(metrics_mutex_); + MetricsSnapshot s; + s.load_metrics = load_metrics_; + s.request_metrics = request_metrics_; + s.latency_metrics = latency_metrics_; + return s; +} + +double InstanceMetricsImpl::predict_ttft(const std::string& instance_name, + int32_t token_len) { + std::shared_lock lock(metrics_mutex_); + auto it = time_predictors_.find(instance_name); + if (it == time_predictors_.end()) { + LOG(FATAL) << "Find TimePredictor failed, instance name : " + << instance_name; + } + return it->second.predict_ttft(token_len); +} + +double InstanceMetricsImpl::predict_tpot(const std::string& instance_name, + int32_t total_length, + int32_t batch_size) { + std::shared_lock lock(metrics_mutex_); + auto it = time_predictors_.find(instance_name); + if (it == time_predictors_.end()) { + LOG(FATAL) << "Find TimePredictor failed, instance name : " + << instance_name; + } + return it->second.predict_tpot(total_length, batch_size); +} + +TimePredictor& InstanceMetricsImpl::time_predictor_unlocked( + const std::string& instance_name) { + auto it = time_predictors_.find(instance_name); + if (it == time_predictors_.end()) { + LOG(FATAL) << "Find TimePredictor failed, instance name : " + << instance_name; + } + return it->second; +} + +void InstanceMetricsImpl::set_as_master() { + is_master_service_.store(true, std::memory_order_release); + etcd_client_->remove_watch(kEtcdLoadMetricsPrefix); +} + +void InstanceMetricsImpl::add_instance_metrics(const std::string& name, + const InstanceMetaInfo& info) { + std::unique_lock lock(metrics_mutex_); + + time_predictors_.insert_or_assign( + name, TimePredictor(info.ttft_profiling_data, info.tpot_profiling_data)); + + request_metrics_.insert_or_assign(name, RequestMetrics()); +} + +void InstanceMetricsImpl::remove_instance_metrics(const std::string& name) { + std::unique_lock lock(metrics_mutex_); + time_predictors_.erase(name); + request_metrics_.erase(name); + latency_metrics_.erase(name); + updated_metrics_.erase(name); + removed_instance_.insert(name); + load_metrics_.erase(name); +} + +void InstanceMetricsImpl::update_load_metrics(const etcd::Response& response, + const uint64_t& prefix_len) { + if (response.events().empty() || exited_.load(std::memory_order_acquire)) { + return; + } + + load_metrics_threadpool_.schedule([this, + response = std::move(response), + prefix_len = std::move(prefix_len)] { + if (exited_.load(std::memory_order_acquire)) return; + std::unordered_map put_map; + std::vector delete_list; + + for (const auto& event : response.events()) { + std::string instance_name = event.kv().key().substr(prefix_len); + + if (event.event_type() == etcd::Event::EventType::PUT) { + LoadMetrics load_metrics; + auto json_str = event.kv().as_string(); + if (!load_metrics.parse_from_json(json_str)) { + LOG(ERROR) << "pase json:" << json_str << " error!"; + continue; + } + + put_map.insert(std::make_pair(instance_name, std::move(load_metrics))); + + } else if (event.event_type() == etcd::Event::EventType::DELETE_) { + delete_list.push_back(instance_name); + } + } + + { + std::unique_lock lock(metrics_mutex_); + for (auto& iter : put_map) { + load_metrics_.insert_or_assign(iter.first, std::move(iter.second)); + } + + for (auto& iter : delete_list) { + load_metrics_.erase(iter); + } + } + }); +} + +} // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_metrics.h b/xllm_service/scheduler/managers/instance_metrics.h new file mode 100644 index 0000000..bec48ad --- /dev/null +++ b/xllm_service/scheduler/managers/instance_metrics.h @@ -0,0 +1,160 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm-service/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/macros.h" +#include "common/options.h" +#include "common/threadpool.h" +#include "common/time_predictor.h" +#include "common/types.h" +#include "request/request.h" +#include "scheduler/etcd_client/etcd_client.h" +#include "scheduler/managers/instance_topology.h" +#include "xllm_rpc_service.pb.h" + +namespace xllm_service { + +// Point-in-time copy of metrics-side state (no TimePredictor; use predict_*). +struct MetricsSnapshot { + std::unordered_map load_metrics; + std::unordered_map request_metrics; + std::unordered_map latency_metrics; +}; + +// Per-instance load / request / latency metrics, predictors, and etcd upload. +class InstanceMetrics { + public: + virtual ~InstanceMetrics() = default; + + virtual void get_load_metrics(LoadBalanceInfos* infos, + const TopologySnapshot& topology) = 0; + + virtual void record_load_metrics_update( + const std::string& instance_name, + const proto::LoadMetrics& load_metrics) = 0; + + virtual bool upload_load_metrics() = 0; + + virtual void update_latency_metrics( + const std::string& instance_name, + const proto::LatencyMetrics& latency_metrics) = 0; + + virtual void update_request_metrics(std::shared_ptr request, + RequestAction action) = 0; + + virtual MetricsSnapshot snapshot() const = 0; + + virtual double predict_ttft(const std::string& instance_name, + int32_t token_len) = 0; + + virtual double predict_tpot(const std::string& instance_name, + int32_t total_length, + int32_t batch_size) = 0; + + virtual void set_as_master() = 0; + + virtual void add_instance_metrics(const std::string& name, + const InstanceMetaInfo& info) = 0; + + virtual void remove_instance_metrics(const std::string& name) = 0; +}; + +// Default implementation (extracted from InstanceMgr). Call set_topology() +// after InstanceTopologyImpl is constructed. +class InstanceMetricsImpl final : public InstanceMetrics { + public: + InstanceMetricsImpl(const Options& options, + const std::shared_ptr& etcd_client, + bool is_master_service); + + ~InstanceMetricsImpl() override; + + void set_topology(InstanceTopologyImpl* topology); + + void load_initial_load_metrics_from_etcd(); + + void shutdown(); + + void get_load_metrics(LoadBalanceInfos* infos, + const TopologySnapshot& topology) override; + + void record_load_metrics_update( + const std::string& instance_name, + const proto::LoadMetrics& load_metrics) override; + + bool upload_load_metrics() override; + + void update_latency_metrics( + const std::string& instance_name, + const proto::LatencyMetrics& latency_metrics) override; + + void update_request_metrics(std::shared_ptr request, + RequestAction action) override; + + MetricsSnapshot snapshot() const override; + + double predict_ttft(const std::string& instance_name, + int32_t token_len) override; + + double predict_tpot(const std::string& instance_name, + int32_t total_length, + int32_t batch_size) override; + + void set_as_master() override; + + void add_instance_metrics(const std::string& name, + const InstanceMetaInfo& info) override; + + void remove_instance_metrics(const std::string& name) override; + + private: + friend class InstanceMgr; + + DISALLOW_COPY_AND_ASSIGN(InstanceMetricsImpl); + + // Caller must hold metrics_mutex_; used by InstanceMgr SLO path under + // cluster+metrics locks (predict_* would deadlock). + TimePredictor& time_predictor_unlocked(const std::string& instance_name); + + void update_load_metrics(const etcd::Response& response, + const uint64_t& prefix_len); + + Options options_; + std::shared_ptr etcd_client_; + std::atomic_bool is_master_service_; + std::atomic_bool exited_{false}; + + InstanceTopologyImpl* topology_impl_ = nullptr; + + ThreadPool load_metrics_threadpool_; + mutable std::shared_mutex metrics_mutex_; + std::unordered_map load_metrics_; + std::unordered_map updated_metrics_; + std::unordered_set removed_instance_; + std::unordered_map time_predictors_; + std::unordered_map latency_metrics_; + std::unordered_map request_metrics_; +}; + +} // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_mgr.cpp b/xllm_service/scheduler/managers/instance_mgr.cpp index d094a81..579d430 100644 --- a/xllm_service/scheduler/managers/instance_mgr.cpp +++ b/xllm_service/scheduler/managers/instance_mgr.cpp @@ -15,104 +15,23 @@ limitations under the License. #include "instance_mgr.h" -#include -#include -#include -#include #include -#include -#include -#include +#include "instance_kvcache.h" + +#include #include -#include #include -#include -#include #include #include "common/global_gflags.h" #include "common/types.h" -#include "common/utils.h" -#include "common/xllm/output.h" -#include "common/xllm/status.h" -#include "disagg_pd.pb.h" -#include "scheduler/scheduler.h" namespace { -using xllm_service::InstanceRuntimeState; -using xllm_service::InstanceType; -std::unordered_map ETCD_KEYS_PREFIX_MAP = { - {InstanceType::DEFAULT, "XLLM:DEFAULT:"}, - {InstanceType::PREFILL, "XLLM:PREFILL:"}, - {InstanceType::DECODE, "XLLM:DECODE:"}, - {InstanceType::MIX, "XLLM:MIX:"}, -}; - -std::string ETCD_ALL_KEYS_PREFIX = "XLLM:"; -std::string ETCD_LOADMETRICS_PREFIX = "XLLM:LOADMETRICS:"; - -constexpr char kHealthPath[] = "/health"; -constexpr int64_t kDeleteProbeRetryBackoffMs = 100; - -uint64_t current_time_ms() { - return static_cast( - absl::ToInt64Milliseconds(absl::Now() - absl::UnixEpoch())); -} - bool is_instance_schedulable(const xllm_service::InstanceMetaInfo& info) { - // LEASE_LOST instances can still be reused while heartbeats continue. - return info.runtime_state != InstanceRuntimeState::SUSPECT; -} - -bool select_next_schedulable_instance( - const std::unordered_map& - instances, - const std::vector& index, - uint64_t* next_index, - std::string* instance_name) { - if (index.empty()) { - return false; - } - - const uint64_t start_index = *next_index % index.size(); - for (uint64_t offset = 0; offset < index.size(); ++offset) { - const uint64_t index_pos = (start_index + offset) % index.size(); - auto it = instances.find(index[index_pos]); - if (it == instances.end() || !is_instance_schedulable(it->second)) { - continue; - } - *instance_name = index[index_pos]; - *next_index = index_pos + 1; - return true; - } - return false; -} - -size_t count_schedulable_instances( - const std::unordered_map& - instances, - const std::vector& index) { - size_t count = 0; - for (const auto& name : index) { - auto it = instances.find(name); - if (it == instances.end() || !is_instance_schedulable(it->second)) { - continue; - } - ++count; - } - return count; + return info.runtime_state != xllm_service::InstanceRuntimeState::SUSPECT; } -InstanceType get_cleanup_type(const xllm_service::InstanceMetaInfo& info) { - if (info.type == InstanceType::DEFAULT) { - return InstanceType::PREFILL; - } - if (info.type == InstanceType::MIX) { - return info.current_type; - } - return info.type; -} } // namespace namespace xllm_service { @@ -120,1295 +39,267 @@ namespace xllm_service { InstanceMgr::InstanceMgr(const Options& options, const std::shared_ptr& etcd_client, const bool is_master_service, - Scheduler* scheduler) - : options_(options), - is_master_service_(is_master_service), - etcd_client_(etcd_client), - scheduler_(scheduler) { - auto handle_instance_metainfo = - std::bind(&InstanceMgr::update_instance_metainfo, - this, - std::placeholders::_1, - std::placeholders::_2); - for (auto& it : ETCD_KEYS_PREFIX_MAP) { - etcd_client_->add_watch(it.second, handle_instance_metainfo); - } - if (!is_master_service_) { - auto handle_load_metrics = std::bind(&InstanceMgr::update_load_metrics, - this, - std::placeholders::_1, - std::placeholders::_2); - etcd_client_->add_watch(ETCD_LOADMETRICS_PREFIX, handle_load_metrics); - } - + OnInstanceDeregisteredCallback on_instance_deregistered) + : etcd_client_(etcd_client), + metrics_impl_(std::make_unique( + options, + etcd_client, + is_master_service)), + topology_impl_(std::make_unique( + options, + etcd_client, + std::move(on_instance_deregistered), + [this](const std::string& n, const InstanceMetaInfo& i) { + metrics_impl_->add_instance_metrics(n, i); + }, + [this](const std::string& n) { + metrics_impl_->remove_instance_metrics(n); + })), + kvcache_(std::make_unique( + options, + etcd_client, + is_master_service)) { + metrics_impl_->set_topology(topology_impl_.get()); init(); - - state_reconcile_thread_ = std::make_unique( - &InstanceMgr::reconcile_instance_states, this); } void InstanceMgr::init() { - std::unordered_map loaded_instances; - for (auto& it : ETCD_KEYS_PREFIX_MAP) { - etcd_client_->get_prefix(it.second, &loaded_instances); - } - LOG(INFO) << "Load instance info from etcd:" << loaded_instances.size(); - - { - std::unique_lock lock(cluster_mutex_); - prefill_index_.reserve(loaded_instances.size()); - decode_index_.reserve(loaded_instances.size()); - } - - for (auto& pair : loaded_instances) { - if (!register_instance(pair.first, pair.second)) { - LOG(ERROR) << "Fail to register instance: " << pair.first; - } - } - - std::unordered_map loaded_metrics; - etcd_client_->get_prefix(ETCD_LOADMETRICS_PREFIX, &loaded_metrics); - { - std::unique_lock lock(metrics_mutex_); - load_metrics_ = std::move(loaded_metrics); - } - - { - std::shared_lock lock(cluster_mutex_); - for (int i = 0; i < prefill_index_.size(); i++) { - LOG(INFO) << i << " : " << prefill_index_[i]; - } - } + topology_impl_->init_from_etcd_register_all(); + metrics_impl_->load_initial_load_metrics_from_etcd(); } InstanceMgr::~InstanceMgr() { - exited_ = true; - if (state_reconcile_thread_ && state_reconcile_thread_->joinable()) { - state_reconcile_thread_->join(); - } + metrics_impl_->shutdown(); } InstanceMetaInfo InstanceMgr::get_instance_info( const std::string& instance_name) { - std::shared_lock lock(cluster_mutex_); - if (instances_.find(instance_name) == instances_.end()) { - LOG(ERROR) << "Get instance info failed, instance is not registered, " - "instance_name: " - << instance_name; - return InstanceMetaInfo(); + return topology_impl_->get_instance_info(instance_name); +} + +std::vector InstanceMgr::get_schedulable_prefill_instances() { + std::shared_lock lock(topology_impl_->cluster_mutex_); + const auto& idx = topology_impl_->prefill_index_; + const auto& inst = topology_impl_->instances_; + const bool suspect_empty = topology_impl_->suspect_instances_.empty(); + std::vector out; + out.reserve(idx.size()); + for (const auto& name : idx) { + if (suspect_empty) { + out.push_back(name); + continue; + } + auto it = inst.find(name); + if (it != inst.end() && is_instance_schedulable(it->second)) { + out.push_back(name); + } } - return instances_[instance_name]; + return out; } -bool InstanceMgr::get_next_instance_pair(Routing* routing) { - std::unique_lock lock(cluster_mutex_); - if (prefill_index_.empty()) { - LOG(ERROR) << "No prefill or default instance found!"; - return false; - } - - routing->decode_name.clear(); - if (suspect_instances_.empty()) { - // Fast path for the common case: no suspect instances, keep plain RR. - next_prefill_index_ = next_prefill_index_ % prefill_index_.size(); - routing->prefill_name = prefill_index_[next_prefill_index_]; - next_prefill_index_++; - - if (decode_index_.empty()) { - return true; +std::vector InstanceMgr::get_schedulable_decode_instances() { + std::shared_lock lock(topology_impl_->cluster_mutex_); + const auto& idx = topology_impl_->decode_index_; + const auto& inst = topology_impl_->instances_; + const bool suspect_empty = topology_impl_->suspect_instances_.empty(); + std::vector out; + out.reserve(idx.size()); + for (const auto& name : idx) { + if (suspect_empty) { + out.push_back(name); + continue; + } + auto it = inst.find(name); + if (it != inst.end() && is_instance_schedulable(it->second)) { + out.push_back(name); } - next_decode_index_ = next_decode_index_ % decode_index_.size(); - routing->decode_name = decode_index_[next_decode_index_]; - next_decode_index_++; - return true; - } - - if (!select_next_schedulable_instance(instances_, - prefill_index_, - &next_prefill_index_, - &routing->prefill_name)) { - LOG(ERROR) << "No schedulable prefill or default instance found!"; - return false; - } - - if (!decode_index_.empty()) { - select_next_schedulable_instance( - instances_, decode_index_, &next_decode_index_, &routing->decode_name); } - return true; + return out; } -// TODO: refactor later, currently return all decode instances std::vector InstanceMgr::get_static_decode_list( const std::string& instance_name) { - std::vector decode_list; - std::shared_lock lock(cluster_mutex_); - for (auto& inst : instances_) { - if (inst.second.type == InstanceType::DECODE && - is_instance_schedulable(inst.second)) { - decode_list.emplace_back(inst.second.name); - } - } - - return decode_list; + return topology_impl_->get_static_decode_list(instance_name); } -// TODO: refactor later, currently return all prefill instances std::vector InstanceMgr::get_static_prefill_list( const std::string& instance_name) { - std::vector prefill_list; - std::shared_lock lock(cluster_mutex_); - for (auto& inst : instances_) { - if ((inst.second.type == InstanceType::PREFILL || - inst.second.type == InstanceType::DEFAULT) && - is_instance_schedulable(inst.second)) { - prefill_list.emplace_back(inst.second.name); - } - } - - return prefill_list; + return topology_impl_->get_static_prefill_list(instance_name); } void InstanceMgr::get_load_metrics(LoadBalanceInfos* infos) { - std::shared_lock inst_lock(cluster_mutex_); - std::shared_lock metric_lock(metrics_mutex_); - - for (auto name : infos->overlap_scores.instances) { - auto it = load_metrics_.find(name); - if (it == load_metrics_.end()) { - continue; - } - auto instance_it = instances_.find(name); - if (instance_it == instances_.end() || - !is_instance_schedulable(instance_it->second)) { - continue; - } - - if (instance_it->second.type == InstanceType::DECODE) { - infos->decode_load_metrics.insert(std::make_pair(name, it->second)); - infos->decode_max_waiting_requests_num = - std::max(infos->decode_max_waiting_requests_num, - it->second.waiting_requests_num); - } else { - infos->prefill_load_metrics.insert(std::make_pair(name, it->second)); - infos->prefill_max_waiting_requests_num = - std::max(infos->prefill_max_waiting_requests_num, - it->second.waiting_requests_num); - } - } - - std::string least_loaded_prefill_instance; - float least_loaded_prefill_gpu_cache_usage_perc = 1; - std::string least_loaded_decode_instance; - float least_loaded_decode_gpu_cache_usage_perc = 1; - - if (infos->prefill_load_metrics.size() == 0 || - infos->decode_load_metrics.size() == 0) { - for (const auto& metric : load_metrics_) { - auto instance_it = instances_.find(metric.first); - if (instance_it == instances_.end() || - !is_instance_schedulable(instance_it->second)) { - continue; - } - if (instance_it->second.type != InstanceType::DECODE) { - if (metric.second.gpu_cache_usage_perc < - least_loaded_prefill_gpu_cache_usage_perc) { - least_loaded_prefill_gpu_cache_usage_perc = - metric.second.gpu_cache_usage_perc; - least_loaded_prefill_instance = metric.first; - } - } else { - if (metric.second.gpu_cache_usage_perc < - least_loaded_decode_gpu_cache_usage_perc) { - least_loaded_decode_gpu_cache_usage_perc = - metric.second.gpu_cache_usage_perc; - least_loaded_decode_instance = metric.first; - } - } - } - } - - if (infos->prefill_load_metrics.size() == 0 && - !least_loaded_prefill_instance.empty()) { - infos->prefill_load_metrics.insert( - std::make_pair(least_loaded_prefill_instance, - load_metrics_[least_loaded_prefill_instance])); - } - - if (infos->decode_load_metrics.size() == 0 && - !least_loaded_decode_instance.empty()) { - infos->decode_load_metrics.insert( - std::make_pair(least_loaded_decode_instance, - load_metrics_[least_loaded_decode_instance])); - } + TopologySnapshot topo = topology_impl_->snapshot(); + metrics_impl_->get_load_metrics(infos, topo); } -void InstanceMgr::record_load_metrics_update( - const std::string& instance_name, - const proto::LoadMetrics& load_metrics) { - std::unique_lock lock(metrics_mutex_); - - updated_metrics_.insert_or_assign( - instance_name, - LoadMetrics(load_metrics.waiting_requests_num(), - load_metrics.gpu_cache_usage_perc())); +void InstanceMgr::kvcache_match(const Slice& token_ids, + OverlapScores* overlap_scores) { + kvcache_->match(token_ids, overlap_scores); } -bool InstanceMgr::upload_load_metrics() { - std::unordered_map upload_snapshot; - std::unordered_set remove_snapshot; - { - std::unique_lock lk(metrics_mutex_); - for (auto& iter : updated_metrics_) { - load_metrics_.insert_or_assign(iter.first, iter.second); - } - for (auto& iter : removed_instance_) { - load_metrics_.erase(iter); - } - upload_snapshot = updated_metrics_; - remove_snapshot = removed_instance_; - updated_metrics_.clear(); - removed_instance_.clear(); - } - bool status = etcd_client_->set(ETCD_LOADMETRICS_PREFIX, upload_snapshot); - status = - status && etcd_client_->rm(ETCD_LOADMETRICS_PREFIX, remove_snapshot); - return status; +bool InstanceMgr::upload_master_state_to_etcd() { + const bool kvcache_ok = kvcache_->upload_kvcache(); + const bool load_ok = metrics_impl_->upload_load_metrics(); + return kvcache_ok && load_ok; } void InstanceMgr::set_as_master() { - is_master_service_ = true; - etcd_client_->remove_watch(ETCD_LOADMETRICS_PREFIX); + metrics_impl_->set_as_master(); + kvcache_->set_as_master(); } std::shared_ptr InstanceMgr::get_channel( const std::string& instance_name) { - std::shared_lock lock(cluster_mutex_); - auto iter = cached_channels_.find(instance_name); - if (iter == cached_channels_.end()) { - return nullptr; - } - return iter->second; + return topology_impl_->get_channel(instance_name); } bool InstanceMgr::bind_request_instance_incarnations( const std::shared_ptr& request) { - std::shared_lock lock(cluster_mutex_); - - // Bind the selected routing to a concrete incarnation before dispatch. - request->prefill_incarnation_id.clear(); - request->decode_incarnation_id.clear(); - - if (!request->routing.prefill_name.empty()) { - auto prefill_it = instances_.find(request->routing.prefill_name); - if (prefill_it == instances_.end()) { - LOG(ERROR) << "Prefill instance is not registered when binding request: " - << request->routing.prefill_name; - return false; - } - if (!is_instance_schedulable(prefill_it->second)) { - LOG(ERROR) << "Prefill instance is not schedulable when binding request: " - << request->routing.prefill_name << ", state: " - << runtime_state_name(prefill_it->second.runtime_state); - return false; - } - request->prefill_incarnation_id = prefill_it->second.incarnation_id; - } - - if (!request->routing.decode_name.empty()) { - auto decode_it = instances_.find(request->routing.decode_name); - if (decode_it == instances_.end()) { - LOG(ERROR) << "Decode instance is not registered when binding request: " - << request->routing.decode_name; - return false; - } - if (!is_instance_schedulable(decode_it->second)) { - LOG(ERROR) << "Decode instance is not schedulable when binding request: " - << request->routing.decode_name << ", state: " - << runtime_state_name(decode_it->second.runtime_state); - return false; - } - request->decode_incarnation_id = decode_it->second.incarnation_id; - } - - return true; + return topology_impl_->bind_request_instance_incarnations(request); } -bool InstanceMgr::record_instance_heartbeat(const std::string& instance_name, - const std::string& incarnation_id) { - std::unique_lock lock(cluster_mutex_); - auto it = instances_.find(instance_name); - if (it == instances_.end()) { - LOG(WARNING) << "Ignore heartbeat from unknown instance: " << instance_name; +bool InstanceMgr::on_instance_heartbeat(const proto::HeartbeatRequest& req) { + if (!topology_impl_->record_instance_heartbeat(req.name(), + req.incarnation_id())) { return false; } - - if (it->second.incarnation_id != incarnation_id) { - LOG(WARNING) << "Ignore stale heartbeat from instance: " << instance_name - << ", current incarnation_id: " << it->second.incarnation_id - << ", heartbeat incarnation_id: " << incarnation_id; - return false; - } - - it->second.latest_timestamp = current_time_ms(); - if (it->second.runtime_state == InstanceRuntimeState::SUSPECT) { - // A recovered suspect instance first goes back to LEASE_LOST and must - // keep heartbeating before becoming fully active again via registration. - clear_suspect_instance(instance_name, incarnation_id); - it->second.runtime_state = InstanceRuntimeState::LEASE_LOST; - LOG(WARNING) << "Heartbeat recovered for suspect instance, move to " - << "lease lost state: " << instance_name - << ", incarnation_id: " << incarnation_id; - } + kvcache_->record_updated_kvcaches(req.name(), req.cache_event()); + metrics_impl_->record_load_metrics_update(req.name(), req.load_metrics()); + metrics_impl_->update_latency_metrics(req.name(), req.latency_metrics()); return true; } -bool InstanceMgr::init_brpc_channel( - const std::string& instance_name, - std::shared_ptr* out_channel) { - auto channel = std::make_shared(); - brpc::ChannelOptions options; - // Add to params - // options.protocol = "http"; - options.timeout_ms = options_.timeout_ms(); /*milliseconds*/ - options.max_retry = 3; - options.connect_timeout_ms = options_.connect_timeout_ms(); - std::string load_balancer = ""; - if (channel->Init(instance_name.c_str(), load_balancer.c_str(), &options) != - 0) { - LOG(ERROR) << "Fail to initialize channel for " << instance_name; - return false; - } - *out_channel = std::move(channel); - return true; -} - -bool InstanceMgr::probe_instance_health(const std::string& instance_name) { - const int attempts = std::max(1, options_.instance_delete_probe_attempts()); - const int timeout_ms = - std::max(1, options_.instance_delete_probe_timeout_ms()); - const std::string url = "http://" + instance_name + kHealthPath; - - for (int attempt = 1; attempt <= attempts; ++attempt) { - brpc::Channel channel; - brpc::ChannelOptions options; - options.protocol = "http"; - options.timeout_ms = timeout_ms; - options.connect_timeout_ms = timeout_ms; - options.max_retry = 0; - if (channel.Init(url.c_str(), "", &options) != 0) { - LOG(WARNING) << "Failed to initialize health probe channel, instance: " - << instance_name << ", attempt: " << attempt << "/" - << attempts; - } else { - brpc::Controller cntl; - cntl.http_request().uri() = url; - cntl.http_request().set_method(brpc::HTTP_METHOD_GET); - channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); - if (!cntl.Failed() && cntl.http_response().status_code() == 200) { - return true; - } - LOG(WARNING) << "Health probe failed, instance: " << instance_name - << ", attempt: " << attempt << "/" << attempts << ", error: " - << (cntl.Failed() ? cntl.ErrorText() - : std::to_string( - cntl.http_response().status_code())); - } - - if (attempt < attempts) { - std::this_thread::sleep_for( - std::chrono::milliseconds(kDeleteProbeRetryBackoffMs)); - } - } - - return false; +void InstanceMgr::update_request_metrics(std::shared_ptr request, + RequestAction action) { + metrics_impl_->update_request_metrics(request, action); } -void InstanceMgr::update_instance_metainfo(const etcd::Response& response, - const uint64_t& prefix_len) { - if (response.events().empty() || exited_) { - return; - } - - threadpool_.schedule([this, - response = std::move(response), - prefix_len = std::move(prefix_len)] { - if (exited_) return; - for (const auto& event : response.events()) { - const std::string instance_name = get_event_key_suffix(event, prefix_len); - if (instance_name.empty()) { - continue; - } - - if (event.event_type() == etcd::Event::EventType::PUT) { - InstanceMetaInfo metainfo; - auto json_str = get_event_value(event); - if (!metainfo.parse_from_json(json_str)) { - LOG(ERROR) << "Parse instance json failed: " << json_str; - continue; - } - - std::unique_lock lock(cluster_mutex_); - auto existing_it = instances_.find(instance_name); - if (existing_it == instances_.end()) { - lock.unlock(); - if (!register_instance(instance_name, metainfo)) { - LOG(ERROR) << "Fail to register instance: " << instance_name; - } - continue; - } - - if (existing_it->second.incarnation_id == metainfo.incarnation_id) { - const auto previous_state = existing_it->second.runtime_state; - refresh_instance_registration(instance_name, metainfo); - clear_suspect_instance(instance_name, metainfo.incarnation_id); - if (previous_state != InstanceRuntimeState::ACTIVE) { - LOG(INFO) << "Instance registration restored, back to active: " - << instance_name - << ", incarnation_id: " << metainfo.incarnation_id - << ", previous_state: " - << runtime_state_name(previous_state); - } - continue; - } - - const std::string old_incarnation_id = - existing_it->second.incarnation_id; - LOG(WARNING) << "Detected instance replacement, instance_name: " - << instance_name - << ", old incarnation_id: " << old_incarnation_id - << ", new incarnation_id: " << metainfo.incarnation_id; - lock.unlock(); - deregister_instance(instance_name, old_incarnation_id); - if (!register_instance(instance_name, metainfo)) { - LOG(ERROR) << "Fail to register replacement instance: " - << instance_name; - } - continue; - } - - if (event.event_type() != etcd::Event::EventType::DELETE_) { - continue; - } - - InstanceMetaInfo deleted_info; - std::string deleted_incarnation_id; - const auto deleted_value = get_event_value(event); - if (!deleted_value.empty() && - deleted_info.parse_from_json(deleted_value)) { - deleted_incarnation_id = deleted_info.incarnation_id; - } - std::string tracked_incarnation_id; - { - std::unique_lock lock(cluster_mutex_); - auto existing_it = instances_.find(instance_name); - if (existing_it == instances_.end()) { - continue; - } - - if (!deleted_incarnation_id.empty() && - existing_it->second.incarnation_id != deleted_incarnation_id) { - LOG(INFO) << "Ignore stale delete for replaced instance: " - << instance_name - << ", deleted incarnation_id: " << deleted_incarnation_id - << ", current incarnation_id: " - << existing_it->second.incarnation_id; +bool InstanceMgr::select_instance_pair_on_slo( + std::shared_ptr request) { + std::string flip_prefill_target; + { + std::scoped_lock lock( + topology_impl_->cluster_mutex_, metrics_impl_->metrics_mutex_); + + auto& instances_ = topology_impl_->instances_; + auto& prefill_index_ = topology_impl_->prefill_index_; + auto& decode_index_ = topology_impl_->decode_index_; + auto& suspect_instances_ = topology_impl_->suspect_instances_; + auto& request_metrics_ = metrics_impl_->request_metrics_; + + const bool has_unschedulable_instances = !suspect_instances_.empty(); + + std::string min_prefill_instance; + int64_t min_prefill_time = std::numeric_limits::max(); + int64_t total_prefill_time = 0; + size_t schedulable_prefill_count = 0; + for (const auto& prefill_instance : prefill_index_) { + if (has_unschedulable_instances) { + auto it = instances_.find(prefill_instance); + if (it == instances_.end() || !is_instance_schedulable(it->second)) { continue; } - tracked_incarnation_id = existing_it->second.incarnation_id; } - // Keep delete handling event-driven: use this event, one health probe, - // and later heartbeats or PUT events to drive recovery. - const bool probe_success = probe_instance_health(instance_name); - - std::unique_lock lock(cluster_mutex_); - auto existing_it = instances_.find(instance_name); - if (existing_it == instances_.end() || - existing_it->second.incarnation_id != tracked_incarnation_id) { - continue; - } - - if (probe_success) { - // Keep the instance in service temporarily and wait for heartbeats. - clear_suspect_instance(instance_name, tracked_incarnation_id); - existing_it->second.runtime_state = InstanceRuntimeState::LEASE_LOST; - existing_it->second.latest_timestamp = current_time_ms(); - LOG(WARNING) << "Instance lease deleted, probe succeeded, enter " - << "lease lost state: " << instance_name - << ", incarnation_id: " << tracked_incarnation_id; - continue; - } - - mark_instance_suspect(instance_name, tracked_incarnation_id); - LOG(WARNING) << "Instance lease deleted, probe failed, enter suspect " - << "state: " << instance_name - << ", incarnation_id: " << tracked_incarnation_id; - } - }); -} - -void InstanceMgr::update_load_metrics(const etcd::Response& response, - const uint64_t& prefix_len) { - if (response.events().empty() || exited_) { - return; - } - threadpool_.schedule([this, - response = std::move(response), - prefix_len = std::move(prefix_len)] { - if (exited_) return; - std::unordered_map put_map; - std::vector delete_list; - - for (const auto& event : response.events()) { - std::string instance_name = event.kv().key().substr(prefix_len); - - if (event.event_type() == etcd::Event::EventType::PUT) { - LoadMetrics load_metrics; - auto json_str = event.kv().as_string(); - if (!load_metrics.parse_from_json(json_str)) { - LOG(ERROR) << "pase json:" << json_str << " error!"; - continue; - } - - put_map.insert(std::make_pair(instance_name, std::move(load_metrics))); - - } else if (event.event_type() == etcd::Event::EventType::DELETE_) { - delete_list.push_back(instance_name); + int64_t prefill_time = + request_metrics_[prefill_instance].estimated_prefill_time; + total_prefill_time += prefill_time; + if (prefill_time < min_prefill_time) { + min_prefill_instance = prefill_instance; + min_prefill_time = prefill_time; } + ++schedulable_prefill_count; } - { - std::unique_lock lock(metrics_mutex_); - for (auto& iter : put_map) { - load_metrics_.insert_or_assign(iter.first, std::move(iter.second)); - } - - for (auto& iter : delete_list) { - load_metrics_.erase(iter); - } + if (schedulable_prefill_count == 0) { + LOG(ERROR) << "No prefill or default instance found!"; + return false; } - }); -} - -void InstanceMgr::update_latency_metrics( - const std::string& instance_name, - const proto::LatencyMetrics& latency_metrics) { - std::unique_lock lock(metrics_mutex_); - - latency_metrics_.insert_or_assign( - instance_name, - LatencyMetrics(latency_metrics.recent_max_ttft(), - latency_metrics.recent_max_tbt())); -} - -void InstanceMgr::reconcile_instance_states() { - const auto suspect_interval_ms = - std::max(1, options_.detect_disconnected_instance_interval()) * - 1000; - const auto heartbeat_timeout_ms = - std::max(1, options_.lease_lost_heartbeat_timeout_ms()); - while (!exited_) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - - std::vector> to_deregister; + int64_t avg_prefill_time = total_prefill_time / schedulable_prefill_count; - { - std::unique_lock lock(cluster_mutex_); - if (exited_) { - return; - } - - const uint64_t now_ms = current_time_ms(); - for (auto& [instance_name, info] : instances_) { - // LEASE_LOST is a grace period after etcd delete but before hard - // eviction. - if (info.runtime_state != InstanceRuntimeState::LEASE_LOST) { - continue; - } - if (now_ms - info.latest_timestamp < heartbeat_timeout_ms) { + std::string min_decode_instance; + int64_t min_estimated_tpot = std::numeric_limits::max(); + std::string target_decode_instance; + size_t schedulable_decode_count = 0; + for (const auto& decode_instance : decode_index_) { + if (has_unschedulable_instances) { + auto it = instances_.find(decode_instance); + if (it == instances_.end() || !is_instance_schedulable(it->second)) { continue; } - mark_instance_suspect(instance_name, info.incarnation_id); - LOG(WARNING) << "Lease lost instance heartbeat timed out, enter suspect " - << "state: " << instance_name - << ", incarnation_id: " << info.incarnation_id; } - for (auto it = suspect_instances_.begin(); - it != suspect_instances_.end();) { - const std::string instance_name = it->first; - const std::string incarnation_id = it->second.incarnation_id; - const uint64_t enter_ts_ms = it->second.enter_ts_ms; - ++it; - - if (now_ms - enter_ts_ms < suspect_interval_ms) { - continue; - } - - auto inst_it = instances_.find(instance_name); - if (inst_it == instances_.end() || - inst_it->second.incarnation_id != incarnation_id) { - suspect_instances_.erase(instance_name); - continue; - } - - LOG(WARNING) << "Suspect window expired, deregister instance: " - << instance_name << ", incarnation_id: " << incarnation_id; - to_deregister.emplace_back(instance_name, incarnation_id); + int64_t token_num = request_metrics_[decode_instance].decode_token_num; + int64_t request_num = + request_metrics_[decode_instance].decode_request_num; + auto& time_predictor = + metrics_impl_->time_predictor_unlocked(decode_instance); + int64_t estimated_tpot = static_cast(time_predictor.predict_tpot( + static_cast(token_num + request->token_ids.size()), + static_cast(request_num + 1))); + if (estimated_tpot <= FLAGS_target_tpot && + target_decode_instance.empty()) { + target_decode_instance = decode_instance; } - } - - for (const auto& p : to_deregister) { - deregister_instance(p.first, p.second); - } - } -} - -void InstanceMgr::refresh_instance_registration(const std::string& name, - const InstanceMetaInfo& info) { - auto it = instances_.find(name); - if (it == instances_.end()) { - return; - } - - // Preserve local scheduling/index state across etcd refreshes. - const auto instance_index = it->second.instance_index; - const auto current_type = it->second.current_type; - - it->second = info; - it->second.instance_index = instance_index; - it->second.current_type = current_type; - it->second.latest_timestamp = current_time_ms(); - it->second.runtime_state = InstanceRuntimeState::ACTIVE; -} - -void InstanceMgr::mark_instance_suspect(const std::string& name, - const std::string& incarnation_id) { - SuspectInstanceInfo info; - info.incarnation_id = incarnation_id; - info.enter_ts_ms = current_time_ms(); - suspect_instances_[name] = std::move(info); - auto it = instances_.find(name); - if (it != instances_.end() && it->second.incarnation_id == incarnation_id) { - it->second.runtime_state = InstanceRuntimeState::SUSPECT; - } -} - -void InstanceMgr::clear_suspect_instance(const std::string& name, - const std::string& incarnation_id) { - auto it = suspect_instances_.find(name); - if (it == suspect_instances_.end()) { - return; - } - if (!incarnation_id.empty() && it->second.incarnation_id != incarnation_id) { - return; - } - suspect_instances_.erase(it); -} - -void InstanceMgr::update_request_metrics(std::shared_ptr request, - RequestAction action) { - // skip request metrics update if policy is not SLO_AWARE - if (options_.load_balance_policy() != "SLO_AWARE") { - return; - } - - std::scoped_lock lock( - cluster_mutex_, metrics_mutex_); - - auto prefill_it = request_metrics_.find(request->routing.prefill_name); - if (prefill_it == request_metrics_.end()) { - LOG(ERROR) << "Failed to find instance request metrics, instance name : " - << request->routing.prefill_name; - return; - } - - auto decode_it = request_metrics_.find(request->routing.decode_name); - if (decode_it == request_metrics_.end()) { - LOG(ERROR) << "Failed to find instance request metrics, instance name : " - << request->routing.decode_name; - return; - } - - int64_t num_prompt_tokens = request->token_ids.size(); - int64_t num_generated_tokens = request->num_generated_tokens; - switch (action) { - case RequestAction::SCHEDULE: - // update the request metrics for prefill and decode instances when - // request is scheduled - prefill_it->second.prefill_request_num += 1; - prefill_it->second.prefill_token_num += num_prompt_tokens; - - decode_it->second.decode_request_num += 1; - decode_it->second.decode_token_num += num_prompt_tokens; - break; - case RequestAction::FINISH_PREFILL: - // update the request metrics for prefill and decode instance when request - // finishes the prefill phase - prefill_it->second.prefill_request_num -= 1; - prefill_it->second.prefill_token_num -= num_prompt_tokens; - prefill_it->second.estimated_prefill_time -= request->estimated_ttft; - - decode_it->second.decode_token_num += 1; - break; - case RequestAction::GENERATE: - // update the request metrics for decode instance when request generate a - // token - decode_it->second.decode_token_num += 1; - break; - case RequestAction::FINISH_DECODE: - // update the request metrics for decode instance when request finishes - // the decode phase - decode_it->second.decode_request_num -= 1; - decode_it->second.decode_token_num -= - (num_prompt_tokens + num_generated_tokens); - - break; - case RequestAction::CANCEL: - // update the request metrics for prefill and decode instances when - // request is cancelled - prefill_it->second.prefill_request_num -= 1; - prefill_it->second.prefill_token_num -= num_prompt_tokens; - prefill_it->second.estimated_prefill_time -= request->estimated_ttft; - - decode_it->second.decode_request_num -= 1; - decode_it->second.decode_token_num -= - (num_prompt_tokens + num_generated_tokens); - - break; - default: - LOG(ERROR) << "Unknown RequestAction: " << static_cast(action); - break; - } - - if (decode_it->second.decode_request_num == 0) { - flip_decode_to_prefill(request->routing.decode_name); - } -} - -bool InstanceMgr::select_instance_pair_on_slo( - std::shared_ptr request) { - std::scoped_lock lock( - cluster_mutex_, metrics_mutex_); - const bool has_unschedulable_instances = !suspect_instances_.empty(); - - std::string min_prefill_instance; - int64_t min_prefill_time = std::numeric_limits::max(); - int64_t total_prefill_time = 0; - size_t schedulable_prefill_count = 0; - for (const auto& prefill_instance : prefill_index_) { - if (has_unschedulable_instances) { - auto it = instances_.find(prefill_instance); - if (it == instances_.end() || !is_instance_schedulable(it->second)) { - continue; - } - } - - int64_t prefill_time = - request_metrics_[prefill_instance].estimated_prefill_time; - total_prefill_time += prefill_time; - if (prefill_time < min_prefill_time) { - min_prefill_instance = prefill_instance; - min_prefill_time = prefill_time; - } - ++schedulable_prefill_count; - } - - if (schedulable_prefill_count == 0) { - LOG(ERROR) << "No prefill or default instance found!"; - return false; - } - int64_t avg_prefill_time = total_prefill_time / schedulable_prefill_count; - std::string min_decode_instance; - int64_t min_estimated_tpot = std::numeric_limits::max(); - std::string target_decode_instance; - size_t schedulable_decode_count = 0; - for (const auto& decode_instance : decode_index_) { - if (has_unschedulable_instances) { - auto it = instances_.find(decode_instance); - if (it == instances_.end() || !is_instance_schedulable(it->second)) { - continue; + if (estimated_tpot < min_estimated_tpot) { + min_decode_instance = decode_instance; + min_estimated_tpot = estimated_tpot; } + ++schedulable_decode_count; } - int64_t token_num = request_metrics_[decode_instance].decode_token_num; - int64_t request_num = request_metrics_[decode_instance].decode_request_num; - auto& time_predictor = get_time_predictor(decode_instance); - int64_t estimated_tpot = time_predictor.predict_tpot( - token_num + request->token_ids.size(), request_num + 1); - if (estimated_tpot <= FLAGS_target_tpot && target_decode_instance.empty()) { - target_decode_instance = decode_instance; - } - - if (estimated_tpot < min_estimated_tpot) { - min_decode_instance = decode_instance; - min_estimated_tpot = estimated_tpot; - } - ++schedulable_decode_count; - } - - if (schedulable_decode_count == 0) { - LOG(ERROR) << "No decode instance found!"; - return false; - } - - if (!target_decode_instance.empty()) { - request->routing.decode_name = target_decode_instance; - } else { - request->routing.decode_name = min_decode_instance; - } - - // select prefill instance - float tpot_threshold = - (schedulable_decode_count - 1.0f) / schedulable_decode_count; - // When the prefill instances are already overloaded and there are other - // instances with lower loads in the decode group, we will dispatch the - // prefill requests to those instances to alleviate the pressure on the - // prefill instances. - if (min_prefill_time > FLAGS_target_ttft && - target_decode_instance != min_decode_instance && - min_estimated_tpot < FLAGS_target_tpot * tpot_threshold && - request_metrics_[min_decode_instance].estimated_prefill_time < - min_prefill_time) { - request->routing.prefill_name = min_decode_instance; - // update estimated ttft - auto& time_predictor = get_time_predictor(min_decode_instance); - request->estimated_ttft = - time_predictor.predict_ttft(request->token_ids.size()); - request_metrics_[min_decode_instance].estimated_prefill_time += - request->estimated_ttft; - } else { - request->routing.prefill_name = min_prefill_instance; - // update estimated ttft - auto& time_predictor = get_time_predictor(min_prefill_instance); - request->estimated_ttft = - time_predictor.predict_ttft(request->token_ids.size()); - request_metrics_[min_prefill_instance].estimated_prefill_time += - request->estimated_ttft; - } - - // If there are no decode instances that meet the requirements, switch a - // prefill instance to decode if the number of instances allows. Since the - // current disaggregated PD mode does not support prefill and decode using the - // same instance, we only switch the instance here, without dispatching the - // decode request to this instance. - float ttft_threshold = - (schedulable_prefill_count - 1.0f) / schedulable_prefill_count; - if (target_decode_instance.empty() && - (avg_prefill_time < FLAGS_target_ttft * ttft_threshold || - schedulable_decode_count < schedulable_prefill_count)) { - flip_prefill_to_decode(request->routing.prefill_name); - } - - return true; -} - -void InstanceMgr::flip_prefill_to_decode(std::string& instance_name) { - if (count_schedulable_instances(instances_, prefill_index_) <= 1) { - // Ensure there is at least one prefill instance. - return; - } - - if (instances_.find(instance_name) == instances_.end()) { - LOG(ERROR) << "Can't find instance, instance_name: " << instance_name; - return; - } - - // delete instance name from prefill_index_ - remove_instance_from_index(instance_name, instances_[instance_name]); - - // insert instance name to decode_index_ - instances_[instance_name].current_type = InstanceType::DECODE; - add_instance_to_index(instance_name, instances_[instance_name]); - - LOG(INFO) << "Flip prefill to decode, instance name : " << instance_name; -} - -void InstanceMgr::flip_decode_to_prefill(std::string& instance_name) { - if (count_schedulable_instances(instances_, decode_index_) <= 1) { - // Ensure there is at least one decode instance. - return; - } - - if (instances_.find(instance_name) == instances_.end()) { - LOG(ERROR) << "Can't find instance, instance_name: " << instance_name; - return; - } - - // delete instance name from decode_index_ - remove_instance_from_index(instance_name, instances_[instance_name]); - - // insert instance name to prefill_index - instances_[instance_name].current_type = InstanceType::PREFILL; - add_instance_to_index(instance_name, instances_[instance_name]); - - LOG(INFO) << "Flip decode to prefill, instance name : " << instance_name; -} - -TimePredictor& InstanceMgr::get_time_predictor( - const std::string& instance_name) { - auto it = time_predictors_.find(instance_name); - if (it == time_predictors_.end()) { - LOG(FATAL) << "Find TimePredictor failed, instance name : " - << instance_name; - } - return it->second; -} - -bool InstanceMgr::call_link_instance(const std::string& target_rpc_addr, - const InstanceMetaInfo& peer_info) { - brpc::Channel channel; - brpc::ChannelOptions options; - options.protocol = "http"; - options.timeout_ms = options_.timeout_ms(); - options.max_retry = 3; - if (channel.Init(target_rpc_addr.c_str(), "", &options) != 0) { - LOG(ERROR) << "Fail to initialize channel for LinkInstance to " - << target_rpc_addr; - return false; - } - xllm::proto::DisaggPDService_Stub stub(&channel); - brpc::Controller cntl; - xllm::proto::InstanceClusterInfo req; - req.set_instance_name(peer_info.name); - for (auto& cluster_id : peer_info.cluster_ids) { - req.add_cluster_ids(cluster_id); - } - for (auto& addr : peer_info.addrs) { - req.add_addrs(addr); - } - for (auto& ip : peer_info.device_ips) { - req.add_device_ips(ip); - } - for (auto& port : peer_info.ports) { - req.add_ports(port); - } - req.set_dp_size(peer_info.dp_size); - xllm::proto::Status res; - stub.LinkInstance(&cntl, &req, &res, nullptr); - if (cntl.Failed()) { - LOG(ERROR) << "LinkInstance failed, target: " << target_rpc_addr - << ", peer: " << peer_info.name - << ", error: " << cntl.ErrorText(); - return false; - } - return res.ok(); -} - -bool InstanceMgr::call_unlink_instance(const std::string& target_rpc_addr, - const InstanceMetaInfo& peer_info) { - brpc::Channel channel; - brpc::ChannelOptions options; - options.protocol = "http"; - options.timeout_ms = options_.timeout_ms(); - options.max_retry = 3; - if (channel.Init(target_rpc_addr.c_str(), "", &options) != 0) { - LOG(ERROR) << "Fail to initialize channel for UnlinkInstance to " - << target_rpc_addr; - return false; - } - xllm::proto::DisaggPDService_Stub stub(&channel); - brpc::Controller cntl; - xllm::proto::InstanceClusterInfo req; - req.set_instance_name(peer_info.name); - for (auto& cluster_id : peer_info.cluster_ids) { - req.add_cluster_ids(cluster_id); - } - for (auto& addr : peer_info.addrs) { - req.add_addrs(addr); - } - for (auto& ip : peer_info.device_ips) { - req.add_device_ips(ip); - } - for (auto& port : peer_info.ports) { - req.add_ports(port); - } - req.set_dp_size(peer_info.dp_size); - xllm::proto::Status res; - stub.UnlinkInstance(&cntl, &req, &res, nullptr); - if (cntl.Failed()) { - LOG(ERROR) << "UnlinkInstance failed, target: " << target_rpc_addr - << ", peer: " << peer_info.name - << ", error: " << cntl.ErrorText(); - return false; - } - return res.ok(); -} - -bool InstanceMgr::register_instance(const std::string& name, - InstanceMetaInfo& info) { - info.runtime_state = InstanceRuntimeState::ACTIVE; - info.latest_timestamp = current_time_ms(); - info.name = name; - - { - std::unique_lock lock(cluster_mutex_); - if (instances_.find(name) != instances_.end() || - cached_channels_.find(name) != cached_channels_.end()) { - LOG(ERROR) << "Instance is already registered, instance_name: " << name; - return false; - } - } - - std::shared_ptr channel; - if (!init_brpc_channel(name, &channel)) { - LOG(ERROR) << "create channel fail: " << name; - return false; - } - - { - std::unique_lock lock(cluster_mutex_); - if (instances_.find(name) != instances_.end() || - cached_channels_.find(name) != cached_channels_.end()) { - LOG(WARNING) << "Instance registered concurrently during channel init: " - << name; - return false; - } - cached_channels_[name] = std::move(channel); - } - - add_instance_resources(name, info); - - std::vector> link_ops; - { - std::unique_lock lock(cluster_mutex_); - if (!gather_link_operations(info, &link_ops)) { - remove_instance_resources(name); + if (schedulable_decode_count == 0) { + LOG(ERROR) << "No decode instance found!"; return false; } - } - - if (!run_link_operations(link_ops)) { - std::unique_lock lock(cluster_mutex_); - remove_instance_resources(name); - return false; - } - - { - std::unique_lock lock(cluster_mutex_); - add_instance_to_index(name, info); - instances_.insert(std::make_pair(name, info)); - } - return true; -} - -void InstanceMgr::deregister_instance( - const std::string& name, - const std::string& expected_incarnation_id) { - InstanceMetaInfo info; - std::vector> unlink_ops; - { - std::unique_lock lock(cluster_mutex_); - auto it = instances_.find(name); - if (it == instances_.end()) { - LOG(ERROR) << "Instance is not registered, instance_name: " << name; - return; - } - - if (!expected_incarnation_id.empty() && - it->second.incarnation_id != expected_incarnation_id) { - LOG(INFO) << "Skip deregistering stale incarnation, instance_name: " << name - << ", current incarnation_id: " << it->second.incarnation_id - << ", expected incarnation_id: " << expected_incarnation_id; - return; - } - - info = it->second; - clear_suspect_instance(name, info.incarnation_id); - gather_unlink_operations(name, info, &unlink_ops); - } - - for (const auto& op : unlink_ops) { - call_unlink_instance(op.first, op.second); - } - { - std::unique_lock lock(cluster_mutex_); - auto it = instances_.find(name); - if (it == instances_.end()) { - return; + if (!target_decode_instance.empty()) { + request->routing.decode_name = target_decode_instance; + } else { + request->routing.decode_name = min_decode_instance; + } + + float tpot_threshold = + (schedulable_decode_count - 1.0f) / schedulable_decode_count; + if (min_prefill_time > FLAGS_target_ttft && + target_decode_instance != min_decode_instance && + min_estimated_tpot < FLAGS_target_tpot * tpot_threshold && + request_metrics_[min_decode_instance].estimated_prefill_time < + min_prefill_time) { + request->routing.prefill_name = min_decode_instance; + auto& time_predictor = + metrics_impl_->time_predictor_unlocked(min_decode_instance); + request->estimated_ttft = + static_cast(time_predictor.predict_ttft( + static_cast(request->token_ids.size()))); + request_metrics_[min_decode_instance].estimated_prefill_time += + request->estimated_ttft; + } else { + request->routing.prefill_name = min_prefill_instance; + auto& time_predictor = + metrics_impl_->time_predictor_unlocked(min_prefill_instance); + request->estimated_ttft = + static_cast(time_predictor.predict_ttft( + static_cast(request->token_ids.size()))); + request_metrics_[min_prefill_instance].estimated_prefill_time += + request->estimated_ttft; } - remove_instance_from_index(name, it->second); - } - - scheduler_->clear_requests_on_failed_instance( - name, info.incarnation_id, get_cleanup_type(info)); - { - std::unique_lock lock(cluster_mutex_); - auto it = instances_.find(name); - if (it == instances_.end()) { - return; + float ttft_threshold = + (schedulable_prefill_count - 1.0f) / schedulable_prefill_count; + if (target_decode_instance.empty() && + (avg_prefill_time < FLAGS_target_ttft * ttft_threshold || + schedulable_decode_count < schedulable_prefill_count)) { + flip_prefill_target = request->routing.prefill_name; } - remove_instance_resources(name); - instances_.erase(it); } - LOG(INFO) << "delete instance: " << name; -} - -void InstanceMgr::add_instance_resources(const std::string& name, - const InstanceMetaInfo& info) { - std::unique_lock lock(metrics_mutex_); - - time_predictors_.insert_or_assign( - name, TimePredictor(info.ttft_profiling_data, info.tpot_profiling_data)); - - request_metrics_.insert_or_assign(name, RequestMetrics()); -} -void InstanceMgr::remove_instance_resources(const std::string& name) { - // Caller must hold cluster_mutex_ (cached_channels_ is L1). - cached_channels_.erase(name); - std::unique_lock lock(metrics_mutex_); - time_predictors_.erase(name); - request_metrics_.erase(name); - latency_metrics_.erase(name); - updated_metrics_.erase(name); - removed_instance_.insert(name); - load_metrics_.erase(name); -} - -bool InstanceMgr::gather_link_operations( - const InstanceMetaInfo& info, - std::vector>* out_ops) { - out_ops->clear(); - switch (info.type) { - case InstanceType::DEFAULT: - break; - case InstanceType::PREFILL: { - for (auto& d_name : decode_index_) { - out_ops->emplace_back(instances_[d_name].rpc_address, info); - } - break; - } - case InstanceType::DECODE: { - for (auto& p_name : prefill_index_) { - out_ops->emplace_back(info.rpc_address, instances_[p_name]); - } - break; - } - case InstanceType::MIX: { - for (const auto& [peer_name, peer_info] : instances_) { - if (peer_name == info.name) { - continue; - } - out_ops->emplace_back(info.rpc_address, peer_info); - } - break; - } - default: - LOG(WARNING) << "Unknown InstanceType: " << int(info.type); - return false; + if (!flip_prefill_target.empty()) { + topology_impl_->flip_prefill_to_decode(flip_prefill_target); } - return true; -} -bool InstanceMgr::run_link_operations( - const std::vector>& ops) { - for (size_t i = 0; i < ops.size(); ++i) { - if (!call_link_instance(ops[i].first, ops[i].second)) { - LOG(ERROR) << "Fail to link instance during registration, op index " << i; - for (size_t j = 0; j < i; ++j) { - call_unlink_instance(ops[j].first, ops[j].second); - } - return false; - } - } return true; } -void InstanceMgr::gather_unlink_operations( - const std::string& name, - const InstanceMetaInfo& info, - std::vector>* out_ops) { - out_ops->clear(); - if (info.type == InstanceType::PREFILL) { - for (auto& d_name : decode_index_) { - out_ops->emplace_back(instances_[d_name].rpc_address, info); - } - } else if (info.type == InstanceType::DECODE) { - for (auto& p_name : prefill_index_) { - out_ops->emplace_back(instances_[p_name].rpc_address, info); - } - } else if (info.type == InstanceType::MIX) { - for (const auto& [peer_name, peer_info] : instances_) { - if (peer_name == name) { - continue; - } - out_ops->emplace_back(peer_info.rpc_address, info); - } - } -} - -void InstanceMgr::add_instance_to_index(const std::string& name, - InstanceMetaInfo& info) { - switch (info.type) { - case InstanceType::DEFAULT: - info.instance_index = prefill_index_.size(); - prefill_index_.emplace_back(name); - LOG(INFO) << "Register a new default instance, instance name : " << name; - break; - case InstanceType::PREFILL: - info.instance_index = prefill_index_.size(); - prefill_index_.emplace_back(name); - LOG(INFO) << "Register a new prefill instance, instance name : " << name; - break; - case InstanceType::DECODE: - info.instance_index = decode_index_.size(); - decode_index_.emplace_back(name); - LOG(INFO) << "Register a new decode instance, instance name : " << name; - break; - case InstanceType::MIX: - if (decode_index_.size() > 0) { - info.instance_index = prefill_index_.size(); - info.current_type = InstanceType::PREFILL; - prefill_index_.emplace_back(name); - LOG(INFO) << "Register a new prefill instance, instance name : " - << name; - } else { - info.instance_index = decode_index_.size(); - info.current_type = InstanceType::DECODE; - decode_index_.emplace_back(name); - LOG(INFO) << "Register a new decode instance, instance name : " << name; - } - break; - default: - break; - } -} - -void InstanceMgr::remove_instance_from_index(const std::string& name, - const InstanceMetaInfo& info) { - uint64_t index = info.instance_index; - if (index == -1) return; - - auto remove_from_vec = [&](std::vector& vec) { - if (index >= vec.size()) return; - std::swap(vec[index], vec.back()); - instances_[vec[index]].instance_index = index; - vec.pop_back(); - }; - - switch (info.type) { - case InstanceType::DEFAULT: - case InstanceType::PREFILL: - remove_from_vec(prefill_index_); - break; - case InstanceType::DECODE: - remove_from_vec(decode_index_); - break; - case InstanceType::MIX: - if (info.current_type == InstanceType::PREFILL) { - remove_from_vec(prefill_index_); - } else { - remove_from_vec(decode_index_); - } - break; - default: - break; - } -} - } // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_mgr.h b/xllm_service/scheduler/managers/instance_mgr.h index 65bdb30..512d37f 100644 --- a/xllm_service/scheduler/managers/instance_mgr.h +++ b/xllm_service/scheduler/managers/instance_mgr.h @@ -15,41 +15,42 @@ limitations under the License. #pragma once -#include - +#include #include -#include #include -#include -#include -#include -#include #include #include "common/macros.h" #include "common/options.h" -#include "common/threadpool.h" -#include "common/time_predictor.h" +#include "common/slice.h" #include "common/types.h" #include "request/request.h" #include "scheduler/etcd_client/etcd_client.h" +#include "scheduler/managers/instance_metrics.h" +#include "scheduler/managers/instance_topology.h" #include "xllm_rpc_service.pb.h" namespace xllm_service { -class Scheduler; +class InstanceKVCache; class InstanceMgr final { public: explicit InstanceMgr(const Options& options, const std::shared_ptr& etcd_client, const bool is_master_service, - Scheduler* scheduler); + OnInstanceDeregisteredCallback on_instance_deregistered); ~InstanceMgr(); InstanceMetaInfo get_instance_info(const std::string& instance_name); + std::shared_ptr get_channel(const std::string& instance_name); - bool get_next_instance_pair(Routing* routing); + // Prefill / decode names in topology index order for load balancing. + // When no instance is in SUSPECT, returns full index lists (same as legacy + // fast-path RR). When any suspect exists, only instances that are + // schedulable (non-SUSPECT) are included. + std::vector get_schedulable_prefill_instances(); + std::vector get_schedulable_decode_instances(); std::vector get_static_decode_list( const std::string& instance_name); @@ -57,29 +58,30 @@ class InstanceMgr final { std::vector get_static_prefill_list( const std::string& instance_name); - void get_load_metrics(LoadBalanceInfos* infos); + // Single entry for worker heartbeat: topology liveness, KV cache events, + // load metrics, and latency metrics. Returns false when heartbeat is + // rejected (unknown instance or incarnation mismatch). + bool on_instance_heartbeat(const proto::HeartbeatRequest& req); - std::shared_ptr get_channel(const std::string& instance_name); + // Master-only: flush aggregated KV-cache locations and load metrics to etcd. + // Returns true only if both uploads succeed. Order matches legacy behavior + // (KV cache first, then load metrics). + bool upload_master_state_to_etcd(); + + void get_load_metrics(LoadBalanceInfos* infos); bool bind_request_instance_incarnations( const std::shared_ptr& request); - bool record_instance_heartbeat(const std::string& instance_name, - const std::string& incarnation_id); - void record_load_metrics_update(const std::string& instance_name, - const proto::LoadMetrics& load_metrics); - bool upload_load_metrics(); - // update the recent token latency metrics for the corresponding instance - void update_latency_metrics(const std::string& instance_name, - const proto::LatencyMetrics& latency_metrics); + void kvcache_match(const Slice& token_ids, + OverlapScores* overlap_scores); - // update request metrics under different actions void update_request_metrics(std::shared_ptr request, RequestAction action); - // select instances based on the SLO bool select_instance_pair_on_slo(std::shared_ptr request); + // Master promotion: stop follower watches on metrics/KV-cache paths. void set_as_master(); private: @@ -87,107 +89,11 @@ class InstanceMgr final { void init(); - // brpc::Channel::Init only; must NOT be called while holding cluster_mutex_. - bool init_brpc_channel(const std::string& target_uri, - std::shared_ptr* out_channel); - bool probe_instance_health(const std::string& instance_name); - void reconcile_instance_states(); - void refresh_instance_registration(const std::string& name, - const InstanceMetaInfo& info); - void mark_instance_suspect(const std::string& name, - const std::string& incarnation_id); - void clear_suspect_instance(const std::string& name, - const std::string& incarnation_id = ""); - // use etcd as ServiceDiscovery - void update_instance_metainfo(const etcd::Response& response, - const uint64_t& prefix_len); - - void update_load_metrics(const etcd::Response& response, - const uint64_t& prefix_len); - - TimePredictor& get_time_predictor(const std::string& instance_name); - - void flip_prefill_to_decode(std::string& instance_name); - void flip_decode_to_prefill(std::string& instance_name); - - // Register a new instance with all necessary resources and connections - bool register_instance(const std::string& name, InstanceMetaInfo& info); - // Remove an instance and clean up its resources and connections - void deregister_instance(const std::string& name, - const std::string& expected_incarnation_id = ""); - // Initialize internal resources for an instance (predictors, metrics) - void add_instance_resources(const std::string& name, - const InstanceMetaInfo& info); - // Release internal resources for an instance - void remove_instance_resources(const std::string& name); - // Build LinkInstance RPC list; caller must hold cluster_mutex_. - bool gather_link_operations( - const InstanceMetaInfo& info, - std::vector>* out_ops); - // Run LinkInstance calls without holding cluster_mutex_. - bool run_link_operations( - const std::vector>& ops); - // Build UnlinkInstance RPC list; caller must hold cluster_mutex_. - void gather_unlink_operations( - const std::string& name, - const InstanceMetaInfo& info, - std::vector>* out_ops); - // Add instance to prefill or decode index according to its type - void add_instance_to_index(const std::string& name, InstanceMetaInfo& info); - // Remove instance from prefill or decode index - void remove_instance_from_index(const std::string& name, - const InstanceMetaInfo& info); - bool call_link_instance(const std::string& target_rpc_addr, - const InstanceMetaInfo& peer_info); - bool call_unlink_instance(const std::string& target_rpc_addr, - const InstanceMetaInfo& peer_info); - - // Locking (scheme B): only two mutexes participate in ordering. - // L1 cluster_mutex_: instances_, indices, cached_channels_. - // L2 metrics_mutex_: load_metrics_, request_metrics_, latency_metrics_, - // time_predictors_, updated_metrics_, removed_instance_. - // Order when both needed: always lock L1 before L2 (use std::scoped_lock). - // get_time_predictor() requires metrics_mutex_ held by caller. - // remove_instance_resources() requires cluster_mutex_ held by caller. - - Options options_; - - bool exited_ = false; - bool use_etcd_ = false; - std::atomic_bool is_master_service_ = false; - std::shared_ptr etcd_client_; - // L1 — cluster topology & channels - std::shared_mutex cluster_mutex_; - std::unordered_map instances_; - struct SuspectInstanceInfo { - std::string incarnation_id; - uint64_t enter_ts_ms = 0; - }; - std::unordered_map suspect_instances_; - std::vector prefill_index_; - std::vector decode_index_; - uint64_t next_prefill_index_ = 0; - uint64_t next_decode_index_ = 0; - std::unordered_map> - cached_channels_; - - // L2 — metrics & predictors (single lock to avoid order ambiguity) - std::shared_mutex metrics_mutex_; - std::unordered_map load_metrics_; - std::unordered_map updated_metrics_; - std::unordered_set removed_instance_; - std::unordered_map time_predictors_; - std::unordered_map latency_metrics_; - std::unordered_map request_metrics_; - - // not own - // NOTE: need to refactor with scheduler in future - Scheduler* scheduler_; - - ThreadPool threadpool_; - std::unique_ptr state_reconcile_thread_; + std::unique_ptr metrics_impl_; + std::unique_ptr topology_impl_; + std::unique_ptr kvcache_; }; } // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_topology.cpp b/xllm_service/scheduler/managers/instance_topology.cpp new file mode 100644 index 0000000..b77917f --- /dev/null +++ b/xllm_service/scheduler/managers/instance_topology.cpp @@ -0,0 +1,954 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm-service/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "instance_topology.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/types.h" +#include "disagg_pd.pb.h" + +namespace { +using xllm_service::InstanceRuntimeState; +using xllm_service::InstanceType; +std::unordered_map ETCD_KEYS_PREFIX_MAP = { + {InstanceType::DEFAULT, "XLLM:DEFAULT:"}, + {InstanceType::PREFILL, "XLLM:PREFILL:"}, + {InstanceType::DECODE, "XLLM:DECODE:"}, + {InstanceType::MIX, "XLLM:MIX:"}, +}; + +constexpr char kHealthPath[] = "/health"; +constexpr int64_t kDeleteProbeRetryBackoffMs = 100; + +uint64_t current_time_ms() { + return static_cast( + absl::ToInt64Milliseconds(absl::Now() - absl::UnixEpoch())); +} + +bool is_instance_schedulable(const xllm_service::InstanceMetaInfo& info) { + return info.runtime_state != InstanceRuntimeState::SUSPECT; +} + +size_t count_schedulable_instances( + const std::unordered_map& + instances, + const std::vector& index) { + size_t count = 0; + for (const auto& name : index) { + auto it = instances.find(name); + if (it == instances.end() || !is_instance_schedulable(it->second)) { + continue; + } + ++count; + } + return count; +} + +InstanceType get_cleanup_type(const xllm_service::InstanceMetaInfo& info) { + if (info.type == InstanceType::DEFAULT) { + return InstanceType::PREFILL; + } + if (info.type == InstanceType::MIX) { + return info.current_type; + } + return info.type; +} +} // namespace + +namespace xllm_service { + +InstanceTopologyImpl::InstanceTopologyImpl( + const Options& options, + const std::shared_ptr& etcd_client, + OnInstanceDeregisteredCallback on_instance_deregistered, + AddInstanceMetricsCallback add_instance_metrics, + RemoveInstanceMetricsCallback remove_instance_metrics_maps) + : options_(options), + etcd_client_(etcd_client), + on_instance_deregistered_cb_(std::move(on_instance_deregistered)), + add_instance_metrics_cb_(std::move(add_instance_metrics)), + remove_instance_metrics_maps_cb_(std::move(remove_instance_metrics_maps)) { + auto handle_instance_metainfo = + std::bind(&InstanceTopologyImpl::update_instance_metainfo, + this, + std::placeholders::_1, + std::placeholders::_2); + for (auto& it : ETCD_KEYS_PREFIX_MAP) { + etcd_client_->add_watch(it.second, handle_instance_metainfo); + } +} + +InstanceTopologyImpl::~InstanceTopologyImpl() { + exited_ = true; + if (state_reconcile_thread_ && state_reconcile_thread_->joinable()) { + state_reconcile_thread_->join(); + } +} + +void InstanceTopologyImpl::init_from_etcd_register_all() { + std::unordered_map loaded_instances; + for (auto& it : ETCD_KEYS_PREFIX_MAP) { + etcd_client_->get_prefix(it.second, &loaded_instances); + } + LOG(INFO) << "Load instance info from etcd:" << loaded_instances.size(); + + { + std::unique_lock lock(cluster_mutex_); + prefill_index_.reserve(loaded_instances.size()); + decode_index_.reserve(loaded_instances.size()); + } + + for (auto& pair : loaded_instances) { + if (!register_instance(pair.first, pair.second)) { + LOG(ERROR) << "Fail to register instance: " << pair.first; + } + } + + { + std::shared_lock lock(cluster_mutex_); + for (int i = 0; i < prefill_index_.size(); i++) { + LOG(INFO) << i << " : " << prefill_index_[i]; + } + } + + state_reconcile_thread_ = std::make_unique( + &InstanceTopologyImpl::reconcile_instance_states, this); +} + +InstanceMetaInfo InstanceTopologyImpl::get_instance_info( + const std::string& instance_name) { + std::shared_lock lock(cluster_mutex_); + if (instances_.find(instance_name) == instances_.end()) { + LOG(ERROR) << "Get instance info failed, instance is not registered, " + "instance_name: " + << instance_name; + return InstanceMetaInfo(); + } + return instances_[instance_name]; +} + +std::vector InstanceTopologyImpl::get_static_decode_list( + const std::string& /*instance_name*/) { + std::vector decode_list; + std::shared_lock lock(cluster_mutex_); + for (auto& inst : instances_) { + if (inst.second.type == InstanceType::DECODE && + is_instance_schedulable(inst.second)) { + decode_list.emplace_back(inst.second.name); + } + } + + return decode_list; +} + +std::vector InstanceTopologyImpl::get_static_prefill_list( + const std::string& /*instance_name*/) { + std::vector prefill_list; + std::shared_lock lock(cluster_mutex_); + for (auto& inst : instances_) { + if ((inst.second.type == InstanceType::PREFILL || + inst.second.type == InstanceType::DEFAULT) && + is_instance_schedulable(inst.second)) { + prefill_list.emplace_back(inst.second.name); + } + } + + return prefill_list; +} + +std::shared_ptr InstanceTopologyImpl::get_channel( + const std::string& instance_name) { + std::shared_lock lock(cluster_mutex_); + auto iter = cached_channels_.find(instance_name); + if (iter == cached_channels_.end()) { + return nullptr; + } + return iter->second; +} + +bool InstanceTopologyImpl::bind_request_instance_incarnations( + const std::shared_ptr& request) { + std::shared_lock lock(cluster_mutex_); + + request->prefill_incarnation_id.clear(); + request->decode_incarnation_id.clear(); + + if (!request->routing.prefill_name.empty()) { + auto prefill_it = instances_.find(request->routing.prefill_name); + if (prefill_it == instances_.end()) { + LOG(ERROR) << "Prefill instance is not registered when binding request: " + << request->routing.prefill_name; + return false; + } + if (!is_instance_schedulable(prefill_it->second)) { + LOG(ERROR) << "Prefill instance is not schedulable when binding request: " + << request->routing.prefill_name << ", state: " + << runtime_state_name(prefill_it->second.runtime_state); + return false; + } + request->prefill_incarnation_id = prefill_it->second.incarnation_id; + } + + if (!request->routing.decode_name.empty()) { + auto decode_it = instances_.find(request->routing.decode_name); + if (decode_it == instances_.end()) { + LOG(ERROR) << "Decode instance is not registered when binding request: " + << request->routing.decode_name; + return false; + } + if (!is_instance_schedulable(decode_it->second)) { + LOG(ERROR) << "Decode instance is not schedulable when binding request: " + << request->routing.decode_name << ", state: " + << runtime_state_name(decode_it->second.runtime_state); + return false; + } + request->decode_incarnation_id = decode_it->second.incarnation_id; + } + + return true; +} + +bool InstanceTopologyImpl::record_instance_heartbeat( + const std::string& instance_name, + const std::string& incarnation_id) { + std::unique_lock lock(cluster_mutex_); + auto it = instances_.find(instance_name); + if (it == instances_.end()) { + LOG(WARNING) << "Ignore heartbeat from unknown instance: " << instance_name; + return false; + } + + if (it->second.incarnation_id != incarnation_id) { + LOG(WARNING) << "Ignore stale heartbeat from instance: " << instance_name + << ", current incarnation_id: " << it->second.incarnation_id + << ", heartbeat incarnation_id: " << incarnation_id; + return false; + } + + it->second.latest_timestamp = current_time_ms(); + if (it->second.runtime_state == InstanceRuntimeState::SUSPECT) { + clear_suspect_instance(instance_name, incarnation_id); + it->second.runtime_state = InstanceRuntimeState::LEASE_LOST; + LOG(WARNING) << "Heartbeat recovered for suspect instance, move to " + << "lease lost state: " << instance_name + << ", incarnation_id: " << incarnation_id; + } + return true; +} + +TopologySnapshot InstanceTopologyImpl::snapshot() const { + std::shared_lock lock(cluster_mutex_); + TopologySnapshot s; + s.instances = instances_; + for (const auto& [k, v] : suspect_instances_) { + SuspectInstanceEntry e; + e.incarnation_id = v.incarnation_id; + e.enter_ts_ms = v.enter_ts_ms; + s.suspect_instances[k] = std::move(e); + } + s.prefill_index = prefill_index_; + s.decode_index = decode_index_; + return s; +} + +void InstanceTopologyImpl::flip_prefill_to_decode( + const std::string& instance_name) { + std::unique_lock lk(cluster_mutex_); + std::string name = instance_name; + + if (count_schedulable_instances(instances_, prefill_index_) <= 1) { + return; + } + + if (instances_.find(name) == instances_.end()) { + LOG(ERROR) << "Can't find instance, instance_name: " << name; + return; + } + + remove_instance_from_index(name, instances_[name]); + + instances_[name].current_type = InstanceType::DECODE; + add_instance_to_index(name, instances_[name]); + + LOG(INFO) << "Flip prefill to decode, instance name : " << name; +} + +void InstanceTopologyImpl::flip_decode_to_prefill( + const std::string& instance_name) { + std::unique_lock lk(cluster_mutex_); + std::string name = instance_name; + + if (count_schedulable_instances(instances_, decode_index_) <= 1) { + return; + } + + if (instances_.find(name) == instances_.end()) { + LOG(ERROR) << "Can't find instance, instance_name: " << name; + return; + } + + remove_instance_from_index(name, instances_[name]); + + instances_[name].current_type = InstanceType::PREFILL; + add_instance_to_index(name, instances_[name]); + + LOG(INFO) << "Flip decode to prefill, instance name : " << name; +} + +bool InstanceTopologyImpl::init_brpc_channel( + const std::string& target_uri, + std::shared_ptr* out_channel) { + auto channel = std::make_shared(); + brpc::ChannelOptions options; + options.timeout_ms = options_.timeout_ms(); + options.max_retry = 3; + options.connect_timeout_ms = options_.connect_timeout_ms(); + std::string load_balancer = ""; + if (channel->Init(target_uri.c_str(), load_balancer.c_str(), &options) != + 0) { + LOG(ERROR) << "Fail to initialize channel for " << target_uri; + return false; + } + *out_channel = std::move(channel); + return true; +} + +bool InstanceTopologyImpl::probe_instance_health( + const std::string& instance_name) { + const int attempts = std::max(1, options_.instance_delete_probe_attempts()); + const int timeout_ms = + std::max(1, options_.instance_delete_probe_timeout_ms()); + const std::string url = "http://" + instance_name + kHealthPath; + + for (int attempt = 1; attempt <= attempts; ++attempt) { + brpc::Channel channel; + brpc::ChannelOptions options; + options.protocol = "http"; + options.timeout_ms = timeout_ms; + options.connect_timeout_ms = timeout_ms; + options.max_retry = 0; + if (channel.Init(url.c_str(), "", &options) != 0) { + LOG(WARNING) << "Failed to initialize health probe channel, instance: " + << instance_name << ", attempt: " << attempt << "/" + << attempts; + } else { + brpc::Controller cntl; + cntl.http_request().uri() = url; + cntl.http_request().set_method(brpc::HTTP_METHOD_GET); + channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); + if (!cntl.Failed() && cntl.http_response().status_code() == 200) { + return true; + } + LOG(WARNING) << "Health probe failed, instance: " << instance_name + << ", attempt: " << attempt << "/" << attempts << ", error: " + << (cntl.Failed() ? cntl.ErrorText() + : std::to_string( + cntl.http_response().status_code())); + } + + if (attempt < attempts) { + std::this_thread::sleep_for( + std::chrono::milliseconds(kDeleteProbeRetryBackoffMs)); + } + } + + return false; +} + +void InstanceTopologyImpl::update_instance_metainfo( + const etcd::Response& response, + const uint64_t& prefix_len) { + if (response.events().empty() || exited_) { + return; + } + + threadpool_.schedule([this, + response = std::move(response), + prefix_len = std::move(prefix_len)] { + if (exited_) return; + for (const auto& event : response.events()) { + const std::string instance_name = get_event_key_suffix(event, prefix_len); + if (instance_name.empty()) { + continue; + } + + if (event.event_type() == etcd::Event::EventType::PUT) { + InstanceMetaInfo metainfo; + auto json_str = get_event_value(event); + if (!metainfo.parse_from_json(json_str)) { + LOG(ERROR) << "Parse instance json failed: " << json_str; + continue; + } + + std::unique_lock lock(cluster_mutex_); + auto existing_it = instances_.find(instance_name); + if (existing_it == instances_.end()) { + lock.unlock(); + if (!register_instance(instance_name, metainfo)) { + LOG(ERROR) << "Fail to register instance: " << instance_name; + } + continue; + } + + if (existing_it->second.incarnation_id == metainfo.incarnation_id) { + const auto previous_state = existing_it->second.runtime_state; + refresh_instance_registration(instance_name, metainfo); + clear_suspect_instance(instance_name, metainfo.incarnation_id); + if (previous_state != InstanceRuntimeState::ACTIVE) { + LOG(INFO) << "Instance registration restored, back to active: " + << instance_name + << ", incarnation_id: " << metainfo.incarnation_id + << ", previous_state: " + << runtime_state_name(previous_state); + } + continue; + } + + const std::string old_incarnation_id = + existing_it->second.incarnation_id; + LOG(WARNING) << "Detected instance replacement, instance_name: " + << instance_name + << ", old incarnation_id: " << old_incarnation_id + << ", new incarnation_id: " << metainfo.incarnation_id; + lock.unlock(); + deregister_instance(instance_name, old_incarnation_id); + if (!register_instance(instance_name, metainfo)) { + LOG(ERROR) << "Fail to register replacement instance: " + << instance_name; + } + continue; + } + + if (event.event_type() != etcd::Event::EventType::DELETE_) { + continue; + } + + InstanceMetaInfo deleted_info; + std::string deleted_incarnation_id; + const auto deleted_value = get_event_value(event); + if (!deleted_value.empty() && + deleted_info.parse_from_json(deleted_value)) { + deleted_incarnation_id = deleted_info.incarnation_id; + } + std::string tracked_incarnation_id; + { + std::unique_lock lock(cluster_mutex_); + auto existing_it = instances_.find(instance_name); + if (existing_it == instances_.end()) { + continue; + } + + if (!deleted_incarnation_id.empty() && + existing_it->second.incarnation_id != deleted_incarnation_id) { + LOG(INFO) << "Ignore stale delete for replaced instance: " + << instance_name + << ", deleted incarnation_id: " << deleted_incarnation_id + << ", current incarnation_id: " + << existing_it->second.incarnation_id; + continue; + } + tracked_incarnation_id = existing_it->second.incarnation_id; + } + + const bool probe_success = probe_instance_health(instance_name); + + std::unique_lock lock(cluster_mutex_); + auto existing_it = instances_.find(instance_name); + if (existing_it == instances_.end() || + existing_it->second.incarnation_id != tracked_incarnation_id) { + continue; + } + + if (probe_success) { + clear_suspect_instance(instance_name, tracked_incarnation_id); + existing_it->second.runtime_state = InstanceRuntimeState::LEASE_LOST; + existing_it->second.latest_timestamp = current_time_ms(); + LOG(WARNING) << "Instance lease deleted, probe succeeded, enter " + << "lease lost state: " << instance_name + << ", incarnation_id: " << tracked_incarnation_id; + continue; + } + + mark_instance_suspect(instance_name, tracked_incarnation_id); + LOG(WARNING) << "Instance lease deleted, probe failed, enter suspect " + << "state: " << instance_name + << ", incarnation_id: " << tracked_incarnation_id; + } + }); +} + +void InstanceTopologyImpl::reconcile_instance_states() { + const auto suspect_interval_ms = + std::max(1, options_.detect_disconnected_instance_interval()) * + 1000; + const auto heartbeat_timeout_ms = + std::max(1, options_.lease_lost_heartbeat_timeout_ms()); + while (!exited_) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + + std::vector> to_deregister; + + { + std::unique_lock lock(cluster_mutex_); + if (exited_) { + return; + } + + const uint64_t now_ms = current_time_ms(); + for (auto& [instance_name, info] : instances_) { + if (info.runtime_state != InstanceRuntimeState::LEASE_LOST) { + continue; + } + if (now_ms - info.latest_timestamp < heartbeat_timeout_ms) { + continue; + } + mark_instance_suspect(instance_name, info.incarnation_id); + LOG(WARNING) << "Lease lost instance heartbeat timed out, enter suspect " + << "state: " << instance_name + << ", incarnation_id: " << info.incarnation_id; + } + + for (auto it = suspect_instances_.begin(); + it != suspect_instances_.end();) { + const std::string instance_name = it->first; + const std::string incarnation_id = it->second.incarnation_id; + const uint64_t enter_ts_ms = it->second.enter_ts_ms; + ++it; + + if (now_ms - enter_ts_ms < suspect_interval_ms) { + continue; + } + + auto inst_it = instances_.find(instance_name); + if (inst_it == instances_.end() || + inst_it->second.incarnation_id != incarnation_id) { + suspect_instances_.erase(instance_name); + continue; + } + + LOG(WARNING) << "Suspect window expired, deregister instance: " + << instance_name << ", incarnation_id: " << incarnation_id; + to_deregister.emplace_back(instance_name, incarnation_id); + } + } + + for (const auto& p : to_deregister) { + deregister_instance(p.first, p.second); + } + } +} + +void InstanceTopologyImpl::refresh_instance_registration( + const std::string& name, + const InstanceMetaInfo& info) { + auto it = instances_.find(name); + if (it == instances_.end()) { + return; + } + + const auto instance_index = it->second.instance_index; + const auto current_type = it->second.current_type; + + it->second = info; + it->second.instance_index = instance_index; + it->second.current_type = current_type; + it->second.latest_timestamp = current_time_ms(); + it->second.runtime_state = InstanceRuntimeState::ACTIVE; +} + +void InstanceTopologyImpl::mark_instance_suspect(const std::string& name, + const std::string& incarnation_id) { + SuspectInstanceInfo info; + info.incarnation_id = incarnation_id; + info.enter_ts_ms = current_time_ms(); + suspect_instances_[name] = std::move(info); + auto it = instances_.find(name); + if (it != instances_.end() && it->second.incarnation_id == incarnation_id) { + it->second.runtime_state = InstanceRuntimeState::SUSPECT; + } +} + +void InstanceTopologyImpl::clear_suspect_instance( + const std::string& name, + const std::string& incarnation_id) { + auto it = suspect_instances_.find(name); + if (it == suspect_instances_.end()) { + return; + } + if (!incarnation_id.empty() && it->second.incarnation_id != incarnation_id) { + return; + } + suspect_instances_.erase(it); +} + +bool InstanceTopologyImpl::call_link_instance( + const std::string& target_rpc_addr, + const InstanceMetaInfo& peer_info) { + brpc::Channel channel; + brpc::ChannelOptions options; + options.protocol = "http"; + options.timeout_ms = options_.timeout_ms(); + options.max_retry = 3; + if (channel.Init(target_rpc_addr.c_str(), "", &options) != 0) { + LOG(ERROR) << "Fail to initialize channel for LinkInstance to " + << target_rpc_addr; + return false; + } + xllm::proto::DisaggPDService_Stub stub(&channel); + brpc::Controller cntl; + xllm::proto::InstanceClusterInfo req; + req.set_instance_name(peer_info.name); + for (auto& cluster_id : peer_info.cluster_ids) { + req.add_cluster_ids(cluster_id); + } + for (auto& addr : peer_info.addrs) { + req.add_addrs(addr); + } + for (auto& ip : peer_info.device_ips) { + req.add_device_ips(ip); + } + for (auto& port : peer_info.ports) { + req.add_ports(port); + } + req.set_dp_size(peer_info.dp_size); + xllm::proto::Status res; + stub.LinkInstance(&cntl, &req, &res, nullptr); + if (cntl.Failed()) { + LOG(ERROR) << "LinkInstance failed, target: " << target_rpc_addr + << ", peer: " << peer_info.name + << ", error: " << cntl.ErrorText(); + return false; + } + return res.ok(); +} + +bool InstanceTopologyImpl::call_unlink_instance( + const std::string& target_rpc_addr, + const InstanceMetaInfo& peer_info) { + brpc::Channel channel; + brpc::ChannelOptions options; + options.protocol = "http"; + options.timeout_ms = options_.timeout_ms(); + options.max_retry = 3; + if (channel.Init(target_rpc_addr.c_str(), "", &options) != 0) { + LOG(ERROR) << "Fail to initialize channel for UnlinkInstance to " + << target_rpc_addr; + return false; + } + xllm::proto::DisaggPDService_Stub stub(&channel); + brpc::Controller cntl; + xllm::proto::InstanceClusterInfo req; + req.set_instance_name(peer_info.name); + for (auto& cluster_id : peer_info.cluster_ids) { + req.add_cluster_ids(cluster_id); + } + for (auto& addr : peer_info.addrs) { + req.add_addrs(addr); + } + for (auto& ip : peer_info.device_ips) { + req.add_device_ips(ip); + } + for (auto& port : peer_info.ports) { + req.add_ports(port); + } + req.set_dp_size(peer_info.dp_size); + xllm::proto::Status res; + stub.UnlinkInstance(&cntl, &req, &res, nullptr); + if (cntl.Failed()) { + LOG(ERROR) << "UnlinkInstance failed, target: " << target_rpc_addr + << ", peer: " << peer_info.name + << ", error: " << cntl.ErrorText(); + return false; + } + return res.ok(); +} + +void InstanceTopologyImpl::remove_channel_and_metrics_maps( + const std::string& name) { + cached_channels_.erase(name); + if (remove_instance_metrics_maps_cb_) { + remove_instance_metrics_maps_cb_(name); + } +} + +bool InstanceTopologyImpl::register_instance(const std::string& name, + InstanceMetaInfo& info) { + info.runtime_state = InstanceRuntimeState::ACTIVE; + info.latest_timestamp = current_time_ms(); + info.name = name; + + { + std::unique_lock lock(cluster_mutex_); + if (instances_.find(name) != instances_.end() || + cached_channels_.find(name) != cached_channels_.end()) { + LOG(ERROR) << "Instance is already registered, instance_name: " << name; + return false; + } + } + + std::shared_ptr channel; + if (!init_brpc_channel(name, &channel)) { + LOG(ERROR) << "create channel fail: " << name; + return false; + } + + { + std::unique_lock lock(cluster_mutex_); + if (instances_.find(name) != instances_.end() || + cached_channels_.find(name) != cached_channels_.end()) { + LOG(WARNING) << "Instance registered concurrently during channel init: " + << name; + return false; + } + cached_channels_[name] = std::move(channel); + } + + if (add_instance_metrics_cb_) { + add_instance_metrics_cb_(name, info); + } + + std::vector> link_ops; + { + std::unique_lock lock(cluster_mutex_); + if (!gather_link_operations(info, &link_ops)) { + remove_channel_and_metrics_maps(name); + return false; + } + } + + if (!run_link_operations(link_ops)) { + std::unique_lock lock(cluster_mutex_); + remove_channel_and_metrics_maps(name); + return false; + } + + { + std::unique_lock lock(cluster_mutex_); + add_instance_to_index(name, info); + instances_.insert(std::make_pair(name, info)); + } + return true; +} + +void InstanceTopologyImpl::deregister_instance( + const std::string& name, + const std::string& expected_incarnation_id) { + InstanceMetaInfo info; + std::vector> unlink_ops; + { + std::unique_lock lock(cluster_mutex_); + auto it = instances_.find(name); + if (it == instances_.end()) { + LOG(ERROR) << "Instance is not registered, instance_name: " << name; + return; + } + + if (!expected_incarnation_id.empty() && + it->second.incarnation_id != expected_incarnation_id) { + LOG(INFO) << "Skip deregistering stale incarnation, instance_name: " << name + << ", current incarnation_id: " << it->second.incarnation_id + << ", expected incarnation_id: " << expected_incarnation_id; + return; + } + + info = it->second; + clear_suspect_instance(name, info.incarnation_id); + gather_unlink_operations(name, info, &unlink_ops); + } + + for (const auto& op : unlink_ops) { + call_unlink_instance(op.first, op.second); + } + + { + std::unique_lock lock(cluster_mutex_); + auto it = instances_.find(name); + if (it == instances_.end()) { + return; + } + remove_instance_from_index(name, it->second); + } + + if (on_instance_deregistered_cb_) { + on_instance_deregistered_cb_(name, info.incarnation_id, + get_cleanup_type(info)); + } + + { + std::unique_lock lock(cluster_mutex_); + auto it = instances_.find(name); + if (it == instances_.end()) { + return; + } + remove_channel_and_metrics_maps(name); + instances_.erase(it); + } + LOG(INFO) << "delete instance: " << name; +} + +bool InstanceTopologyImpl::gather_link_operations( + const InstanceMetaInfo& info, + std::vector>* out_ops) { + out_ops->clear(); + switch (info.type) { + case InstanceType::DEFAULT: + break; + case InstanceType::PREFILL: { + for (auto& d_name : decode_index_) { + out_ops->emplace_back(instances_[d_name].rpc_address, info); + } + break; + } + case InstanceType::DECODE: { + for (auto& p_name : prefill_index_) { + out_ops->emplace_back(info.rpc_address, instances_[p_name]); + } + break; + } + case InstanceType::MIX: { + for (const auto& [peer_name, peer_info] : instances_) { + if (peer_name == info.name) { + continue; + } + out_ops->emplace_back(info.rpc_address, peer_info); + } + break; + } + default: + LOG(WARNING) << "Unknown InstanceType: " << int(info.type); + return false; + } + return true; +} + +bool InstanceTopologyImpl::run_link_operations( + const std::vector>& ops) { + for (size_t i = 0; i < ops.size(); ++i) { + if (!call_link_instance(ops[i].first, ops[i].second)) { + LOG(ERROR) << "Fail to link instance during registration, op index " << i; + for (size_t j = 0; j < i; ++j) { + call_unlink_instance(ops[j].first, ops[j].second); + } + return false; + } + } + return true; +} + +void InstanceTopologyImpl::gather_unlink_operations( + const std::string& name, + const InstanceMetaInfo& info, + std::vector>* out_ops) { + out_ops->clear(); + if (info.type == InstanceType::PREFILL) { + for (auto& d_name : decode_index_) { + out_ops->emplace_back(instances_[d_name].rpc_address, info); + } + } else if (info.type == InstanceType::DECODE) { + for (auto& p_name : prefill_index_) { + out_ops->emplace_back(instances_[p_name].rpc_address, info); + } + } else if (info.type == InstanceType::MIX) { + for (const auto& [peer_name, peer_info] : instances_) { + if (peer_name == name) { + continue; + } + out_ops->emplace_back(peer_info.rpc_address, info); + } + } +} + +void InstanceTopologyImpl::add_instance_to_index(const std::string& name, + InstanceMetaInfo& info) { + switch (info.type) { + case InstanceType::DEFAULT: + info.instance_index = prefill_index_.size(); + prefill_index_.emplace_back(name); + LOG(INFO) << "Register a new default instance, instance name : " << name; + break; + case InstanceType::PREFILL: + info.instance_index = prefill_index_.size(); + prefill_index_.emplace_back(name); + LOG(INFO) << "Register a new prefill instance, instance name : " << name; + break; + case InstanceType::DECODE: + info.instance_index = decode_index_.size(); + decode_index_.emplace_back(name); + LOG(INFO) << "Register a new decode instance, instance name : " << name; + break; + case InstanceType::MIX: + if (decode_index_.size() > 0) { + info.instance_index = prefill_index_.size(); + info.current_type = InstanceType::PREFILL; + prefill_index_.emplace_back(name); + LOG(INFO) << "Register a new prefill instance, instance name : " + << name; + } else { + info.instance_index = decode_index_.size(); + info.current_type = InstanceType::DECODE; + decode_index_.emplace_back(name); + LOG(INFO) << "Register a new decode instance, instance name : " << name; + } + break; + default: + break; + } +} + +void InstanceTopologyImpl::remove_instance_from_index( + const std::string& name, + const InstanceMetaInfo& info) { + uint64_t index = info.instance_index; + if (index == static_cast(-1)) return; + + auto remove_from_vec = [&](std::vector& vec) { + if (index >= vec.size()) return; + std::swap(vec[index], vec.back()); + instances_[vec[index]].instance_index = index; + vec.pop_back(); + }; + + switch (info.type) { + case InstanceType::DEFAULT: + case InstanceType::PREFILL: + remove_from_vec(prefill_index_); + break; + case InstanceType::DECODE: + remove_from_vec(decode_index_); + break; + case InstanceType::MIX: + if (info.current_type == InstanceType::PREFILL) { + remove_from_vec(prefill_index_); + } else { + remove_from_vec(decode_index_); + } + break; + default: + break; + } +} + +} // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_topology.h b/xllm_service/scheduler/managers/instance_topology.h new file mode 100644 index 0000000..08c59b4 --- /dev/null +++ b/xllm_service/scheduler/managers/instance_topology.h @@ -0,0 +1,186 @@ +/* Copyright 2025 The xLLM Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/jd-opensource/xllm-service/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/macros.h" +#include "common/options.h" +#include "common/threadpool.h" +#include "common/types.h" +#include "request/request.h" +#include "scheduler/etcd_client/etcd_client.h" + +namespace xllm_service { + +struct SuspectInstanceEntry { + std::string incarnation_id; + uint64_t enter_ts_ms = 0; +}; + +struct TopologySnapshot { + std::unordered_map instances; + std::unordered_map suspect_instances; + std::vector prefill_index; + std::vector decode_index; +}; + +using AddInstanceMetricsCallback = + std::function; +using RemoveInstanceMetricsCallback = std::function; +// Invoked when an instance is removed from topology (e.g. etcd deregister). +// Typically forwards to Scheduler::clear_requests_on_failed_instance. +using OnInstanceDeregisteredCallback = std::function; + +// Cluster instance registry, indices, runtime state, and brpc channels. +class InstanceTopology { + public: + virtual ~InstanceTopology() = default; + + virtual InstanceMetaInfo get_instance_info( + const std::string& instance_name) = 0; + + virtual std::vector get_static_decode_list( + const std::string& instance_name) = 0; + + virtual std::vector get_static_prefill_list( + const std::string& instance_name) = 0; + + virtual std::shared_ptr get_channel( + const std::string& instance_name) = 0; + + virtual bool bind_request_instance_incarnations( + const std::shared_ptr& request) = 0; + + virtual bool record_instance_heartbeat(const std::string& instance_name, + const std::string& incarnation_id) = 0; + + virtual TopologySnapshot snapshot() const = 0; + + virtual void flip_prefill_to_decode(const std::string& instance_name) = 0; + virtual void flip_decode_to_prefill(const std::string& instance_name) = 0; +}; + +// Default implementation extracted from InstanceMgr (etcd discovery, channels, +// link/unlink, suspect / lease-lost state machine). Metrics maps live in +// InstanceMgr; register/deregister invoke callbacks to add/remove them. +class InstanceTopologyImpl final : public InstanceTopology { + public: + InstanceTopologyImpl(const Options& options, + const std::shared_ptr& etcd_client, + OnInstanceDeregisteredCallback on_instance_deregistered, + AddInstanceMetricsCallback add_instance_metrics, + RemoveInstanceMetricsCallback remove_instance_metrics_maps); + + ~InstanceTopologyImpl() override; + + // Load instance keys from etcd and register each (mirrors InstanceMgr::init + // instance loop). Starts the reconcile thread at the end. + void init_from_etcd_register_all(); + + InstanceMetaInfo get_instance_info(const std::string& instance_name) override; + std::vector get_static_decode_list( + const std::string& instance_name) override; + std::vector get_static_prefill_list( + const std::string& instance_name) override; + std::shared_ptr get_channel( + const std::string& instance_name) override; + bool bind_request_instance_incarnations( + const std::shared_ptr& request) override; + bool record_instance_heartbeat(const std::string& instance_name, + const std::string& incarnation_id) override; + TopologySnapshot snapshot() const override; + void flip_prefill_to_decode(const std::string& instance_name) override; + void flip_decode_to_prefill(const std::string& instance_name) override; + + private: + friend class InstanceMgr; + DISALLOW_COPY_AND_ASSIGN(InstanceTopologyImpl); + + bool register_instance(const std::string& name, InstanceMetaInfo& info); + void deregister_instance(const std::string& name, + const std::string& expected_incarnation_id = ""); + + bool init_brpc_channel(const std::string& target_uri, + std::shared_ptr* out_channel); + bool probe_instance_health(const std::string& instance_name); + void reconcile_instance_states(); + void refresh_instance_registration(const std::string& name, + const InstanceMetaInfo& info); + void mark_instance_suspect(const std::string& name, + const std::string& incarnation_id); + void clear_suspect_instance(const std::string& name, + const std::string& incarnation_id = ""); + void update_instance_metainfo(const etcd::Response& response, + const uint64_t& prefix_len); + + bool gather_link_operations( + const InstanceMetaInfo& info, + std::vector>* out_ops); + bool run_link_operations( + const std::vector>& ops); + void gather_unlink_operations( + const std::string& name, + const InstanceMetaInfo& info, + std::vector>* out_ops); + void add_instance_to_index(const std::string& name, InstanceMetaInfo& info); + void remove_instance_from_index(const std::string& name, + const InstanceMetaInfo& info); + bool call_link_instance(const std::string& target_rpc_addr, + const InstanceMetaInfo& peer_info); + bool call_unlink_instance(const std::string& target_rpc_addr, + const InstanceMetaInfo& peer_info); + + // Caller must hold cluster_mutex_ exclusively. + void remove_channel_and_metrics_maps(const std::string& name); + + Options options_; + bool exited_ = false; + std::shared_ptr etcd_client_; + OnInstanceDeregisteredCallback on_instance_deregistered_cb_; + AddInstanceMetricsCallback add_instance_metrics_cb_; + RemoveInstanceMetricsCallback remove_instance_metrics_maps_cb_; + + mutable std::shared_mutex cluster_mutex_; + std::unordered_map instances_; + struct SuspectInstanceInfo { + std::string incarnation_id; + uint64_t enter_ts_ms = 0; + }; + std::unordered_map suspect_instances_; + std::vector prefill_index_; + std::vector decode_index_; + std::unordered_map> + cached_channels_; + + ThreadPool threadpool_; + std::unique_ptr state_reconcile_thread_; +}; + +} // namespace xllm_service diff --git a/xllm_service/scheduler/scheduler.cpp b/xllm_service/scheduler/scheduler.cpp index cd3313d..ff571e4 100644 --- a/xllm_service/scheduler/scheduler.cpp +++ b/xllm_service/scheduler/scheduler.cpp @@ -58,14 +58,18 @@ Scheduler::Scheduler(const Options& options) : options_(options) { } instance_mgr_ = std::make_shared( - options, etcd_client_, is_master_service_, this); - - global_kvcache_mgr_ = std::make_shared( - options, etcd_client_, is_master_service_); + options, + etcd_client_, + is_master_service_, + [this](const std::string& instance_name, + const std::string& incarnation_id, + InstanceType cleanup_type) { + clear_requests_on_failed_instance( + instance_name, incarnation_id, cleanup_type); + }); if (options.load_balance_policy() == "CAR") { - lb_policy_ = - std::make_unique(instance_mgr_, global_kvcache_mgr_); + lb_policy_ = std::make_unique(instance_mgr_); } else if (options.load_balance_policy() == "SLO_AWARE") { lb_policy_ = std::make_unique(options, instance_mgr_); } else { @@ -143,9 +147,7 @@ void Scheduler::update_master_service_heartbeat() { while (!exited_) { std::this_thread::sleep_for(std::chrono::seconds(kHeartbeatInterval)); - global_kvcache_mgr_->upload_kvcache(); - - instance_mgr_->upload_load_metrics(); + instance_mgr_->upload_master_state_to_etcd(); } } @@ -169,14 +171,10 @@ bool Scheduler::handle_instance_heartbeat(const proto::HeartbeatRequest* req) { if (exited_) { return false; } - if (!instance_mgr_->record_instance_heartbeat(req->name(), - req->incarnation_id())) { + if (req == nullptr) { return false; } - global_kvcache_mgr_->record_updated_kvcaches(req->name(), req->cache_event()); - instance_mgr_->record_load_metrics_update(req->name(), req->load_metrics()); - instance_mgr_->update_latency_metrics(req->name(), req->latency_metrics()); - return true; + return instance_mgr_->on_instance_heartbeat(*req); } void Scheduler::handle_master_service_watch(const etcd::Response& response, @@ -193,7 +191,6 @@ void Scheduler::handle_master_service_watch(const etcd::Response& response, heartbeat_thread_ = std::make_unique( &Scheduler::update_master_service_heartbeat, this); - global_kvcache_mgr_->set_as_master(); instance_mgr_->set_as_master(); } } diff --git a/xllm_service/scheduler/scheduler.h b/xllm_service/scheduler/scheduler.h index 4c04ab3..49f0fa1 100644 --- a/xllm_service/scheduler/scheduler.h +++ b/xllm_service/scheduler/scheduler.h @@ -22,7 +22,6 @@ limitations under the License. #include "common/xllm/output.h" #include "etcd_client/etcd_client.h" #include "loadbalance_policy/loadbalance_policy.h" -#include "managers/global_kvcache_mgr.h" #include "managers/instance_mgr.h" #include "request/request.h" #include "response_handler.h" @@ -111,8 +110,6 @@ class Scheduler final { std::shared_ptr instance_mgr_; - std::shared_ptr global_kvcache_mgr_; - std::unique_ptr lb_policy_; std::unique_ptr heartbeat_thread_;