Skip to content

Commit 027fd14

Browse files
committed
server: Carry db index in item
Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
1 parent 03f82eb commit 027fd14

3 files changed

Lines changed: 41 additions & 13 deletions

File tree

src/server/rdb_load.cc

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2803,14 +2803,14 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
28032803
if (out_buf.empty())
28042804
return;
28052805

2806-
auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
2806+
auto cb = [this, ib = std::move(out_buf)] {
28072807
auto& db_slice = GetCurrentDbSlice();
28082808

28092809
// Before we start loading, increment LoadInProgress.
28102810
// This is required because FlushShardAsync dispatches to multiple shards, and those shards
28112811
// might have not yet have their state (load in progress) incremented.
28122812
db_slice.IncrLoadInProgress();
2813-
this->LoadItemsBuffer(indx, ib);
2813+
this->LoadItemsBuffer(ib);
28142814
db_slice.DecrLoadInProgress();
28152815
};
28162816

@@ -2960,17 +2960,19 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
29602960
}
29612961
}
29622962

2963-
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
2963+
void RdbLoader::LoadItemsBuffer(const ItemsBuf& ib) {
29642964
EngineShard* es = EngineShard::tlocal();
2965-
DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
2966-
DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id());
2967-
2968-
DCHECK(!db_slice.IsCacheMode());
2965+
const uint64_t now_ms = GetCurrentTimeMs();
2966+
Namespace* ns = &namespaces->GetDefaultNamespace();
29692967

29702968
for (const auto* item : ib) {
2969+
DbContext db_cntx{ns, item->db_index, now_ms};
2970+
DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id());
2971+
DCHECK(!db_slice.IsCacheMode());
29712972
CreateObjectOnShard(db_cntx, item, &db_slice);
29722973
if (stop_early_) {
2973-
return;
2974+
// force all items in ib to move into item_queue_ so they can be cleaned up later.
2975+
break;
29742976
}
29752977
}
29762978

@@ -3121,6 +3123,7 @@ io::Result<bool> RdbLoader::ReadAndDispatchObject(int object_type, std::string&
31213123
item->has_mc_flags = obj_settings.has_mc_flags;
31223124
item->mc_flags = obj_settings.mc_flags;
31233125
item->expire_ms = obj_settings.expiretime;
3126+
item->db_index = db_index;
31243127

31253128
std::move(cleanup).Cancel();
31263129

src/server/rdb_load.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ class RdbLoader : protected RdbLoaderBase {
353353
OpaqueObj val;
354354
uint64_t expire_ms;
355355
std::atomic<Item*> next;
356+
DbIndex db_index = 0;
356357
bool is_sticky = false;
357358
bool has_mc_flags = false;
358359
uint32_t mc_flags = 0;
@@ -395,7 +396,7 @@ class RdbLoader : protected RdbLoaderBase {
395396
void FlushShardAsync(ShardId sid);
396397
void FlushAllShards();
397398

398-
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
399+
void LoadItemsBuffer(const ItemsBuf& ib);
399400

400401
void CreateObjectOnShard(const DbContext& db_cntx, const Item* item, DbSlice* db_slice);
401402

src/server/rdb_test.cc

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1470,32 +1470,56 @@ TEST_F(RdbTest, LoadTwoChunks) {
14701470
}
14711471

14721472
TEST_F(RdbTest, InterleavedLoad) {
1473+
// must have >1 shards for non inlined path check. find a key that lands in shard 1 by hashing, to
1474+
// test non inlined obj. creation
1475+
ASSERT_GT(shard_set->size(), 1u);
1476+
std::string key;
1477+
for (unsigned i = 0; i < 1000; ++i) {
1478+
key = StrCat("x", i);
1479+
if (Shard(key, shard_set->size()) == 1)
1480+
break;
1481+
}
1482+
ASSERT_EQ(Shard(key, shard_set->size()), 1u);
1483+
14731484
std::string a1;
1485+
// hash chunk 1
14741486
a1.push_back(RDB_TYPE_HASH);
1475-
AppendString(&a1, "a");
1487+
AppendString(&a1, key);
14761488
AppendLen(&a1, 2);
14771489
AddKV(&a1, "f1", "v1");
14781490

1491+
// string
14791492
std::string b;
14801493
b.push_back(RDB_TYPE_STRING);
14811494
AppendString(&b, "b");
14821495
AppendString(&b, "plain");
14831496

1497+
// hash chunk 2
14841498
std::string a2;
14851499
AddKV(&a2, "f2", "v2");
14861500

14871501
std::string body;
1488-
// interleave - one tag, one plain, then one tag for the same id=1
1502+
// chunk for db 0
14891503
body += MakeTaggedChunk(1, a1);
1504+
// simple string b=plain
14901505
body += b;
1506+
body.push_back(static_cast<char>(RDB_OPCODE_SELECTDB));
1507+
// switch to db 1
1508+
AppendLen(&body, 1);
1509+
// back to chunk for db 0
14911510
body += MakeTaggedChunk(1, a2);
14921511

14931512
auto ec = pp_->at(0)->Await([&] { return LoadRdbData(service_.get(), WrapInRdb(body)); });
14941513
ASSERT_FALSE(ec) << ec.message();
14951514

1496-
EXPECT_EQ(Run({"HGET", "a", "f1"}), "v1");
1497-
EXPECT_EQ(Run({"HGET", "a", "f2"}), "v2");
1515+
EXPECT_EQ(Run({"SELECT", "0"}), "OK");
1516+
EXPECT_EQ(Run({"HGET", key, "f1"}), "v1");
1517+
EXPECT_EQ(Run({"HGET", key, "f2"}), "v2");
14981518
EXPECT_EQ(Run({"GET", "b"}), "plain");
1519+
1520+
EXPECT_EQ(Run({"SELECT", "1"}), "OK");
1521+
EXPECT_THAT(Run({"EXISTS", key}), IntArg(0));
1522+
EXPECT_EQ(Run({"SELECT", "0"}), "OK");
14991523
}
15001524

15011525
TEST_F(RdbTest, ChunksAroundJournalOffset) {

0 commit comments

Comments
 (0)