Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,11 @@ compaction-checker-cron * 0-7 * * *
# e.g. dbsize-scan-cron 0 * * * *
# would recalculate the keyspace infos of the db every hour.

# The number of threads used by `dbsize scan` to scan keys in parallel.
# Default: half the number of CPU cores of the Kvrocks node
# e.g. dbsize-scan-key-parallelism 4
# would scan the node's slot ranges with 4 threads in parallel.

# Command renaming.
#
# It is possible to change the name of dangerous commands in a shared
Expand Down
18 changes: 17 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,23 @@ class CommandDBSize : public Commander {
srv->GetLatestKeyNumStats(ns, &stats);
*output = redis::Integer(stats.n_key);
} else if (args_.size() == 2 && util::EqualICase(args_[1], "scan")) {
Status s = srv->AsyncScanDBSize(ns);
auto local_ip_addresses = util::GetLocalIPAddresses();
std::vector<SlotInfo> infos;
Status s = srv->cluster->GetSlotsInfo(&infos);
std::vector<SlotRange> slot_ranges;
if (s.IsOK()) {
for (const auto &info : infos) {
for (const auto &n : info.nodes) {
if (std::find(local_ip_addresses.begin(), local_ip_addresses.end(), n.host) != local_ip_addresses.end()) {
slot_ranges.emplace_back(info.start, info.end);
}
}
}
} else {
return s;
}
srv->SetSlotRanges(slot_ranges);
s = srv->AsyncScanDBSize(ns);
if (s.IsOK()) {
*output = redis::RESP_OK;
} else {
Expand Down
9 changes: 9 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Config::Config() {
{"compact-cron", false, new StringField(&compact_cron_str_, "")},
{"bgsave-cron", false, new StringField(&bgsave_cron_str_, "")},
{"dbsize-scan-cron", false, new StringField(&dbsize_scan_cron_str_, "")},
{"dbsize-scan-key-parallelism", false, new IntField(&dbsize_scan_key_parallelism, 0, 0, INT_MAX)},
{"replica-announce-ip", false, new StringField(&replica_announce_ip, "")},
{"replica-announce-port", false, new UInt32Field(&replica_announce_port, 0, 0, PORT_LIMIT)},
{"compaction-checker-range", false, new StringField(&compaction_checker_range_str_, "")},
Expand Down Expand Up @@ -371,6 +372,14 @@ void Config::initFieldValidator() {
std::vector<std::string> args = util::Split(v, " \t");
return dbsize_scan_cron.SetScheduleTime(args);
}},
{"dbsize-scan-key-parallelism",
 [this]([[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
   // A value of 0 means "not configured": derive the thread count from the machine instead.
   if (dbsize_scan_key_parallelism == 0) {
     // hardware_concurrency() is allowed to return 0 when the core count is unknown, and halving a
     // single core also yields 0. Never store less than one thread, because the parallel scan uses
     // this value as a divisor when it splits the slots among its workers.
     const int half_cores = static_cast<int>(std::thread::hardware_concurrency() / 2);
     dbsize_scan_key_parallelism = half_cores > 0 ? half_cores : 1;
   }
   return Status::OK();
 }},
{"compaction-checker-range",
[this]([[maybe_unused]] const std::string &k, const std::string &v) -> Status {
if (!compaction_checker_cron_str_.empty()) {
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct Config {
bool repl_namespace_enabled = false;
std::string replica_announce_ip;
uint32_t replica_announce_port = 0;
int dbsize_scan_key_parallelism;

bool persist_cluster_nodes_enabled = true;
bool slot_id_encoded = false;
Expand Down
2 changes: 1 addition & 1 deletion src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) {

KeyNumStats stats;
engine::Context ctx(storage);
auto s = db.GetKeyNumStats(ctx, "", &stats);
auto s = db.GetKeyNumStats(ctx, "", &stats, GetSlotRanges());
if (!s.ok()) {
ERROR("failed to retrieve key num stats: {}", s.ToString());
}
Expand Down
6 changes: 6 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ class Server {

AuthResult AuthenticateUser(const std::string &user_password, std::string *ns);

// Stores a copy of `slot_ranges` in the server. The argument is only read, so it is taken by const
// reference (callers passing a non-const lvalue keep working).
// NOTE(review): no lock is taken here; confirm this cannot run concurrently with readers of GetSlotRanges().
void SetSlotRanges(const std::vector<SlotRange> &slot_ranges) { slot_ranges_ = slot_ranges; }
// Returns a pointer to the slot ranges stored in this server object (never null). The pointee is the
// member itself, not a copy.
std::vector<SlotRange> *GetSlotRanges() {
  return &slot_ranges_;
}

#ifdef ENABLE_OPENSSL
UniqueSSLContext ssl_ctx;
#endif
Expand Down Expand Up @@ -450,4 +453,7 @@ class Server {
std::atomic<uint16_t> cursor_counter_ = {0};
using CursorDictType = std::array<CursorDictElement, CURSOR_DICT_SIZE>;
std::unique_ptr<CursorDictType> cursor_dict_;

// slot_ranges
std::vector<SlotRange> slot_ranges_;
};
174 changes: 171 additions & 3 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <utility>

#include "cluster/redis_slot.h"
#include "common/scope_exit.h"
#include "common/string_util.h"
#include "db_util.h"
#include "parse_util.h"
Expand Down Expand Up @@ -242,8 +241,9 @@ rocksdb::Status Database::GetExpireTime(engine::Context &ctx, const Slice &user_
return rocksdb::Status::OK();
}

rocksdb::Status Database::GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats) {
return Keys(ctx, prefix, "*", nullptr, stats);
// Collects key-number statistics for the keys under `prefix` by delegating to KeysParallel() with a
// match-all glob ("*") and no key output vector; `slot_ranges` is forwarded unchanged and the result is
// written through `stats`.
rocksdb::Status Database::GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats,
const std::vector<SlotRange> *slot_ranges) {
return KeysParallel(ctx, prefix, "*", nullptr, stats, slot_ranges);
}

rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob,
Expand Down Expand Up @@ -315,6 +315,174 @@ rocksdb::Status Database::Keys(engine::Context &ctx, const std::string &prefix,
return rocksdb::Status::OK();
}

// Scans the metadata of the given slot ranges with several worker threads and merges the per-thread results.
//
// - When the storage does not encode slot ids into keys, this falls back to the sequential Keys() scan.
// - `keys` (optional) receives the matching user keys.
// - `stats` (optional) has the counters of all scanned slots added to it, and `avg_ttl` is recomputed
//   from the accumulated `ttl_sum`.
// - `current_slot_ranges` lists the inclusive [start, end] slot ranges to scan. A null list, an empty list,
//   or a list that contains no valid range yields InvalidArgument.
//
// Returns the first non-OK status reported by a worker (in thread order), otherwise OK.
//
// NOTE(review): all workers share `ctx`; this assumes engine::Context may be used from several threads
// at once - confirm.
rocksdb::Status Database::KeysParallel(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob,
                                       std::vector<std::string> *keys, KeyNumStats *stats,
                                       const std::vector<SlotRange> *current_slot_ranges) {
  if (!storage_->IsSlotIdEncoded()) {
    // Keys are not grouped by slot, so there is nothing to partition on. Forward the caller's glob and
    // key sink (instead of a hard-coded "*" / nullptr) so both code paths honour the same arguments.
    return Keys(ctx, prefix, suffix_glob, keys, stats);
  }
  // The declaration defaults this pointer to nullptr, so it must be checked before it is dereferenced.
  if (current_slot_ranges == nullptr || current_slot_ranges->empty()) {
    return rocksdb::Status::InvalidArgument("slot_ranges is empty");
  }

  // Normalize the input: drop inverted ranges, then sort and merge overlapping/adjacent ranges so that
  // every slot is scanned exactly once even if the caller listed the same range more than once.
  std::vector<std::pair<int, int>> merged_ranges;
  merged_ranges.reserve(current_slot_ranges->size());
  for (const auto &range : *current_slot_ranges) {
    if (range.start <= range.end) {
      merged_ranges.emplace_back(range.start, range.end);
    }
  }
  if (merged_ranges.empty()) {
    return rocksdb::Status::InvalidArgument("slot_ranges is empty");
  }
  std::sort(merged_ranges.begin(), merged_ranges.end());
  size_t last = 0;
  for (size_t i = 1; i < merged_ranges.size(); ++i) {
    if (merged_ranges[i].first <= merged_ranges[last].second + 1) {
      merged_ranges[last].second = std::max(merged_ranges[last].second, merged_ranges[i].second);
    } else {
      merged_ranges[++last] = merged_ranges[i];
    }
  }
  merged_ranges.resize(last + 1);

  int total_slots = 0;
  for (const auto &range : merged_ranges) {
    total_slots += range.second - range.first + 1;
  }

  // The configured value may be 0 (unset) and has no useful upper bound. Derive a default from the
  // machine, and clamp to [1, total_slots] so we never divide by zero nor spawn threads without work.
  int parallelism = storage_->GetDBScanKeyParallelism();
  if (parallelism <= 0) {
    parallelism = static_cast<int>(std::thread::hardware_concurrency() / 2);
  }
  parallelism = std::max(1, std::min(parallelism, total_slots));

  // Split the slots as evenly as possible: the first `remain_slots` threads take one extra slot.
  const int slots_per_thread = total_slots / parallelism;
  const int remain_slots = total_slots % parallelism;
  std::vector<std::vector<std::pair<int, int>>> thread_ranges(parallelism);

  int current_thread = 0;
  int quota_left = slots_per_thread + (remain_slots > 0 ? 1 : 0);
  for (const auto &range : merged_ranges) {
    int range_start = range.first;
    int remaining_in_range = range.second - range.first + 1;

    while (remaining_in_range > 0 && current_thread < parallelism) {
      const int slots_to_take = std::min(remaining_in_range, quota_left);
      thread_ranges[current_thread].emplace_back(range_start, range_start + slots_to_take - 1);

      range_start += slots_to_take;
      remaining_in_range -= slots_to_take;
      quota_left -= slots_to_take;

      if (quota_left == 0) {
        ++current_thread;
        quota_left = slots_per_thread + (current_thread < remain_slots ? 1 : 0);
      }
    }
  }

  // Every worker writes only to its own index of these vectors, so no locking is needed for them.
  // A default-constructed rocksdb::Status is OK, so only failures have to be recorded.
  std::vector<rocksdb::Status> thread_statuses(parallelism);
  std::vector<std::vector<std::string>> thread_keys(parallelism);
  std::vector<KeyNumStats> thread_stats(parallelism);

  std::vector<std::thread> workers;
  workers.reserve(parallelism);
  for (int i = 0; i < parallelism; ++i) {
    workers.emplace_back([&, i]() {
      for (const auto &range : thread_ranges[i]) {
        // Use a fresh stats object per range and add it to the thread total. A thread may own several
        // ranges, and the total must not depend on whether ProcessSlotRange overwrites or adds to
        // its output argument.
        KeyNumStats range_stats;
        auto status = ProcessSlotRange(ctx, prefix, suffix_glob, range.first, range.second,
                                       keys ? &thread_keys[i] : nullptr, stats ? &range_stats : nullptr);
        if (!status.ok()) {
          thread_statuses[i] = status;
          return;
        }
        thread_stats[i].n_key += range_stats.n_key;
        thread_stats[i].n_expired += range_stats.n_expired;
        thread_stats[i].n_expires += range_stats.n_expires;
        thread_stats[i].ttl_sum += range_stats.ttl_sum;
      }
    });
  }

  for (auto &worker : workers) {
    worker.join();
  }

  for (const auto &status : thread_statuses) {
    if (!status.ok()) {
      return status;
    }
  }

  if (keys) {
    for (auto &thread_key : thread_keys) {
      keys->insert(keys->end(), std::make_move_iterator(thread_key.begin()), std::make_move_iterator(thread_key.end()));
    }
  }

  if (stats) {
    for (const auto &thread_stat : thread_stats) {
      stats->n_key += thread_stat.n_key;
      stats->n_expired += thread_stat.n_expired;
      stats->n_expires += thread_stat.n_expires;
      stats->ttl_sum += thread_stat.ttl_sum;
    }

    if (stats->n_expires > 0) {
      // Same formula as before: sum of TTLs divided by the number of expiring keys, then by 1000.
      stats->avg_ttl = stats->ttl_sum / stats->n_expires / 1000;
    }
  }

  return rocksdb::Status::OK();
}

// Scans the metadata column family for every slot in the inclusive range [start_slot, end_slot] and
// reports the keys of this database's namespace that start with `prefix` and whose remainder matches
// `suffix_glob`.
//
// - `keys` (optional): matching, non-expired user keys are appended to it.
// - `stats` (optional): n_key, n_expires, n_expired and ttl_sum of this range are ADDED to it on success,
//   so one stats object can be reused across several calls without losing earlier results. avg_ttl is
//   left untouched. Nothing is written to `stats` when an error is returned.
//
// Returns InvalidArgument if a slot id cannot be encoded in 16 bits, the iterator's status if the scan
// fails, otherwise OK. An inverted range (start_slot > end_slot) scans nothing and returns OK.
rocksdb::Status Database::ProcessSlotRange(engine::Context &ctx, const std::string &prefix,
                                           const std::string &suffix_glob, int start_slot, int end_slot,
                                           std::vector<std::string> *keys, KeyNumStats *stats) {
  info("[ProcessSlotRangeOptimize] Begin batch {}-{}", start_slot, end_slot);

  // Slot ids are written as 16-bit values into the key prefix. Reject anything that would be truncated;
  // previously a uint16_t loop counter compared against an int bound could wrap around or never finish.
  if (start_slot < 0 || end_slot > UINT16_MAX) {
    return rocksdb::Status::InvalidArgument("slot id is out of range");
  }

  KeyNumStats local_stats;

  auto iter = util::UniqueIterator(ctx, ctx.GetReadOptions(), metadata_cf_handle_);

  // The namespace part of the prefix does not depend on the slot, so build it once.
  const std::string ns_base = ComposeNamespaceKey(namespace_, "", false);

  for (int slot_id = start_slot; slot_id <= end_slot; ++slot_id) {
    // Seek prefix layout: <namespace part><16-bit slot id><user prefix>
    std::string ns_prefix = ns_base;
    PutFixed16(&ns_prefix, static_cast<uint16_t>(slot_id));
    ns_prefix.append(prefix);

    uint64_t current_n_key = 0;

    for (iter->Seek(ns_prefix); iter->Valid(); iter->Next()) {
      if (!iter->key().starts_with(ns_prefix)) {
        break;
      }

      auto [_, user_key] = ExtractNamespaceKey(iter->key(), storage_->IsSlotIdEncoded());
      if (!util::StringMatch(suffix_glob, user_key.ToString().substr(prefix.size()))) {
        continue;
      }

      // Entries whose metadata cannot be decoded are skipped rather than failing the whole scan.
      Metadata metadata(kRedisNone, false);
      auto s = metadata.Decode(iter->value());
      if (!s.ok()) continue;

      if (metadata.Expired()) {
        if (stats) local_stats.n_expired++;
        continue;
      }

      if (stats) {
        int64_t ttl = metadata.TTL();
        local_stats.n_key++;
        current_n_key++;
        // A TTL of -1 marks a key without expiration; only positive TTLs contribute to the sum.
        if (ttl != -1) {
          local_stats.n_expires++;
          if (ttl > 0) local_stats.ttl_sum += ttl;
        }
      }

      if (keys) {
        keys->emplace_back(user_key.ToString());
      }
    }

    info("[ProcessSlotRangeOptimize] current slotId is {}, keys size is {}", slot_id, current_n_key);

    if (auto s = iter->status(); !s.ok()) {
      return s;
    }
  }

  if (stats) {
    // Add instead of assign: a caller that feeds several ranges into the same stats object would
    // otherwise keep only the numbers of the last range.
    stats->n_key += local_stats.n_key;
    stats->n_expires += local_stats.n_expires;
    stats->n_expired += local_stats.n_expired;
    stats->ttl_sum += local_stats.ttl_sum;
  }

  return rocksdb::Status::OK();
}

rocksdb::Status Database::Scan(engine::Context &ctx, const std::string &cursor, uint64_t limit,
const std::string &prefix, const std::string &suffix_glob,
std::vector<std::string> *keys, std::string *end_cursor, RedisType type,
Expand Down
10 changes: 9 additions & 1 deletion src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,17 @@ class Database {
[[nodiscard]] rocksdb::Status Dump(engine::Context &ctx, const Slice &user_key, std::vector<std::string> *infos);
[[nodiscard]] rocksdb::Status FlushDB(engine::Context &ctx);
[[nodiscard]] rocksdb::Status FlushAll(engine::Context &ctx);
[[nodiscard]] rocksdb::Status GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats);
[[nodiscard]] rocksdb::Status GetKeyNumStats(engine::Context &ctx, const std::string &prefix, KeyNumStats *stats,
const std::vector<SlotRange> *slot_ranges);
[[nodiscard]] rocksdb::Status Keys(engine::Context &ctx, const std::string &prefix, const std::string &suffix_glob,
std::vector<std::string> *keys = nullptr, KeyNumStats *stats = nullptr);
[[nodiscard]] rocksdb::Status KeysParallel(engine::Context &ctx, const std::string &prefix,
const std::string &suffix_glob, std::vector<std::string> *keys = nullptr,
KeyNumStats *stats = nullptr,
const std::vector<SlotRange> *slot_ranges = nullptr);
[[nodiscard]] rocksdb::Status ProcessSlotRange(engine::Context &ctx, const std::string &prefix,
const std::string &suffix_glob, int start_slot, int end_slot,
std::vector<std::string> *keys, KeyNumStats *stats);
[[nodiscard]] rocksdb::Status Scan(engine::Context &ctx, const std::string &cursor, uint64_t limit,
const std::string &prefix, const std::string &suffix_glob,
std::vector<std::string> *keys, std::string *end_cursor = nullptr,
Expand Down
1 change: 1 addition & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct KeyNumStats {
uint64_t n_expires = 0;
uint64_t n_expired = 0;
uint64_t avg_ttl = 0;
uint64_t ttl_sum = 0;
};

[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ class Storage {
std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq);
std::string GetReplIdFromDbEngine();

// Returns the `dbsize_scan_key_parallelism` value held by the storage's config object.
int GetDBScanKeyParallelism() const {
  return config_->dbsize_scan_key_parallelism;
}

private:
std::unique_ptr<rocksdb::DB> db_ = nullptr;
std::string replid_;
Expand Down
Loading