@@ -2187,6 +2187,12 @@ error_code RdbLoader::Load(io::Source* src) {
21872187 GetCurrentDbSlice ().IncrLoadInProgress ();
21882188 }
21892189
2190+ auto finalize_curr_chunk = [&] {
2191+ if (!stop_early_.load (memory_order_relaxed))
2192+ return FinishCurrentChunk ();
2193+ return kOk ;
2194+ };
2195+
21902196 while (!stop_early_.load (memory_order_relaxed)) {
21912197 if (pause_) {
21922198 ThisFiber::SleepFor (100ms);
@@ -2251,8 +2257,7 @@ error_code RdbLoader::Load(io::Source* src) {
22512257 return RdbError (errc::rdb_chunk_payload_remaining);
22522258 }
22532259
2254- if (!stream_states_.empty ())
2255- LOG (ERROR) << " eof seen while pending stream states: " << stream_states_.size ();
2260+ LOG (ERROR) << " eof seen while pending stream states: " << stream_states_.size ();
22562261 return RdbError (errc::rdb_chunk_payload_remaining);
22572262 }
22582263 /* EOF: End of file, exit the main loop. */
@@ -2397,14 +2402,7 @@ error_code RdbLoader::Load(io::Source* src) {
23972402 // path below will read its type and key.
23982403 if (stream_states_.contains (current_chunk_state_->stream_id )) {
23992404 RETURN_ON_ERR (LoadValueChunk ());
2400- if (!stop_early_.load (memory_order_relaxed) &&
2401- current_chunk_state_->remaining_payload_bytes != 0 ) {
2402- LOG (ERROR) << " chunk fully consumed but payload bytes remain "
2403- << current_chunk_state_->remaining_payload_bytes ;
2404- return RdbError (errc::rdb_chunk_payload_remaining);
2405- }
2406- // This chunk is fully consumed, clear the state
2407- current_chunk_state_.reset ();
2405+ RETURN_ON_ERR (finalize_curr_chunk ());
24082406 }
24092407 continue ;
24102408 }
@@ -2425,16 +2423,7 @@ error_code RdbLoader::Load(io::Source* src) {
24252423 VLOG (2 ) << " LoadKeyValPair key=" << last_key_loaded_ << " rdb_type=" << type
24262424 << " db= " << cur_db_index_;
24272425 settings.Reset ();
2428- if (!stop_early_.load (memory_order_relaxed) && current_chunk_state_ &&
2429- current_chunk_state_->remaining_payload_bytes != 0 ) {
2430- LOG (ERROR) << " chunk fully consumed but payload bytes remain "
2431- << current_chunk_state_->remaining_payload_bytes ;
2432- return RdbError (errc::rdb_chunk_payload_remaining);
2433- }
2434-
2435- // If we just read the first chunk of a key, then reset state here because LoadKeyValPair will
2436- // only return when the chunk finishes
2437- current_chunk_state_.reset ();
2426+ RETURN_ON_ERR (finalize_curr_chunk ());
24382427 } // main load loop
24392428
24402429 DVLOG (1 ) << " RdbLoad loop finished" ;
@@ -2633,12 +2622,7 @@ error_code RdbLoaderBase::HandleCompressedBlob(int op_type) {
26332622 // Stop counting payload bytes on decompressed data. At this point the entire payload size must be
26342623 // consumed as it was the compressed blob. We switch to another buffer and must be able to read
26352624 // everything from it without any checks
2636- if (current_chunk_state_ && current_chunk_state_->remaining_payload_bytes > 0 ) {
2637- LOG (ERROR) << " Compressed blob not fully consumed, remaining bytes "
2638- << current_chunk_state_->remaining_payload_bytes ;
2639- return RdbError (errc::rdb_chunk_payload_remaining);
2640- }
2641- current_chunk_state_.reset ();
2625+ RETURN_ON_ERR (FinishCurrentChunk ());
26422626
26432627 // Decompress blob and switch membuf pointer
26442628 // Last type in the compressed blob is RDB_OPCODE_COMPRESSED_BLOB_END
@@ -2853,6 +2837,20 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co
28532837 return visitor.ec ();
28542838}
28552839
2840+ std::error_code RdbLoaderBase::FinishCurrentChunk () {
2841+ if (!current_chunk_state_)
2842+ return kOk ;
2843+
2844+ if (!ChunkBudgetExhausted ()) {
2845+ LOG (ERROR) << " chunk fully consumed but payload bytes remain "
2846+ << current_chunk_state_->remaining_payload_bytes ;
2847+ return RdbError (errc::rdb_chunk_payload_remaining);
2848+ }
2849+
2850+ current_chunk_state_.reset ();
2851+ return kOk ;
2852+ }
2853+
28562854void RdbLoaderBase::CopyStreamId (const StreamID& src, struct streamID * dest) {
28572855 dest->ms = src.ms ;
28582856 dest->seq = src.seq ;
@@ -3011,21 +3009,23 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
30113009 // If the first tagged chunk did not finish the object, save enough state to resume it when
30123010 // the next chunk with the same stream id arrives.
30133011 if (!finalized && current_chunk_state_) {
3014- if (stream_states_.contains (current_chunk_state_->stream_id )) {
3015- LOG (ERROR) << " attempt to add first chunk for id " << current_chunk_state_->stream_id
3016- << " which already exists" ;
3017- return RdbError (errc::rdb_file_corrupted);
3018- }
3019- stream_states_[current_chunk_state_->stream_id ] = {
3012+ StreamState stream_state{
30203013 .db_index = cur_db_index_,
30213014 .type = type,
30223015 .pending_read = std::move (pending_read_),
30233016 .settings = *settings,
30243017 .key = std::move (key),
30253018 };
3019+ const bool inserted =
3020+ stream_states_.try_emplace (current_chunk_state_->stream_id , std::move (stream_state)).second ;
3021+ if (!inserted) {
3022+ LOG (ERROR) << " attempt to add first chunk for id " << current_chunk_state_->stream_id
3023+ << " which already exists" ;
3024+ return RdbError (errc::rdb_file_corrupted);
3025+ }
30263026 }
30273027
3028- int delta_ms = (absl::GetCurrentTimeNanos () - start) / 1000'000 ;
3028+ const int delta_ms = (absl::GetCurrentTimeNanos () - start) / 1000'000 ;
30293029 LOG_IF (INFO, delta_ms > 1000 ) << " Took " << delta_ms << " ms to load rdb_type " << type;
30303030
30313031 pending_read_ = {};
0 commit comments