Skip to content

Commit de0cd02

Browse files
committed
fixes
Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
1 parent be288f3 commit de0cd02

3 files changed

Lines changed: 33 additions & 29 deletions

File tree

src/server/rdb_load.cc

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2100,11 +2100,11 @@ struct RdbLoader::ObjSettings {
21002100
// The first chunk carries the object type and key. Continuation chunks carry only payload,
21012101
// so we keep the object type, key, db, settings, and pending read.
21022102
struct RdbLoader::StreamState {
2103+
std::string key;
21032104
DbIndex db_index;
21042105
int type;
21052106
PendingRead pending_read;
21062107
ObjSettings settings;
2107-
std::string key;
21082108
};
21092109

21102110
RdbLoader::RdbLoader(Service* service, RdbLoadContext* load_context, std::string snapshot_id)
@@ -2187,12 +2187,6 @@ error_code RdbLoader::Load(io::Source* src) {
21872187
GetCurrentDbSlice().IncrLoadInProgress();
21882188
}
21892189

2190-
auto finalize_curr_chunk = [&] {
2191-
if (!stop_early_.load(memory_order_relaxed))
2192-
return FinishCurrentChunk();
2193-
return kOk;
2194-
};
2195-
21962190
while (!stop_early_.load(memory_order_relaxed)) {
21972191
if (pause_) {
21982192
ThisFiber::SleepFor(100ms);
@@ -2248,15 +2242,15 @@ error_code RdbLoader::Load(io::Source* src) {
22482242
}
22492243

22502244
if (type == RDB_OPCODE_EOF) {
2251-
if (current_chunk_state_.has_value() || !stream_states_.empty()) {
2252-
if (current_chunk_state_.has_value()) {
2253-
LOG(ERROR) << "eof seen while a previous chunk is not yet finished, stream id "
2254-
<< current_chunk_state_->stream_id << ", remaining bytes "
2255-
<< current_chunk_state_->remaining_payload_bytes
2256-
<< ", pending stream states: " << stream_states_.size();
2257-
return RdbError(errc::rdb_chunk_payload_remaining);
2258-
}
2245+
if (current_chunk_state_) {
2246+
LOG(ERROR) << "eof seen while a previous chunk is not yet finished, stream id "
2247+
<< current_chunk_state_->stream_id << ", remaining bytes "
2248+
<< current_chunk_state_->remaining_payload_bytes
2249+
<< ", pending stream states: " << stream_states_.size();
2250+
return RdbError(errc::rdb_chunk_payload_remaining);
2251+
}
22592252

2253+
if (!stream_states_.empty()) {
22602254
LOG(ERROR) << "eof seen while pending stream states: " << stream_states_.size();
22612255
return RdbError(errc::rdb_chunk_payload_remaining);
22622256
}
@@ -2402,7 +2396,7 @@ error_code RdbLoader::Load(io::Source* src) {
24022396
// path below will read its type and key.
24032397
if (stream_states_.contains(current_chunk_state_->stream_id)) {
24042398
RETURN_ON_ERR(LoadValueChunk());
2405-
RETURN_ON_ERR(finalize_curr_chunk());
2399+
RETURN_ON_ERR(FinalizeCurrentChunkIfNeeded());
24062400
}
24072401
continue;
24082402
}
@@ -2423,7 +2417,7 @@ error_code RdbLoader::Load(io::Source* src) {
24232417
VLOG(2) << "LoadKeyValPair key=" << last_key_loaded_ << " rdb_type=" << type
24242418
<< " db= " << cur_db_index_;
24252419
settings.Reset();
2426-
RETURN_ON_ERR(finalize_curr_chunk());
2420+
RETURN_ON_ERR(FinalizeCurrentChunkIfNeeded());
24272421
} // main load loop
24282422

24292423
DVLOG(1) << "RdbLoad loop finished";
@@ -3010,11 +3004,11 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
30103004
// the next chunk with the same stream id arrives.
30113005
if (!finalized && current_chunk_state_) {
30123006
StreamState stream_state{
3007+
.key = std::move(key),
30133008
.db_index = cur_db_index_,
30143009
.type = type,
30153010
.pending_read = std::move(pending_read_),
30163011
.settings = *settings,
3017-
.key = std::move(key),
30183012
};
30193013
const bool inserted =
30203014
stream_states_.try_emplace(current_chunk_state_->stream_id, std::move(stream_state)).second;
@@ -3286,6 +3280,12 @@ error_code RdbLoader::HandleShardDocIndex() {
32863280
return kOk;
32873281
}
32883282

3283+
std::error_code RdbLoader::FinalizeCurrentChunkIfNeeded() {
3284+
if (stop_early_.load(memory_order_relaxed))
3285+
return kOk;
3286+
return FinishCurrentChunk();
3287+
}
3288+
32893289
error_code RdbLoader::LoadVectorIndexNodes(uint64_t elements_number,
32903290
std::vector<search::HnswNodeData>* nodes) {
32913291
nodes->reserve(elements_number);

src/server/rdb_load.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,10 @@ class RdbLoader : protected RdbLoaderBase {
427427
std::error_code HandleVectorIndex();
428428
std::error_code HandleShardDocIndex();
429429

430+
// validates if the current chunk is fully read, resets the state. returns early if stop_early_ is
431+
// requested.
432+
std::error_code FinalizeCurrentChunkIfNeeded();
433+
430434
Service* service_;
431435
RdbLoadContext* load_context_;
432436

src/server/rdb_test.cc

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,8 +1544,8 @@ TEST_F(RdbTest, SplitSBF) {
15441544

15451545
// split the blob of the second filter into three chunks. this exercises the loader path where we
15461546
// first try to load the incomplete filter, and return early before that finishes
1547-
constexpr size_t first_split = 17;
1548-
constexpr size_t second_split = 13;
1547+
constexpr size_t kFirstSplit = 17;
1548+
constexpr size_t kSecondSplit = 13;
15491549

15501550
pp_->at(0)->Await([&] {
15511551
const DbContext ctx{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()};
@@ -1559,7 +1559,7 @@ TEST_F(RdbTest, SplitSBF) {
15591559
const std::string blob0{sbf->data(0)};
15601560

15611561
blob1 = std::string{sbf->data(1)};
1562-
ASSERT_GT(blob1.size(), first_split + second_split);
1562+
ASSERT_GT(blob1.size(), kFirstSplit + kSecondSplit);
15631563

15641564
first.push_back(RDB_TYPE_SBF2);
15651565
// brand new key whose shape is copied off bf_src
@@ -1583,8 +1583,8 @@ TEST_F(RdbTest, SplitSBF) {
15831583
// total size of blob1
15841584
AppendLen(&first, blob1.size());
15851585
// only 17 bytes from blob1 in this chunk
1586-
AppendLen(&first, first_split);
1587-
first.append(blob1.data(), first_split);
1586+
AppendLen(&first, kFirstSplit);
1587+
first.append(blob1.data(), kFirstSplit);
15881588
});
15891589

15901590
// add this plain string between chunks of blob1 filter
@@ -1595,14 +1595,14 @@ TEST_F(RdbTest, SplitSBF) {
15951595

15961596
// p2 of blob1
15971597
std::string second;
1598-
AppendLen(&second, second_split);
1599-
second.append(blob1.data() + first_split, second_split);
1598+
AppendLen(&second, kSecondSplit);
1599+
second.append(blob1.data() + kFirstSplit, kSecondSplit);
16001600

1601-
// p2 of blob1
1601+
// p3 of blob1
16021602
std::string third;
1603-
constexpr auto sum = first_split + second_split;
1604-
AppendLen(&third, blob1.size() - sum);
1605-
third.append(blob1.data() + sum, blob1.size() - sum);
1603+
constexpr auto kPrefixConsumed = kFirstSplit + kSecondSplit;
1604+
AppendLen(&third, blob1.size() - kPrefixConsumed);
1605+
third.append(blob1.data() + kPrefixConsumed, blob1.size() - kPrefixConsumed);
16061606

16071607
std::string body;
16081608
body += MakeTaggedChunk(1, first);

0 commit comments

Comments
 (0)