Skip to content

Commit 897842a

Browse files
Storages: Fix VersionChain::getBytes (#10322)
ref #10323 Co-authored-by: JaySon <tshent@qq.com>
1 parent 6d670a2 commit 897842a

4 files changed

Lines changed: 35 additions & 3 deletions

File tree

dbms/src/Storages/DeltaMerge/Remote/RNMVCCIndexCache.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <Storages/DeltaMerge/VersionChain/VersionChain_fwd.h>
2020
#include <Storages/KVStore/Types.h>
2121
#include <common/types.h>
22+
#include <fmt/format.h>
2223

2324
#include <boost/noncopyable.hpp>
2425

@@ -57,6 +58,20 @@ class RNMVCCIndexCache : private boost::noncopyable
5758
&& segment_id == other.segment_id && segment_epoch == other.segment_epoch
5859
&& delta_index_epoch == other.delta_index_epoch && is_version_chain == other.is_version_chain;
5960
}
61+
62+
String toString() const
63+
{
64+
return fmt::format(
65+
"<store_id={}, table_id={}, segment_id={}, segment_epoch={}, "
66+
"delta_index_epoch={}, keyspace={}, is_version_chain={}>",
67+
store_id,
68+
table_id,
69+
segment_id,
70+
segment_epoch,
71+
delta_index_epoch,
72+
keyspace_id,
73+
is_version_chain);
74+
}
6075
};
6176

6277
// Returns a cached or newly created delta index, which is assigned to the specified segment(at)epoch.

dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -871,8 +871,24 @@ void SegmentReadTask::updateMVCCIndexSize(ReadMode read_mode, size_t initial_ind
871871

872872
auto & cache = dm_context->global_context.getSharedContextDisagg()->rn_mvcc_index_cache;
873873
assert(cache != nullptr);
874-
if (cache_key->is_version_chain && getVersionChainBytes(*(segment->getVersionChain())) != initial_index_bytes)
875-
cache->setVersionChain(*cache_key, segment->getVersionChain());
874+
static constexpr size_t LOGGING_INDEX_BYTES_THRESHOLD = 1024 * 1024;
875+
if (cache_key->is_version_chain)
876+
{
877+
auto current_index_bytes = getVersionChainBytes(*(segment->getVersionChain()));
878+
if (current_index_bytes >= LOGGING_INDEX_BYTES_THRESHOLD)
879+
{
880+
LOG_INFO(
881+
read_snapshot->log,
882+
"Version chain index size is {}, cache key is {}, initial_index_bytes is {}",
883+
current_index_bytes,
884+
cache_key->toString(),
885+
initial_index_bytes);
886+
}
887+
if (current_index_bytes != initial_index_bytes)
888+
{
889+
cache->setVersionChain(*cache_key, segment->getVersionChain());
890+
}
891+
}
876892
else if (
877893
!cache_key->is_version_chain && read_snapshot->delta->getSharedDeltaIndex()->getBytes() != initial_index_bytes)
878894
cache->setDeltaIndex(*cache_key, read_snapshot->delta->getSharedDeltaIndex());

dbms/src/Storages/DeltaMerge/VersionChain/VersionChain.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ DeltaValueReader VersionChain<HandleType>::createDeltaValueReader(
393393
template <ExtraHandleType HandleType>
394394
size_t VersionChain<HandleType>::getBytes() const
395395
{
396+
std::lock_guard lock(mtx);
396397
// Ignore the size of `dmfile_or_delete_range_list` because it is small.
397398
return base_versions->capacity() * sizeof(RowID) + new_handle_to_row_ids.getBytes();
398399
}

dbms/src/Storages/DeltaMerge/VersionChain/VersionChain.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class VersionChain
142142
friend class tests::NewHandleIndexTest;
143143
friend class tests::VersionChainTest;
144144

145-
std::mutex mtx;
145+
mutable std::mutex mtx;
146146
UInt32 replayed_rows_and_deletes = 0; // delta.getRows() + delta.getDeletes()
147147
// After replaySnapshot, base_versions->size() == delta.getRows().
148148
// The records in delta correspond one-to-one with base_versions.

0 commit comments

Comments
 (0)