From 77ab14aa7a8e57c64b9eb274b4fa64615f43d788 Mon Sep 17 00:00:00 2001 From: hll1213181368 Date: Wed, 17 Dec 2025 14:20:43 +0800 Subject: [PATCH 1/2] perf(cluster): use multithreads to optimize dbsize scan --- kvrocks.conf | 5 + src/commands/cmd_server.cc | 18 +++- src/config/config.cc | 10 ++ src/config/config.h | 1 + src/server/server.cc | 2 +- src/server/server.h | 6 ++ src/storage/redis_db.cc | 174 ++++++++++++++++++++++++++++++++++- src/storage/redis_db.h | 10 +- src/storage/redis_metadata.h | 1 + src/storage/storage.h | 2 + 10 files changed, 223 insertions(+), 6 deletions(-) diff --git a/kvrocks.conf b/kvrocks.conf index 48884a20bac..d5255594940 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -657,6 +657,11 @@ compaction-checker-cron * 0-7 * * * # e.g. dbsize-scan-cron 0 * * * * # would recalculate the keyspace infos of the db every hour. +# The number of threads used to scan keys for the `dbsize scan` command. +# Default: 0, which means half the number of CPU cores of the Kvrocks node. +# e.g. dbsize-scan-key-parallelism 4 +# would scan the slot ranges with 4 threads in parallel. + # Command renaming. 
# # It is possible to change the name of dangerous commands in a shared diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 212d44a49a0..0a7413ef649 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -306,7 +306,23 @@ class CommandDBSize : public Commander { srv->GetLatestKeyNumStats(ns, &stats); *output = redis::Integer(stats.n_key); } else if (args_.size() == 2 && util::EqualICase(args_[1], "scan")) { - Status s = srv->AsyncScanDBSize(ns); + auto local_ip_addresses = util::GetLocalIPAddresses(); + std::vector infos; + Status s = srv->cluster->GetSlotsInfo(&infos); + std::vector slot_ranges; + if (s.IsOK()) { + for (const auto &info : infos) { + for (const auto &n : info.nodes) { + if (std::find(local_ip_addresses.begin(), local_ip_addresses.end(), n.host) != local_ip_addresses.end()) { + slot_ranges.emplace_back(info.start, info.end); + } + } + } + } else { + return s; + } + srv->SetSlotRanges(slot_ranges); + s = srv->AsyncScanDBSize(ns); if (s.IsOK()) { *output = redis::RESP_OK; } else { diff --git a/src/config/config.cc b/src/config/config.cc index 52690ca8e3c..427f5e5a322 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -177,6 +177,7 @@ Config::Config() { {"compact-cron", false, new StringField(&compact_cron_str_, "")}, {"bgsave-cron", false, new StringField(&bgsave_cron_str_, "")}, {"dbsize-scan-cron", false, new StringField(&dbsize_scan_cron_str_, "")}, + {"dbsize-scan-key-parallelism", false, new IntField(&dbsize_scan_key_parallelism, 0, 0, INT_MAX)}, {"replica-announce-ip", false, new StringField(&replica_announce_ip, "")}, {"replica-announce-port", false, new UInt32Field(&replica_announce_port, 0, 0, PORT_LIMIT)}, {"compaction-checker-range", false, new StringField(&compaction_checker_range_str_, "")}, @@ -574,6 +575,15 @@ void Config::initFieldCallback() { srv->storage->SetIORateLimit(max_io_mb); return Status::OK(); }}, + {"dbsize-scan-key-parallelism", + [this]([[maybe_unused]] Server *srv, 
[[maybe_unused]] const std::string &k, + [[maybe_unused]] const std::string &v) -> Status { + if (dbsize_scan_key_parallelism == 0) { + unsigned int max_parallelism = std::thread::hardware_concurrency(); + dbsize_scan_key_parallelism = static_cast(max_parallelism) / 2; + } + return Status::OK(); + }}, {"profiling-sample-record-max-len", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK(); diff --git a/src/config/config.h b/src/config/config.h index 036e506e8eb..1b9b8899393 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -161,6 +161,7 @@ struct Config { bool repl_namespace_enabled = false; std::string replica_announce_ip; uint32_t replica_announce_port = 0; + int dbsize_scan_key_parallelism; bool persist_cluster_nodes_enabled = true; bool slot_id_encoded = false; diff --git a/src/server/server.cc b/src/server/server.cc index a952cdadd28..061e063a3f1 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1626,7 +1626,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) { KeyNumStats stats; engine::Context ctx(storage); - auto s = db.GetKeyNumStats(ctx, "", &stats); + auto s = db.GetKeyNumStats(ctx, "", &stats, GetSlotRanges()); if (!s.ok()) { error("failed to retrieve key num stats: {}", s.ToString()); } diff --git a/src/server/server.h b/src/server/server.h index 597ea00aec6..b05681e6559 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -346,6 +346,9 @@ class Server { AuthResult AuthenticateUser(const std::string &user_password, std::string *ns); + void SetSlotRanges(std::vector &slot_ranges) { slot_ranges_ = slot_ranges; }; + std::vector *GetSlotRanges() { return &slot_ranges_; }; + #ifdef ENABLE_OPENSSL UniqueSSLContext ssl_ctx; #endif @@ -450,4 +453,7 @@ class Server { std::atomic cursor_counter_ = {0}; using CursorDictType = std::array; std::unique_ptr cursor_dict_; + + // slot_ranges + std::vector slot_ranges_; }; diff --git 
a/src/storage/redis_db.cc b/src/storage/redis_db.cc index c12bb00ec0f..baf868cc91e 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -24,7 +24,6 @@ #include #include "cluster/redis_slot.h" -#include "common/scope_exit.h" #include "common/string_util.h" #include "db_util.h" #include "parse_util.h" @@ -242,8 +241,9 @@ rocksdb::Status Database::GetExpireTime(engine::Context &ctx, const Slice &user_ return rocksdb::Status::OK(); } -rocksdb::Status Database::GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats) { - return Keys(ctx, prefix, "*", nullptr, stats); +rocksdb::Status Database::GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats, + const std::vector *slot_ranges) { + return KeysParallel(ctx, prefix, "*", nullptr, stats, slot_ranges); } rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob, @@ -315,6 +315,174 @@ rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix, return rocksdb::Status::OK(); } +rocksdb::Status Database::KeysParallel(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob, + std::vector *keys, KeyNumStats *stats, + const std::vector *current_slot_ranges) { + if (!storage_->IsSlotIdEncoded()) { + return Keys(ctx, prefix, "*", nullptr, stats); + } + if (current_slot_ranges->empty()) { + return rocksdb::Status::InvalidArgument("slot_ranges is empty"); + } + + int total_slots = 0; + for (const auto &range : *current_slot_ranges) { + total_slots += (range.end - range.start + 1); + } + + int parallelism = storage_->GetDBScanKeyParallelism(); + + std::vector> thread_ranges(parallelism); + int slots_per_thread = total_slots / parallelism; + int remain_slots = total_slots % parallelism; + + int current_slot_count = 0; + int current_thread = 0; + int slots_for_current_thread = slots_per_thread + (current_thread < remain_slots ? 
1 : 0); + + for (const auto &range : *current_slot_ranges) { + int range_size = range.end - range.start + 1; + int range_start = range.start; + int remaining_in_range = range_size; + + while (remaining_in_range > 0 && current_thread < parallelism) { + int slots_to_take = std::min(remaining_in_range, slots_for_current_thread - current_slot_count); + + int take_end = range_start + slots_to_take - 1; + thread_ranges[current_thread].emplace_back(range_start, take_end); + + range_start += slots_to_take; + remaining_in_range -= slots_to_take; + current_slot_count += slots_to_take; + + if (current_slot_count >= slots_for_current_thread) { + current_thread++; + if (current_thread < parallelism) { + slots_for_current_thread = slots_per_thread + (current_thread < remain_slots ? 1 : 0); + current_slot_count = 0; + } + } + } + } + + std::vector workers; + std::vector thread_statuses(parallelism); + std::vector> thread_keys(parallelism); + std::vector thread_stats(parallelism); + + for (int i = 0; i < parallelism; ++i) { + workers.emplace_back([&, i]() { + for (const auto &range : thread_ranges[i]) { + auto status = ProcessSlotRange(ctx, prefix, suffix_glob, range.start, range.end, + keys ? &thread_keys[i] : nullptr, stats ? 
&thread_stats[i] : nullptr); + if (!status.ok()) { + thread_statuses[i] = status; + return; + } + } + thread_statuses[i] = rocksdb::Status::OK(); + }); + } + + for (auto &worker : workers) { + worker.join(); + } + + for (const auto &status : thread_statuses) { + if (!status.ok()) { + return status; + } + } + + if (keys) { + for (auto &thread_key : thread_keys) { + keys->insert(keys->end(), std::make_move_iterator(thread_key.begin()), std::make_move_iterator(thread_key.end())); + } + } + + if (stats) { + for (auto &thread_stat : thread_stats) { + stats->n_key += thread_stat.n_key; + stats->n_expired += thread_stat.n_expired; + stats->n_expires += thread_stat.n_expires; + stats->ttl_sum += thread_stat.ttl_sum; + } + + if (stats->n_expires > 0) { + stats->avg_ttl = stats->ttl_sum / stats->n_expires / 1000; + } + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status Database::ProcessSlotRange(engine::Context &ctx, const std::string &prefix, + const std::string &suffix_glob, int start_slot, int end_slot, + std::vector *keys, KeyNumStats *stats) { + info("[ProcessSlotRangeOptimize] Begin batch {}-{}", start_slot, end_slot); + + uint64_t ttl_sum = 0; + KeyNumStats local_stats{0, 0, 0, 0, 0}; + + auto iter = util::UniqueIterator(ctx, ctx.GetReadOptions(), metadata_cf_handle_); + + for (uint16_t slot_id = start_slot; slot_id <= end_slot; ++slot_id) { // use iter with slot_id + std::string ns_prefix = ComposeNamespaceKey(namespace_, "", false); + PutFixed16(&ns_prefix, slot_id); + ns_prefix.append(prefix); + + uint64_t current_n_key = 0; + + iter->Seek(ns_prefix); + for (; iter->Valid(); iter->Next()) { + if (!iter->key().starts_with(ns_prefix)) { + break; + } + + auto [_, user_key] = ExtractNamespaceKey(iter->key(), storage_->IsSlotIdEncoded()); + if (!util::StringMatch(suffix_glob, user_key.ToString().substr(prefix.size()))) { + continue; + } + + Metadata metadata(kRedisNone, false); + auto s = metadata.Decode(iter->value()); + if (!s.ok()) continue; + + if 
(metadata.Expired()) { + if (stats) local_stats.n_expired++; + continue; + } + + if (stats) { + int64_t ttl = metadata.TTL(); + local_stats.n_key++; + current_n_key++; + if (ttl != -1) { + local_stats.n_expires++; + if (ttl > 0) ttl_sum += ttl; + } + } + + if (keys) { + keys->emplace_back(user_key.ToString()); + } + } + + info("[ProcessSlotRangeOptimize] current slotId is {}, keys size is {}", slot_id, current_n_key); + + if (auto s = iter->status(); !s.ok()) { + return s; + } + } + + local_stats.ttl_sum = ttl_sum; + if (stats) { + *stats = local_stats; + } + + return rocksdb::Status::OK(); +} + rocksdb::Status Database::Scan(engine::Context &ctx, const std::string &cursor, uint64_t limit, const std::string &prefix, const std::string &suffix_glob, std::vector *keys, std::string *end_cursor, RedisType type, diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index 16d98dcd395..1222bc46b40 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -118,9 +118,17 @@ class Database { [[nodiscard]] rocksdb::Status Dump(engine::Context &ctx, const Slice &user_key, std::vector *infos); [[nodiscard]] rocksdb::Status FlushDB(engine::Context &ctx); [[nodiscard]] rocksdb::Status FlushAll(engine::Context &ctx); - [[nodiscard]] rocksdb::Status GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats); + [[nodiscard]] rocksdb::Status GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats, + const std::vector *slot_ranges); [[nodiscard]] rocksdb::Status Keys(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob, std::vector *keys = nullptr, KeyNumStats *stats = nullptr); + [[nodiscard]] rocksdb::Status KeysParallel(engine::Context &ctx, const std::string &prefix, + const std::string &suffix_glob, std::vector *keys = nullptr, + KeyNumStats *stats = nullptr, + const std::vector *slot_ranges = nullptr); + [[nodiscard]] rocksdb::Status ProcessSlotRange(engine::Context &ctx, const 
std::string &prefix, + const std::string &suffix_glob, int start_slot, int end_slot, + std::vector *keys, KeyNumStats *stats); [[nodiscard]] rocksdb::Status Scan(engine::Context &ctx, const std::string &cursor, uint64_t limit, const std::string &prefix, const std::string &suffix_glob, std::vector *keys, std::string *end_cursor = nullptr, diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index fd80e5a5ba0..bd17269c2d3 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -110,6 +110,7 @@ struct KeyNumStats { uint64_t n_expires = 0; uint64_t n_expired = 0; uint64_t avg_ttl = 0; + uint64_t ttl_sum = 0; }; [[nodiscard]] uint16_t ExtractSlotId(Slice ns_key); diff --git a/src/storage/storage.h b/src/storage/storage.h index 69afeff2829..6b3caf31f77 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -363,6 +363,8 @@ class Storage { std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq); std::string GetReplIdFromDbEngine(); + int GetDBScanKeyParallelism() const { return config_->dbsize_scan_key_parallelism; } + private: std::unique_ptr db_ = nullptr; std::string replid_; From 81ee7bd1f6c77038dbccf08c049157602b0e71a6 Mon Sep 17 00:00:00 2001 From: Lele Huang Date: Thu, 18 Dec 2025 13:04:21 +0800 Subject: [PATCH 2/2] adjust init dbsize-scan-key-parallelism configuration Moved the 'dbsize-scan-key-parallelism' default-value handling in config.cc from initFieldCallback to initFieldValidator. 
--- src/config/config.cc | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/config/config.cc b/src/config/config.cc index 427f5e5a322..6fdb89c4f95 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -371,6 +371,14 @@ void Config::initFieldValidator() { std::vector args = util::Split(v, " \t"); return dbsize_scan_cron.SetScheduleTime(args); }}, + {"dbsize-scan-key-parallelism", + [this]([[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { + if (dbsize_scan_key_parallelism == 0) { + unsigned int max_parallelism = std::thread::hardware_concurrency(); + dbsize_scan_key_parallelism = static_cast(max_parallelism) / 2; + } + return Status::OK(); + }}, {"compaction-checker-range", [this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status { if (!compaction_checker_cron_str_.empty()) { @@ -575,15 +583,6 @@ void Config::initFieldCallback() { srv->storage->SetIORateLimit(max_io_mb); return Status::OK(); }}, - {"dbsize-scan-key-parallelism", - [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, - [[maybe_unused]] const std::string &v) -> Status { - if (dbsize_scan_key_parallelism == 0) { - unsigned int max_parallelism = std::thread::hardware_concurrency(); - dbsize_scan_key_parallelism = static_cast(max_parallelism) / 2; - } - return Status::OK(); - }}, {"profiling-sample-record-max-len", [this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status { if (!srv) return Status::OK();