Skip to content

Commit 6775b84

Browse files
author
kevin
committed
implement compact_filter for hash
Signed-off-by: kevin <kevin@kevin-desktop.lightspeed.mssnks.sbcglobal.net>
1 parent b6a959b commit 6775b84

2 files changed

Lines changed: 67 additions & 0 deletions

File tree

src/storage/compact_filter.cc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "storage/redis_metadata.h"
3131
#include "time_util.h"
3232
#include "types/redis_bitmap.h"
33+
#include "types/redis_hash.h"
3334
#include "types/redis_timeseries.h"
3435

3536
namespace engine {
@@ -152,6 +153,54 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
152153
return false;
153154
}
154155
return expired;
156+
} else if (metadata.Type() == kRedisHash) {
157+
auto md_expired = IsMetadataExpired(ikey, metadata);
158+
if (md_expired) {
159+
return true;
160+
}
161+
162+
auto db = stor_->GetDB();
163+
164+
// Hash field subkeys come in pairs:
165+
// 1. Field value key with version = metadata.version
166+
// 2. Field expire key with version = metadata.version + ExpireVersionOffset (8)
167+
// They are processed independently in compaction, so we must handle both in Filter to ensure consistency.
168+
// To avoid orphaning, we check expire time when processing key
169+
// but compaction filter handle key one by one, so we have to left the orphaned key exists here.
170+
171+
// when there is an hash field
172+
if (ikey.GetVersion() == metadata.version) {
173+
// Build the expire key for this field: same namespace + key but with version+8
174+
std::string ns_key = ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), stor_->IsSlotIdEncoded());
175+
InternalKey expire_ikey(ns_key, ikey.GetSubKey(), metadata.version + ExpireVersionOffset,
176+
stor_->IsSlotIdEncoded());
177+
std::string expire_key = expire_ikey.Encode();
178+
std::string expire_value;
179+
auto expire_status =
180+
db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::PrimarySubkey), expire_key, &expire_value);
181+
182+
if (expire_status.ok() && expire_value.size() >= sizeof(uint64_t)) {
183+
uint64_t expire_at = DecodeFixed64(expire_value.data());
184+
uint64_t now = util::GetTimeStampMS();
185+
// Delete field value if its TTL has expired
186+
if (expire_at < now) {
187+
return true;
188+
}
189+
}
190+
return false;
191+
} else if (ikey.GetVersion() == metadata.version + ExpireVersionOffset) { // when there is an hash field expire key
192+
// Build the expire key for this field: same namespace + key but with version
193+
std::string ns_key = ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), stor_->IsSlotIdEncoded());
194+
InternalKey origin_ikey(ns_key, ikey.GetSubKey(), metadata.version, stor_->IsSlotIdEncoded());
195+
std::string origin_key = origin_ikey.Encode();
196+
std::string origin_value;
197+
auto expire_status =
198+
db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::PrimarySubkey), origin_key, &origin_value);
199+
if (expire_status.IsNotFound()) {
200+
return true;
201+
}
202+
}
203+
return false;
155204
}
156205

157206
return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));

tests/gocase/unit/type/hash/hash_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,6 +1543,24 @@ var testHash = func(t *testing.T, configs util.KvrocksServerConfigs) {
15431543
result = rdb.Do(ctx, "HEXPIRE", testKey, "60", "FIELDS", "3", "field1", "field2")
15441544
require.Error(t, result.Err())
15451545
})
1546+
1547+
t.Run("HASH COMPACT", func(t *testing.T) {
1548+
testKey := "hash-compact-test"
1549+
require.NoError(t, rdb.Del(ctx, testKey).Err())
1550+
require.NoError(t, rdb.HSet(ctx, testKey, "field1", "value1").Err())
1551+
1552+
// Set expiration to 300 ms
1553+
rdb.Do(ctx, "HPEXPIRE", testKey, 300, "FIELDS", "1", "field1")
1554+
1555+
// Wait for expiration
1556+
time.Sleep(350 * time.Millisecond)
1557+
1558+
// Field should be expired
1559+
val := rdb.HGet(ctx, testKey, "field1")
1560+
require.Error(t, val.Err())
1561+
1562+
require.NoError(t, rdb.Do(ctx, "COMPACT").Err())
1563+
})
15461564
}
15471565
}
15481566

0 commit comments

Comments
 (0)