Skip to content

Commit da03915

Browse files
committed
feat: CuckooPageSet
1 parent 1ade399 commit da03915

10 files changed

Lines changed: 956 additions & 160 deletions

src/commands/cmd_cuckoo_filter.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ class CommandCFReserve : public Commander {
8484

8585
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
8686
redis::CuckooChain cuckoo_db(srv->storage, conn->GetNamespace());
87-
auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_);
87+
auto s = cuckoo_db.Reserve(ctx, args_[1], capacity_, bucket_size_, max_iterations_, expansion_,
88+
kCuckooFilterDefaultPageSize);
8889

8990
if (!s.ok()) {
9091
if (s.IsInvalidArgument()) {

src/storage/redis_metadata.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,14 +654,15 @@ void CuckooChainMetadata::Encode(std::string *dst) const {
654654
PutFixed8(dst, bucket_size);
655655
PutFixed16(dst, max_iterations);
656656
PutFixed64(dst, num_deleted_items);
657+
PutFixed32(dst, page_size);
657658
}
658659

659660
rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
660661
if (auto s = Metadata::Decode(input); !s.ok()) {
661662
return s;
662663
}
663664

664-
if (input->size() < sizeof(uint16_t) * 3 + sizeof(uint64_t) * 2 + sizeof(uint8_t)) {
665+
if (input->size() < sizeof(uint16_t) * 3 + sizeof(uint64_t) * 2 + sizeof(uint32_t) + sizeof(uint8_t)) {
665666
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
666667
}
667668

@@ -671,6 +672,7 @@ rocksdb::Status CuckooChainMetadata::Decode(Slice *input) {
671672
GetFixed8(input, &bucket_size);
672673
GetFixed16(input, &max_iterations);
673674
GetFixed64(input, &num_deleted_items);
675+
GetFixed32(input, &page_size);
674676

675677
return rocksdb::Status::OK();
676678
}

src/storage/redis_metadata.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ class BloomChainMetadata : public Metadata {
335335
bool IsScaling() const { return expansion != 0; };
336336
};
337337

338+
constexpr uint32_t kCuckooFilterDefaultPageSize = 2048;
339+
338340
class CuckooChainMetadata : public Metadata {
339341
public:
340342
/// The number of sub-filters in the chain
@@ -356,14 +358,18 @@ class CuckooChainMetadata : public Metadata {
356358
/// Track number of deleted items for maintenance
357359
uint64_t num_deleted_items;
358360

361+
/// Target maximum payload size for each persisted Cuckoo Filter page
362+
uint32_t page_size;
363+
359364
explicit CuckooChainMetadata(bool generate_version = true)
360365
: Metadata(kRedisCuckooFilter, generate_version),
361366
n_filters(0),
362367
expansion(0),
363368
base_capacity(0),
364369
bucket_size(0),
365370
max_iterations(0),
366-
num_deleted_items(0) {}
371+
num_deleted_items(0),
372+
page_size(kCuckooFilterDefaultPageSize) {}
367373

368374
void Encode(std::string *dst) const override;
369375
using Metadata::Decode;

src/types/cuckoo_filter.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ namespace redis {
3434

3535
// Cuckoo filter implementation from the paper:
3636
// "Cuckoo Filter: Practically Better Than Bloom" by Fan et al.
37-
// This is a bucket-based storage implementation where each bucket is stored
38-
// as an independent key-value pair in RocksDB
37+
// Buckets are grouped into page values in RocksDB. The Cuckoo algorithm still
38+
// works with logical bucket indexes, while the storage layer maps buckets to pages.
3939
//
4040
// Hash calculation follows RedisBloom's design:
4141
// - fp = hash % 255 + 1 (fingerprint, non-zero, range: 1-255)
@@ -85,13 +85,10 @@ class CuckooFilter {
8585
return hash ^ (static_cast<uint64_t>(fingerprint) * 0x5bd1e995);
8686
}
8787

88-
// Legacy function for backward compatibility with tests
89-
// Converts bucket index to hash, applies GetAltHash, then converts back to bucket index
88+
// Calculate an alternate bucket from a bucket index and fingerprint.
9089
static uint32_t GetAltBucketIndex(uint32_t bucket_idx, uint8_t fingerprint, uint32_t num_buckets) {
91-
// Treat bucket_idx as a hash value for the calculation
9290
uint64_t hash = bucket_idx;
9391
uint64_t alt_hash = GetAltHash(fingerprint, hash);
94-
// Convert back to bucket index
9592
return static_cast<uint32_t>(alt_hash % num_buckets);
9693
}
9794

src/types/cuckoo_filter_page.cc

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*
19+
*/
20+
21+
#include "cuckoo_filter_page.h"
22+
23+
#include <algorithm>
24+
25+
#include "cuckoo_filter.h"
26+
#include "storage/redis_db.h"
27+
28+
namespace redis {
29+
30+
namespace {
31+
32+
uint32_t GetBucketsPerPage(uint32_t page_size, uint8_t bucket_size) {
33+
return std::max<uint32_t>(1, page_size / bucket_size);
34+
}
35+
36+
uint32_t GetPageIndex(uint32_t bucket_index, uint32_t buckets_per_page) { return bucket_index / buckets_per_page; }
37+
38+
uint32_t GetBucketOffset(uint32_t bucket_index, uint32_t buckets_per_page, uint8_t bucket_size) {
39+
return (bucket_index % buckets_per_page) * bucket_size;
40+
}
41+
42+
uint32_t GetPageValueSize(uint32_t page_index, uint32_t num_buckets, uint32_t buckets_per_page, uint8_t bucket_size) {
43+
uint32_t first_bucket = page_index * buckets_per_page;
44+
uint32_t page_bucket_count = std::min(buckets_per_page, num_buckets - first_bucket);
45+
return page_bucket_count * bucket_size;
46+
}
47+
48+
std::string GetCuckooPageKey(const Slice &ns_key, const CuckooChainMetadata &metadata, bool slot_id_encoded,
49+
uint16_t filter_index, uint32_t page_index) {
50+
std::string sub_key;
51+
PutFixed16(&sub_key, filter_index);
52+
PutFixed32(&sub_key, page_index);
53+
return InternalKey(ns_key, sub_key, metadata.version, slot_id_encoded).Encode();
54+
}
55+
56+
} // namespace
57+
58+
CuckooPageSet::CuckooPageSet(engine::Storage *storage, engine::Context &ctx, const Slice &ns_key,
59+
const CuckooChainMetadata &metadata, bool slot_id_encoded)
60+
: storage_(storage),
61+
ctx_(ctx),
62+
ns_key_(ns_key.ToString()),
63+
metadata_(metadata),
64+
slot_id_encoded_(slot_id_encoded) {}
65+
66+
rocksdb::Status CuckooPageSet::TryInsertInCandidateBuckets(uint16_t filter_index, uint32_t num_buckets,
67+
uint32_t bucket1_index, uint32_t bucket2_index,
68+
uint8_t fingerprint, bool *inserted) {
69+
*inserted = false;
70+
BucketRef bucket1, bucket2;
71+
auto s = EnsureCandidateBucketsLoaded(filter_index, num_buckets, bucket1_index, bucket2_index, &bucket1, &bucket2);
72+
if (!s.ok()) return s;
73+
74+
size_t slot_idx = 0;
75+
if (TryInsertInBucketRef(bucket1, fingerprint, &slot_idx)) {
76+
*inserted = true;
77+
return rocksdb::Status::OK();
78+
}
79+
if (bucket1_index != bucket2_index && TryInsertInBucketRef(bucket2, fingerprint, &slot_idx)) {
80+
*inserted = true;
81+
}
82+
return rocksdb::Status::OK();
83+
}
84+
85+
rocksdb::Status CuckooPageSet::TryInsertInBucket(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index,
86+
uint8_t fingerprint, bool *inserted) {
87+
*inserted = false;
88+
BucketRef bucket;
89+
auto s = EnsureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket);
90+
if (!s.ok()) return s;
91+
92+
size_t slot_idx = 0;
93+
*inserted = TryInsertInBucketRef(bucket, fingerprint, &slot_idx);
94+
return rocksdb::Status::OK();
95+
}
96+
97+
rocksdb::Status CuckooPageSet::GetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index,
98+
uint32_t slot_idx, uint8_t *fingerprint) {
99+
if (slot_idx >= metadata_.bucket_size) return rocksdb::Status::InvalidArgument("invalid cuckoo filter bucket slot");
100+
101+
BucketRef bucket;
102+
auto s = EnsureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket);
103+
if (!s.ok()) return s;
104+
*fingerprint = GetBucketRefSlot(bucket, slot_idx);
105+
return rocksdb::Status::OK();
106+
}
107+
108+
rocksdb::Status CuckooPageSet::SetBucketSlot(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index,
109+
uint32_t slot_idx, uint8_t fingerprint) {
110+
if (slot_idx >= metadata_.bucket_size) return rocksdb::Status::InvalidArgument("invalid cuckoo filter bucket slot");
111+
112+
BucketRef bucket;
113+
auto s = EnsureBucketLoaded(filter_index, num_buckets, bucket_index, &bucket);
114+
if (!s.ok()) return s;
115+
SetBucketRefSlot(bucket, slot_idx, fingerprint);
116+
return rocksdb::Status::OK();
117+
}
118+
119+
rocksdb::Status CuckooPageSet::WriteBackDirtyPages(rocksdb::WriteBatchBase *batch) {
120+
for (const auto &entry : pages_) {
121+
if (!entry.second.is_dirty) continue;
122+
auto s = batch->Put(entry.first, entry.second.data);
123+
if (!s.ok()) return s;
124+
}
125+
return rocksdb::Status::OK();
126+
}
127+
128+
rocksdb::Status CuckooPageSet::ResolveBucketLocation(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index,
129+
BucketLocation *location) const {
130+
if (metadata_.bucket_size == 0 || num_buckets == 0 || bucket_index >= num_buckets) {
131+
return rocksdb::Status::Corruption("invalid cuckoo filter bucket location");
132+
}
133+
134+
uint32_t buckets_per_page = GetBucketsPerPage(metadata_.page_size, metadata_.bucket_size);
135+
uint32_t page_index = GetPageIndex(bucket_index, buckets_per_page);
136+
location->page_key = GetCuckooPageKey(ns_key_, metadata_, slot_id_encoded_, filter_index, page_index);
137+
location->offset = GetBucketOffset(bucket_index, buckets_per_page, metadata_.bucket_size);
138+
location->expected_page_size = GetPageValueSize(page_index, num_buckets, buckets_per_page, metadata_.bucket_size);
139+
return rocksdb::Status::OK();
140+
}
141+
142+
rocksdb::Status CuckooPageSet::EnsureBucketLoaded(uint16_t filter_index, uint32_t num_buckets, uint32_t bucket_index,
143+
BucketRef *bucket) {
144+
BucketLocation location;
145+
auto s = ResolveBucketLocation(filter_index, num_buckets, bucket_index, &location);
146+
if (!s.ok()) return s;
147+
148+
PageEntry *page = nullptr;
149+
s = LoadPage(location, &page);
150+
if (!s.ok()) return s;
151+
152+
bucket->page = page;
153+
bucket->offset = location.offset;
154+
bucket->size = metadata_.bucket_size;
155+
return rocksdb::Status::OK();
156+
}
157+
158+
rocksdb::Status CuckooPageSet::EnsureCandidateBucketsLoaded(uint16_t filter_index, uint32_t num_buckets,
159+
uint32_t bucket1_index, uint32_t bucket2_index,
160+
BucketRef *bucket1, BucketRef *bucket2) {
161+
BucketLocation location1, location2;
162+
auto s = ResolveBucketLocation(filter_index, num_buckets, bucket1_index, &location1);
163+
if (!s.ok()) return s;
164+
s = ResolveBucketLocation(filter_index, num_buckets, bucket2_index, &location2);
165+
if (!s.ok()) return s;
166+
167+
std::vector<BucketLocation> missing_locations;
168+
if (pages_.find(location1.page_key) == pages_.end()) missing_locations.push_back(location1);
169+
if (location2.page_key != location1.page_key && pages_.find(location2.page_key) == pages_.end()) {
170+
missing_locations.push_back(location2);
171+
}
172+
s = LoadPages(missing_locations);
173+
if (!s.ok()) return s;
174+
175+
bucket1->page = &pages_.at(location1.page_key);
176+
bucket1->offset = location1.offset;
177+
bucket1->size = metadata_.bucket_size;
178+
bucket2->page = &pages_.at(location2.page_key);
179+
bucket2->offset = location2.offset;
180+
bucket2->size = metadata_.bucket_size;
181+
return rocksdb::Status::OK();
182+
}
183+
184+
rocksdb::Status CuckooPageSet::LoadPage(const BucketLocation &location, PageEntry **page) {
185+
auto iter = pages_.find(location.page_key);
186+
if (iter != pages_.end()) {
187+
*page = &iter->second;
188+
return rocksdb::Status::OK();
189+
}
190+
191+
PageEntry page_entry;
192+
auto s = storage_->Get(ctx_, ctx_.GetReadOptions(), location.page_key, &page_entry.data);
193+
if (!s.ok() && !s.IsNotFound()) return s;
194+
s = NormalizePage(s, location.expected_page_size, &page_entry);
195+
if (!s.ok()) return s;
196+
197+
auto result = pages_.emplace(location.page_key, std::move(page_entry));
198+
*page = &result.first->second;
199+
return rocksdb::Status::OK();
200+
}
201+
202+
rocksdb::Status CuckooPageSet::LoadPages(const std::vector<BucketLocation> &locations) {
203+
if (locations.empty()) return rocksdb::Status::OK();
204+
if (locations.size() == 1) {
205+
PageEntry *page = nullptr;
206+
return LoadPage(locations[0], &page);
207+
}
208+
209+
std::vector<rocksdb::Slice> keys;
210+
keys.reserve(locations.size());
211+
for (const auto &location : locations) keys.emplace_back(location.page_key);
212+
213+
std::vector<rocksdb::PinnableSlice> values(locations.size());
214+
std::vector<rocksdb::Status> statuses(locations.size());
215+
storage_->MultiGet(ctx_, ctx_.DefaultMultiGetOptions(), storage_->GetDB()->DefaultColumnFamily(), keys.size(),
216+
keys.data(), values.data(), statuses.data());
217+
218+
for (size_t i = 0; i < locations.size(); ++i) {
219+
PageEntry page_entry;
220+
if (statuses[i].ok()) page_entry.data.assign(values[i].data(), values[i].size());
221+
auto s = NormalizePage(statuses[i], locations[i].expected_page_size, &page_entry);
222+
if (!s.ok()) return s;
223+
pages_.emplace(locations[i].page_key, std::move(page_entry));
224+
}
225+
return rocksdb::Status::OK();
226+
}
227+
228+
rocksdb::Status CuckooPageSet::NormalizePage(const rocksdb::Status &status, uint32_t expected_size,
229+
PageEntry *page) const {
230+
if (!status.ok() && !status.IsNotFound()) return status;
231+
if (status.IsNotFound()) page->data.clear();
232+
if (page->data.size() > expected_size) return rocksdb::Status::Corruption("invalid cuckoo filter page size");
233+
if (page->data.size() < expected_size) page->data.resize(expected_size, 0);
234+
return rocksdb::Status::OK();
235+
}
236+
237+
bool CuckooPageSet::TryInsertInBucketRef(const BucketRef &bucket, uint8_t fingerprint, size_t *slot_idx) {
238+
for (size_t i = 0; i < bucket.size; ++i) {
239+
size_t offset = bucket.offset + i;
240+
if (static_cast<uint8_t>(bucket.page->data[offset]) == 0) {
241+
bucket.page->data[offset] = static_cast<char>(fingerprint);
242+
bucket.page->is_dirty = true;
243+
*slot_idx = i;
244+
return true;
245+
}
246+
}
247+
return false;
248+
}
249+
250+
uint8_t CuckooPageSet::GetBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx) const {
251+
return static_cast<uint8_t>(bucket.page->data[bucket.offset + slot_idx]);
252+
}
253+
254+
void CuckooPageSet::SetBucketRefSlot(const BucketRef &bucket, uint32_t slot_idx, uint8_t fingerprint) {
255+
bucket.page->data[bucket.offset + slot_idx] = static_cast<char>(fingerprint);
256+
bucket.page->is_dirty = true;
257+
}
258+
259+
} // namespace redis

0 commit comments

Comments
 (0)