Skip to content

Commit b73e7a2

Browse files
Reduce redundant pack reads during building bitmap filter for delta merge case (release-8.5) (#10293)
Close #9875. Reduce redundant pack reads during bitmap filter building for the delta merge case. Signed-off-by: gengliqi <gengliqiii@gmail.com>; Signed-off-by: JaySon-Huang <tshent@qq.com>; Co-authored-by: gengliqi <gengliqiii@gmail.com>
1 parent dbd76e7 commit b73e7a2

29 files changed

Lines changed: 1100 additions & 265 deletions

dbms/src/IO/Compression/CompressedReadBufferBase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, CompressionCod
4646
throw Exception(
4747
ErrorCodes::CANNOT_DECOMPRESS,
4848
"Data compressed with different methods, given method "
49-
"byte {#x}, previous method byte {#x}",
49+
"byte {:#x}, previous method byte {:#x}",
5050
method_byte,
5151
codec->getMethodByte());
5252
}

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void ColumnFileBig::calculateStat(const DMContext & dm_context)
5353
/*tracing_id*/ dm_context.tracing_id,
5454
ReadTag::Internal);
5555

56-
std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
56+
std::tie(valid_rows, valid_bytes) = pack_filter->validRowsAndBytes(file);
5757
}
5858

5959
void ColumnFileBig::removeData(WriteBatches & wbs) const

dbms/src/Storages/DeltaMerge/DeltaMerge.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ class DeltaMergeBlockInputStream final
150150
}
151151
else
152152
{
153-
use_stable_rows = delta_index_it.getSid();
153+
use_stable_rows = delta_index_it->getSid();
154154
}
155155
auto all_range = RowKeyRange::newAll(is_common_handle, rowkey_column_size);
156156
last_value = all_range.getStart().toRowKeyValue();
@@ -366,28 +366,28 @@ class DeltaMergeBlockInputStream final
366366
}
367367
else
368368
{
369-
if (delta_index_it.isDelete())
369+
if (delta_index_it->isDelete())
370370
{
371371
// Delete.
372-
writeDeleteFromDelta(delta_index_it.getCount());
372+
writeDeleteFromDelta(delta_index_it->getCount());
373373
}
374374
else
375375
{
376376
// Insert.
377377
bool do_write = true;
378378
if constexpr (skippable_place)
379379
{
380-
if (delta_index_it.getSid() < sk_skip_stable_rows)
380+
if (delta_index_it->getSid() < sk_skip_stable_rows)
381381
{
382382
do_write = false;
383-
sk_skip_total_rows += delta_index_it.getCount();
383+
sk_skip_total_rows += delta_index_it->getCount();
384384
}
385385
}
386386

387387
if (do_write)
388388
{
389-
use_delta_offset = delta_index_it.getValue();
390-
use_delta_rows = delta_index_it.getCount();
389+
use_delta_offset = delta_index_it->getValue();
390+
use_delta_rows = delta_index_it->getCount();
391391
writeInsertFromDelta(output_columns, output_write_limit);
392392
}
393393
}
@@ -627,9 +627,9 @@ class DeltaMergeBlockInputStream final
627627
{
628628
UInt64 prev_sid;
629629
{
630-
prev_sid = delta_index_it.getSid();
631-
if (delta_index_it.isDelete())
632-
prev_sid += delta_index_it.getCount();
630+
prev_sid = delta_index_it->getSid();
631+
if (delta_index_it->isDelete())
632+
prev_sid += delta_index_it->getCount();
633633
}
634634

635635
++delta_index_it;
@@ -641,7 +641,7 @@ class DeltaMergeBlockInputStream final
641641
}
642642
else
643643
{
644-
use_stable_rows = delta_index_it.getSid() - prev_sid;
644+
use_stable_rows = delta_index_it->getSid() - prev_sid;
645645
}
646646
}
647647
};

dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,11 +1677,13 @@ bool DeltaMergeStore::checkSegmentUpdate(
16771677
if (shutdown_called.load(std::memory_order_relaxed))
16781678
return;
16791679

1680-
auto [added, heavy] = background_tasks.tryAddTask(
1681-
task,
1682-
thread_type,
1683-
std::max(id_to_segment.size() * 2, background_pool.getNumberOfThreads() * 3),
1684-
log);
1680+
size_t max_task_num = 0;
1681+
{
1682+
std::shared_lock lock(read_write_mutex); // protect `id_to_segment`
1683+
max_task_num = std::max(id_to_segment.size() * 2, background_pool.getNumberOfThreads() * 3);
1684+
}
1685+
1686+
auto [added, heavy] = background_tasks.tryAddTask(task, thread_type, max_task_num, log);
16851687
// Prevent too many tasks.
16861688
if (!added)
16871689
return;

dbms/src/Storages/DeltaMerge/DeltaTree.h

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -732,34 +732,30 @@ class DTCompactedEntries
732732
public:
733733
struct Entry
734734
{
735+
friend class DTCompactedEntries<M, F, S>;
736+
737+
public:
738+
Entry(UInt64 sid_, bool is_insert_, UInt32 count_, UInt64 value_)
739+
: sid(sid_)
740+
, is_insert(is_insert_)
741+
, count(count_)
742+
, value(value_)
743+
{}
744+
745+
UInt64 getSid() const { return sid; }
746+
bool isInsert() const { return is_insert; }
747+
bool isDelete() const { return !is_insert; }
748+
UInt32 getCount() const { return count; }
749+
UInt64 getValue() const { return value; }
750+
751+
private:
735752
UInt64 sid;
736753
bool is_insert;
737754
UInt32 count;
738755
UInt64 value;
739756
};
740757
using Entries = std::vector<Entry>;
741-
742-
struct Iterator
743-
{
744-
typename Entries::iterator it;
745-
746-
explicit Iterator(typename Entries::iterator it_)
747-
: it(it_)
748-
{}
749-
bool operator==(const Iterator & rhs) const { return it == rhs.it; }
750-
bool operator!=(const Iterator & rhs) const { return it != rhs.it; }
751-
Iterator & operator++()
752-
{
753-
++it;
754-
return *this;
755-
}
756-
757-
UInt64 getSid() const { return it->sid; }
758-
bool isInsert() const { return it->is_insert; }
759-
bool isDelete() const { return !it->is_insert; }
760-
UInt32 getCount() const { return it->count; }
761-
UInt64 getValue() const { return it->value; }
762-
};
758+
using Iterator = typename Entries::iterator;
763759

764760
private:
765761
Entries entries;
@@ -783,14 +779,12 @@ class DTCompactedEntries
783779
continue;
784780
}
785781
}
786-
Entry entry
787-
= {.sid = it.getSid(), .is_insert = it.isInsert(), .count = it.getCount(), .value = it.getValue()};
788-
entries.emplace_back(entry);
782+
entries.emplace_back(it.getSid(), it.isInsert(), it.getCount(), it.getValue());
789783
}
790784
}
791785

792-
auto begin() { return Iterator(entries.begin()); }
793-
auto end() { return Iterator(entries.end()); }
786+
auto begin() { return entries.begin(); }
787+
auto end() { return entries.end(); }
794788
};
795789

796790
template <class ValueSpace, size_t M, size_t F, size_t S, typename Allocator>

dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
163163

