feat(cf): add cf.add command #3481
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces initial CuckooFilter support to KVrocks (task #3351) by adding the CF.RESERVE and CF.ADD commands, along with a bucket-per-key RocksDB storage implementation and accompanying unit tests.
Changes:
- Add a new Redis metadata type (`kRedisCuckooFilter`) and `CuckooChainMetadata` encoding/decoding.
- Implement a bucket-based cuckoo filter chain (`redis::CuckooChain`) with `Reserve` and `Add` operations.
- Register new commands (`cf.reserve`, `cf.add`) and add a comprehensive C++ unit test suite.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/cppunit/types/cuckoo_filter_test.cc | Adds unit tests for CF reserve/add behavior and cuckoo filter helper functions. |
| src/types/redis_cuckoo_chain.h | Declares the cuckoo chain DB wrapper and CF defaults. |
| src/types/redis_cuckoo_chain.cc | Implements CF.RESERVE/CF.ADD logic using per-bucket RocksDB keys plus kick-out/expand paths. |
| src/types/cuckoo_filter.h | Adds cuckoo filter hash/fingerprint/alt-hash helpers and bucket-count calculation. |
| src/storage/redis_metadata.h | Introduces the new Redis type and CuckooChainMetadata structure. |
| src/storage/redis_metadata.cc | Implements encode/decode and capacity calculation for CuckooChainMetadata. |
| src/commands/commander.h | Adds a new command category for CuckooFilter commands. |
| src/commands/cmd_cuckoo_filter.cc | Implements and registers cf.reserve and cf.add command handlers. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return s; | ||
| } | ||
|
|
||
| if (input->size() < 21) { |
| // Initialize metadata for the new cuckoo filter | ||
| metadata.size = 0; | ||
| metadata.base_capacity = capacity; | ||
| metadata.bucket_size = bucket_size; | ||
| metadata.max_iterations = max_iterations; | ||
| metadata.expansion = expansion; | ||
| metadata.n_filters = 1; | ||
| metadata.num_deleted_items = 0; |
| // Check if the key already exists | ||
| // Read without snapshot to ensure we see any committed data | ||
| std::string raw_value; | ||
| rocksdb::ReadOptions read_options; // No snapshot | ||
| auto s = storage_->Get(ctx, read_options, metadata_cf_handle_, ns_key, &raw_value); | ||
| if (s.ok()) { | ||
| return rocksdb::Status::InvalidArgument("the key already exists"); | ||
| } | ||
| if (!s.IsNotFound()) { | ||
| return s; // Return other errors |
| // Get metadata - use read options without snapshot to see latest data | ||
| CuckooChainMetadata metadata(false); | ||
| std::string raw_value; | ||
| rocksdb::ReadOptions read_options; // No snapshot | ||
| auto s = storage_->Get(ctx, read_options, metadata_cf_handle_, ns_key, &raw_value); | ||
| if (!s.ok()) { | ||
| if (s.IsNotFound()) { | ||
| return rocksdb::Status::NotFound("key not found"); | ||
| } | ||
| return s; | ||
| } | ||
|
|
||
| // Decode metadata | ||
| Slice slice(raw_value); | ||
| s = metadata.Decode(&slice); | ||
| if (!s.ok()) { | ||
| return s; | ||
| } | ||
|
|
| // Helper function: read bucket from storage and ensure correct size | ||
| static rocksdb::Status ReadBucket(engine::Storage *storage, engine::Context &ctx, const std::string &bucket_key, | ||
| uint8_t bucket_size, std::string *bucket_data) { | ||
| rocksdb::ReadOptions read_opts = ctx.DefaultScanOptions(); |
| // No space found in any filter, try kick-out on the last filter | ||
| uint16_t last_filter_idx = metadata.n_filters - 1; | ||
| uint64_t filter_capacity = metadata.base_capacity * IntPow(metadata.expansion, last_filter_idx); | ||
| uint32_t num_buckets = CuckooFilter::OptimalNumBuckets(filter_capacity, metadata.bucket_size); | ||
|
|
||
| bool inserted = false; | ||
| s = kickOutInsert(ctx, user_key, ns_key, metadata, last_filter_idx, num_buckets, fingerprint, hash, &inserted); | ||
| if (s.ok() && inserted) { | ||
| // Update metadata after successful kick-out | ||
| auto batch = storage_->GetWriteBatchBase(); | ||
| WriteBatchLogData log_data(kRedisCuckooFilter, std::vector<std::string>{"CF.ADD", user_key.ToString()}); | ||
| batch->PutLogData(log_data.Encode()); | ||
|
|
||
| metadata.size++; | ||
| std::string metadata_bytes; | ||
| metadata.Encode(&metadata_bytes); | ||
| batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); | ||
|
|
||
| s = storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); | ||
| if (!s.ok()) return s; | ||
|
|
||
| *added = true; |
| // CF.ADD command - adds an item to the cuckoo filter | ||
| // Returns true if the item was added, false if the filter reports the item as |
| // already present (cuckoo filters are probabilistic, so this may be a false positive) |
| static uint32_t OptimalNumBuckets(uint64_t capacity, uint8_t bucket_size) { | ||
| // A load factor of 95.5% is chosen for the cuckoo filter | ||
| auto num_buckets = static_cast<uint32_t>(static_cast<long double>(capacity) / bucket_size / 0.955L); | ||
| // Round up to next power of 2 for better hash distribution | ||
| if (num_buckets == 0) num_buckets = 1; | ||
| uint32_t power = 1; | ||
| while (power < num_buckets) power <<= 1; | ||
| return power; | ||
| } |
| batch->PutLogData(log_data.Encode()); | ||
|
|
||
| // Store the metadata | ||
| std::string metadata_bytes; | ||
| metadata.Encode(&metadata_bytes); | ||
| batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); |
| Status Parse(const std::vector<std::string> &args) override { | ||
| // CF.RESERVE key capacity [BUCKETSIZE bs] [MAXITERATIONS mi] [EXPANSION ex] | ||
| if (args.size() < 3) { | ||
| return {Status::RedisParseErr, "wrong number of arguments"}; |
|
Hi @nagisa-kunhah. Thank you for your contribution. Additionally, please follow our AI policy: https://kvrocks.apache.org/community/contributing#guidelines-for-ai-assisted-contributions. Please let us know which AI tools and models you used, as this will help us better review the code. |
task id: #3351