Skip to content

Commit 6a8b2c6

Browse files
Storage: Improve small partition table read performance by limiting concurrency (#10489) (#10499)
close #10487 * Limit the number of source ops to at most (number of segment tasks * 4) in function `DeltaMergeStore::read`. This reduces concurrency overhead and lets a PartitionTableScan over small partitions that contain only 1~2 segments schedule more segment read tasks in parallel. - For a large partition, the storage layer still generates `num_streams` * `UnorderedSourceOp`; the behavior is the same as before. - For a small partition, the storage layer generates only (number of segment tasks * 4) * `UnorderedSourceOp`, and `ConcatBuilderPool` reorganizes the source ops and reads the data from multiple partitions in parallel. * Introduce `DMReadOptions` to reduce the complexity of passing has_multiple_partitions from the compute layer to the storage layer. * Add active_segment_limit, peak_active_segments, block_slot_limit, peak_blocks_in_queue when `SegmentReadTaskPool` finishes. Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: JaySon-Huang <tshent@qq.com> Co-authored-by: JaySon <tshent@qq.com> Co-authored-by: JaySon-Huang <tshent@qq.com>
1 parent a38d75c commit 6a8b2c6

20 files changed

Lines changed: 389 additions & 229 deletions

dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,13 +921,15 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
921921
RUNTIME_CHECK_MSG(mvcc_query_info->scan_context != nullptr, "Unexpected null scan_context");
922922
if (table_scan.isPartitionTableScan())
923923
{
924+
bool has_multiple_partitions = table_scan.getPhysicalTableIDs().size() > 1;
924925
for (const auto physical_table_id : table_scan.getPhysicalTableIDs())
925926
{
926927
SelectQueryInfo query_info = create_query_info(physical_table_id);
927928
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(
928929
mvcc_query_info->resolve_locks,
929930
mvcc_query_info->start_ts,
930931
mvcc_query_info->scan_context);
932+
query_info.has_multiple_partitions = has_multiple_partitions;
931933
ret.emplace(physical_table_id, std::move(query_info));
932934
}
933935
// Dispatch the regions_query_info to different physical table's query_info

dbms/src/Operators/tests/gtest_concat_source.cpp

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,18 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include <Common/Logger.h>
16+
#include <Common/Stopwatch.h>
17+
#include <Common/ThreadManager.h>
1518
#include <Flash/Executor/PipelineExecutorContext.h>
19+
#include <Flash/Pipeline/Schedule/Tasks/NotifyFuture.h>
1620
#include <Operators/ConcatSourceOp.h>
21+
#include <Storages/DeltaMerge/ReadThread/WorkQueue.h>
1722
#include <TestUtils/ColumnGenerator.h>
23+
#include <TestUtils/TiFlashTestBasic.h>
1824
#include <gtest/gtest.h>
1925

2026
#include <memory>
21-
#include <type_traits>
2227

2328
namespace DB::tests
2429
{
@@ -97,4 +102,157 @@ TEST_F(TestConcatSource, concatSink)
97102
ASSERT_EQ(actual_block_cnt, block_cnt);
98103
}
99104

105+
namespace
106+
{
107+
class SyncBlocks
108+
{
109+
public:
110+
explicit SyncBlocks(Blocks blks_)
111+
: blocks(std::move(blks_))
112+
, current(blocks.begin())
113+
{}
114+
115+
Block getNext()
116+
{
117+
std::lock_guard lock(mtx);
118+
if (current == blocks.end())
119+
return Block{};
120+
Block res = *current;
121+
++current;
122+
return res;
123+
}
124+
125+
private:
126+
std::mutex mtx;
127+
Blocks blocks;
128+
Blocks::iterator current;
129+
};
130+
131+
class MockSourceOpFromQueue : public SourceOp
132+
{
133+
public:
134+
MockSourceOpFromQueue(
135+
PipelineExecutorContext & exec_context_,
136+
const std::shared_ptr<SyncBlocks> & sync_blocks_,
137+
const Block & header)
138+
: SourceOp(exec_context_, "mock")
139+
, sync_blocks(sync_blocks_)
140+
{
141+
setHeader(header);
142+
}
143+
144+
String getName() const override { return "MockSourceOpFromQueue"; }
145+
146+
protected:
147+
OperatorStatus readImpl(Block & block) override
148+
{
149+
block = sync_blocks->getNext();
150+
return OperatorStatus::HAS_OUTPUT;
151+
}
152+
153+
private:
154+
std::shared_ptr<SyncBlocks> sync_blocks;
155+
};
156+
class SimpleGetResultSinkOp : public SinkOp
157+
{
158+
public:
159+
SimpleGetResultSinkOp(PipelineExecutorContext & exec_context_, const String & req_id, ResultHandler result_handler_)
160+
: SinkOp(exec_context_, req_id)
161+
, result_handler(std::move(result_handler_))
162+
{
163+
assert(result_handler);
164+
}
165+
166+
String getName() const override { return "SimpleGetResultSinkOp"; }
167+
168+
protected:
169+
OperatorStatus writeImpl(Block && block) override
170+
{
171+
if (!block)
172+
return OperatorStatus::FINISHED;
173+
174+
result_handler(block);
175+
return OperatorStatus::NEED_INPUT;
176+
}
177+
178+
private:
179+
ResultHandler result_handler;
180+
};
181+
182+
} // namespace
183+
184+
TEST_F(TestConcatSource, ConcatBuilderPoolWithDifferentConcurrency)
185+
try
186+
{
187+
LoggerPtr log = Logger::get();
188+
189+
Blocks blks{
190+
Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})},
191+
Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})},
192+
Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})},
193+
Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})},
194+
Block{ColumnGenerator::instance().generate({2, "Int32", DataDistribution::RANDOM})},
195+
};
196+
197+
Block header = blks[0].cloneEmpty();
198+
199+
200+
PipelineExecutorContext exec_context;
201+
size_t num_concurrency = 8;
202+
ConcatBuilderPool builder_pool{num_concurrency};
203+
// Mock that for each partition (physical table), there is a read task pool and multiple source ops reading from it.
204+
size_t num_partitions = 2;
205+
for (size_t idx_part = 0; idx_part < num_partitions; ++idx_part)
206+
{
207+
// Mock that different partitions have different concurrency.
208+
size_t partition_concurrency = num_concurrency;
209+
if (idx_part == 0)
210+
partition_concurrency = 4;
211+
else if (idx_part == 1)
212+
partition_concurrency = 2;
213+
// Mock a queue shared on one partition
214+
auto blks_queue = std::make_shared<SyncBlocks>(blks);
215+
PipelineExecGroupBuilder group_builder;
216+
for (size_t i = 0; i < partition_concurrency; ++i)
217+
{
218+
group_builder.addConcurrency(std::make_unique<MockSourceOpFromQueue>(exec_context, blks_queue, header));
219+
}
220+
builder_pool.add(group_builder);
221+
}
222+
223+
std::atomic<size_t> received_blocks = 0;
224+
ResultHandler h([&](const Block & /*block*/) { received_blocks.fetch_add(1, std::memory_order_relaxed); });
225+
226+
PipelineExecGroupBuilder result_builder;
227+
builder_pool.generate(result_builder, exec_context, "test");
228+
result_builder.transform(
229+
[&](auto & builder) { builder.setSinkOp(std::make_unique<SimpleGetResultSinkOp>(exec_context, "test", h)); });
230+
auto op_pipeline_grp = result_builder.build(false);
231+
232+
233+
auto mgr = newThreadPoolManager(num_concurrency);
234+
235+
for (const auto & pipe : op_pipeline_grp)
236+
{
237+
mgr->schedule(false, [&pipe, &log]() {
238+
pipe->executePrefix();
239+
while (true)
240+
{
241+
auto s = pipe->execute();
242+
if (s == OperatorStatus::FINISHED)
243+
{
244+
LOG_INFO(log, "ConcatPipelineExec is finished");
245+
break;
246+
}
247+
}
248+
pipe->executeSuffix();
249+
});
250+
}
251+
mgr->wait();
252+
253+
LOG_INFO(log, "ConcatPipelineExec is built and executed, received_blocks={}", received_blocks.load());
254+
ASSERT_EQ(received_blocks.load(), blks.size() * num_partitions);
255+
}
256+
CATCH
257+
100258
} // namespace DB::tests

dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,8 +1250,7 @@ BlockInputStreams DeltaMergeStore::read(
12501250
const RuntimeFilteList & runtime_filter_list,
12511251
int rf_max_wait_time_ms,
12521252
const String & tracing_id,
1253-
bool keep_order,
1254-
bool is_fast_scan,
1253+
const DMReadOptions & read_opts,
12551254
size_t expected_block_size,
12561255
const SegmentIdSet & read_segments,
12571256
size_t extra_table_id_index,
@@ -1261,7 +1260,7 @@ BlockInputStreams DeltaMergeStore::read(
12611260
auto dm_context = newDMContext(db_context, db_settings, tracing_id, scan_context);
12621261

12631262
// If keep order is required, disable read thread.
1264-
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
1263+
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !read_opts.keep_order;
12651264
// SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
12661265
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
12671266
// Also, too many read tasks of a segment with different small ranges is not good for data sharing cache.
@@ -1281,7 +1280,7 @@ BlockInputStreams DeltaMergeStore::read(
12811280

12821281
GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size());
12831282
size_t final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
1284-
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
1283+
auto read_mode = getReadMode(db_context, read_opts.is_fast_scan, read_opts.keep_order, filter);
12851284
const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
12861285
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
12871286
extra_table_id_index,
@@ -1334,10 +1333,10 @@ BlockInputStreams DeltaMergeStore::read(
13341333
"Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
13351334
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
13361335
"final_columns_to_read={}",
1337-
keep_order,
1336+
read_opts.keep_order,
13381337
db_context.getSettingsRef().dt_enable_read_thread,
13391338
enable_read_thread,
1340-
is_fast_scan,
1339+
read_opts.is_fast_scan,
13411340
filter == nullptr || filter->before_where == nullptr,
13421341
read_task_pool->pool_id,
13431342
final_num_stream,
@@ -1360,8 +1359,7 @@ void DeltaMergeStore::read(
13601359
const RuntimeFilteList & runtime_filter_list,
13611360
int rf_max_wait_time_ms,
13621361
const String & tracing_id,
1363-
bool keep_order,
1364-
bool is_fast_scan,
1362+
const DMReadOptions & read_opts,
13651363
size_t expected_block_size,
13661364
const SegmentIdSet & read_segments,
13671365
size_t extra_table_id_index,
@@ -1371,7 +1369,7 @@ void DeltaMergeStore::read(
13711369
auto dm_context = newDMContext(db_context, db_settings, tracing_id, scan_context);
13721370

13731371
// If keep order is required, disable read thread.
1374-
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !keep_order;
1372+
auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread && !read_opts.keep_order;
13751373
// SegmentReadTaskScheduler and SegmentReadTaskPool use table_id + segment id as unique ID when read thread is enabled.
13761374
// 'try_split_task' can result in several read tasks with the same id that can cause some trouble.
13771375
// Also, too many read tasks of a segment with different small ranges is not good for data sharing cache.
@@ -1390,9 +1388,24 @@ void DeltaMergeStore::read(
13901388
};
13911389

13921390
GET_METRIC(tiflash_storage_read_tasks_count).Increment(tasks.size());
1393-
size_t final_num_stream
1394-
= enable_read_thread ? std::max(1, num_streams) : std::max(1, std::min(num_streams, tasks.size()));
1395-
auto read_mode = getReadMode(db_context, is_fast_scan, keep_order, filter);
1391+
size_t final_num_stream = 0;
1392+
if (enable_read_thread)
1393+
{
1394+
// For limited tasks size under `enable_read_thread`, too much source ops actually lead to
1395+
// the table scan speed can not match the compute layer speed and lead to more concurrency
1396+
// overhead. So we limit the final_num_stream to tasks.size() * 4 when read thread is enabled
1397+
// under multiple partitions.
1398+
if (read_opts.has_multiple_partitions)
1399+
final_num_stream = std::min(num_streams, tasks.size() * 4);
1400+
else
1401+
final_num_stream = num_streams;
1402+
final_num_stream = std::max(1, final_num_stream);
1403+
}
1404+
else
1405+
{
1406+
final_num_stream = std::max(1, std::min(num_streams, tasks.size()));
1407+
}
1408+
auto read_mode = getReadMode(db_context, read_opts.is_fast_scan, read_opts.keep_order, filter);
13961409
const auto & final_columns_to_read = filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read;
13971410
auto read_task_pool = std::make_shared<SegmentReadTaskPool>(
13981411
extra_table_id_index,
@@ -1454,10 +1467,10 @@ void DeltaMergeStore::read(
14541467
"Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
14551468
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
14561469
"final_columns_to_read={}",
1457-
keep_order,
1470+
read_opts.keep_order,
14581471
db_context.getSettingsRef().dt_enable_read_thread,
14591472
enable_read_thread,
1460-
is_fast_scan,
1473+
read_opts.is_fast_scan,
14611474
filter == nullptr || filter->before_where == nullptr,
14621475
read_task_pool->pool_id,
14631476
final_num_stream,

dbms/src/Storages/DeltaMerge/DeltaMergeStore.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ using LocalIndexesStats = std::vector<LocalIndexStats>;
198198
class DeltaMergeStore;
199199
using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;
200200

201+
// TODO: merge more parameters to ReadOptions
202+
struct DMReadOptions
203+
{
204+
bool keep_order = false;
205+
bool is_fast_scan = false;
206+
bool has_multiple_partitions = false;
207+
};
201208
class DeltaMergeStore
202209
: private boost::noncopyable
203210
, public std::enable_shared_from_this<DeltaMergeStore>
@@ -376,7 +383,7 @@ class DeltaMergeStore
376383

377384
/// You must ensure external files are ordered and do not overlap. Otherwise exceptions will be thrown.
378385
/// You must ensure all of the external files are contained by the range. Otherwise exceptions will be thrown.
379-
/// Return the 'ingtested bytes'.
386+
/// Return the 'ingested bytes'.
380387
UInt64 ingestFiles(
381388
const Context & db_context, //
382389
const DB::Settings & db_settings,
@@ -468,8 +475,7 @@ class DeltaMergeStore
468475
const RuntimeFilteList & runtime_filter_list,
469476
int rf_max_wait_time_ms,
470477
const String & tracing_id,
471-
bool keep_order,
472-
bool is_fast_scan = false,
478+
const DMReadOptions & read_opts = {},
473479
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
474480
const SegmentIdSet & read_segments = {},
475481
size_t extra_table_id_index = InvalidColumnID,
@@ -493,8 +499,7 @@ class DeltaMergeStore
493499
const RuntimeFilteList & runtime_filter_list,
494500
int rf_max_wait_time_ms,
495501
const String & tracing_id,
496-
bool keep_order,
497-
bool is_fast_scan = false,
502+
const DMReadOptions & read_opts = {},
498503
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
499504
const SegmentIdSet & read_segments = {},
500505
size_t extra_table_id_index = InvalidColumnID,
@@ -633,7 +638,7 @@ class DeltaMergeStore
633638
void waitForDeleteRange(const DMContextPtr & context, const SegmentPtr & segment);
634639

635640
/// Should be called after every write into DeltaMergeStore.
636-
/// If the delta cache reaches the foreground flush limit, it will also trigger a KVStore flush of releated regions,
641+
/// If the delta cache reaches the foreground flush limit, it will also trigger a KVStore flush of related regions,
637642
/// by returning a non-empty DM::WriteResult.
638643
// Deferencing `Iter` can get a pointer to a Segment.
639644
template <typename Iter>
@@ -848,18 +853,18 @@ class DeltaMergeStore
848853
private:
849854
/**
850855
* Remove the segment from the store's memory structure.
851-
* Not protected by lock, should accquire lock before calling this function.
856+
* Not protected by lock, should acquire lock before calling this function.
852857
*/
853858
void removeSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment);
854859
/**
855860
* Add the segment to the store's memory structure.
856-
* Not protected by lock, should accquire lock before calling this function.
861+
* Not protected by lock, should acquire lock before calling this function.
857862
*/
858863
void addSegment(std::unique_lock<std::shared_mutex> &, const SegmentPtr & segment);
859864
/**
860865
* Replace the old segment with the new segment in the store's memory structure.
861866
* New segment should have the same segment id as the old segment.
862-
* Not protected by lock, should accquire lock before calling this function.
867+
* Not protected by lock, should acquire lock before calling this function.
863868
*/
864869
void replaceSegment(
865870
std::unique_lock<std::shared_mutex> &,

0 commit comments

Comments
 (0)