164164
// Try to get the largest buffer size of reading continuous packs
165165
size_t buffer_size = 0;
166-
const auto & pack_res = reader.pack_filter.getPackResConst();
166+
const auto & pack_res = reader.pack_filter->getPackResConst();
167167
for (size_t i = 0; i < n_packs; /*empty*/)
168168
{
169169
if (!pack_res[i].isUse())

dbms/src/Storages/DeltaMerge/File/DMFile.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ class DMFile : private boost::noncopyable
340340
friend class MinMaxIndexLoader;
341341
friend class ColumnReadStream;
342342
friend class DMFilePackFilter;
343+
friend class DMFilePackFilterResult;
343344
friend class DMFileBlockInputStreamBuilder;
344345
friend class DMFileWithVectorIndexBlockInputStream;
345346
friend class tests::DMFileTest;

dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,21 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
5858

5959
bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;
6060

61-
DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
62-
dmfile,
63-
index_cache,
64-
/*set_cache_if_miss*/ true,
65-
rowkey_ranges,
66-
rs_filter,
67-
read_packs,
68-
file_provider,
69-
read_limiter,
70-
scan_context,
71-
tracing_id,
72-
read_tag);
61+
if (!pack_filter)
62+
{
63+
pack_filter = DMFilePackFilter::loadFrom(
64+
dmfile,
65+
index_cache,
66+
/*set_cache_if_miss*/ true,
67+
rowkey_ranges,
68+
rs_filter,
69+
read_packs,
70+
file_provider,
71+
read_limiter,
72+
scan_context,
73+
tracing_id,
74+
read_tag);
75+
}
7376

7477
bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();
7578

@@ -181,18 +184,21 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
181184

182185
// All check passed. Let's read via vector index.
183186

184-
DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
185-
dmfile,
186-
index_cache,
187-
/*set_cache_if_miss*/ true,
188-
rowkey_ranges,
189-
rs_filter,
190-
read_packs,
191-
file_provider,
192-
read_limiter,
193-
scan_context,
194-
tracing_id,
195-
ReadTag::Query);
187+
if (!pack_filter)
188+
{
189+
pack_filter = DMFilePackFilter::loadFrom(
190+
dmfile,
191+
index_cache,
192+
/*set_cache_if_miss*/ true,
193+
rowkey_ranges,
194+
rs_filter,
195+
read_packs,
196+
file_provider,
197+
read_limiter,
198+
scan_context,
199+
tracing_id,
200+
ReadTag::Query);
201+
}
196202

197203
bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();
198204
bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;
@@ -205,7 +211,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
205211
enable_del_clean_read,
206212
is_fast_scan,
207213
max_data_version,
208-
std::move(pack_filter),
214+
pack_filter,
209215
mark_cache,
210216
enable_column_cache,
211217
column_cache,

dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
2121
#include <Storages/DeltaMerge/File/ColumnCache.h>
2222
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h>
23+
#include <Storages/DeltaMerge/File/DMFilePackFilterResult.h>
2324
#include <Storages/DeltaMerge/File/DMFileReader.h>
2425
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream_fwd.h>
2526
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>
@@ -180,6 +181,15 @@ class DMFileBlockInputStreamBuilder
180181
return *this;
181182
}
182183

184+
DMFileBlockInputStreamBuilder & setDMFilePackFilterResult(const DMFilePackFilterResultPtr & pack_filter_)
185+
{
186+
pack_filter = pack_filter_;
187+
RUNTIME_CHECK_MSG(
188+
pack_filter == nullptr || read_packs == nullptr,
189+
"read_packs is not nullptr when setting pack_filter");
190+
return *this;
191+
}
192+
183193
/**
184194
* @note To really enable the long term cache, you also need to ensure
185195
* ColumnCacheLongTerm is initialized in the global context.
@@ -234,6 +244,8 @@ class DMFileBlockInputStreamBuilder
234244
String tracing_id;
235245
ReadTag read_tag = ReadTag::Internal;
236246

247+
DMFilePackFilterResultPtr pack_filter;
248+
237249
VectorIndexCachePtr vector_index_cache;
238250
// Note: Currently this field is assigned only for Stable streams, not available for ColumnFileBig
239251
std::optional<BitmapFilterView> bitmap_filter;

0 commit comments

Comments
 (0)