@@ -122,10 +122,11 @@ void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parall
122122 .Observe (watch.elapsedSeconds ());
123123 LOG_INFO (
124124 log,
125- " Prehandle resource acquired after {:.3f} seconds, region_id={} parallel={}" ,
125+ " Prehandle resource acquired after {:.3f} seconds, region_id={} parallel={} limit={} " ,
126126 watch.elapsedSeconds (),
127127 region_id,
128- parallel);
128+ parallel,
129+ parallel_subtask_limit);
129130 CurrentMetrics::sub (CurrentMetrics::RaftNumWaitedParallelPrehandlingTasks);
130131}
131132
@@ -301,20 +302,41 @@ PrehandleResult KVStore::preHandleSnapshotToFiles(
301302 return PrehandleResult{};
302303}
303304
304- size_t KVStore::getMaxParallelPrehandleSize () const
305+ size_t KVStore::getMaxParallelPrehandleSize (DM::FileConvertJobType job_type ) const
305306{
306- const auto & proxy_config = getProxyConfigSummay ();
307- size_t total_concurrency = 0 ;
308- if (proxy_config.valid )
307+ return getMaxPrehandleSubtaskSize (job_type);
308+ }
309+
310+ size_t KVStore::getMaxPrehandleSubtaskSize (DM::FileConvertJobType job_type) const
311+ {
312+ const auto & proxy_config = getProxyConfigSummary ();
313+ switch (job_type)
314+ {
315+ case DM::FileConvertJobType::ApplySnapshot:
309316 {
310- total_concurrency = proxy_config.snap_handle_pool_size ;
317+ if (proxy_config.valid && proxy_config.snap_handle_pool_size > 0 )
318+ {
319+ return proxy_config.snap_handle_pool_size ;
320+ }
321+ else
322+ {
323+ auto cpu_num = std::thread::hardware_concurrency ();
324+ return static_cast <size_t >(std::clamp (cpu_num * 0.7 , 2.0 , 16.0 ));
325+ }
311326 }
312- else
327+ case DM::FileConvertJobType::IngestSST:
313328 {
314- auto cpu_num = std::thread::hardware_concurrency ();
315- total_concurrency = static_cast <size_t >(std::clamp (cpu_num * 0.7 , 2.0 , 16.0 ));
329+ if (proxy_config.valid && proxy_config.apply_low_priority_pool_size > 0 )
330+ {
331+ return proxy_config.apply_low_priority_pool_size ;
332+ }
333+ else
334+ {
335+ auto cpu_num = std::thread::hardware_concurrency ();
336+ return std::max (1 , static_cast <size_t >(cpu_num));
337+ }
338+ }
316339 }
317- return total_concurrency;
318340}
319341
320342// If size is 0, do not parallel prehandle for this snapshot, which is regular.
@@ -323,7 +345,8 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
323345 LoggerPtr log,
324346 KVStore * kvstore,
325347 RegionPtr new_region,
326- std::shared_ptr<DM::SSTFilesToBlockInputStream> sst_stream)
348+ std::shared_ptr<DM::SSTFilesToBlockInputStream> sst_stream,
349+ DM::FileConvertJobType job_type)
327350{
328351 // We don't use this is the single snapshot is small, due to overhead in decoding.
329352 constexpr size_t default_parallel_prehandle_threshold = 1 * 1024 * 1024 * 1024 ;
@@ -356,7 +379,12 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
356379 // so we must add 1 here.
357380 auto ongoing_count = kvstore->getOngoingPrehandleSubtaskCount () + 1 ;
358381 uint64_t want_split_parts = 0 ;
359- auto total_concurrency = kvstore->getMaxParallelPrehandleSize ();
382+ // If total_concurrency is 4, and prehandle-pool is sized 8,
383+ // and if there are 4 ongoing snapshots, then we will not parallel prehandling any new snapshot.
384+ // This is because in serverless, too much parallelism causes performance reduction.
385+ // So, if there is already enough parallelism that is used to prehandle,
386+ // it is not necessary to manually split a snapshot.
387+ auto total_concurrency = kvstore->getMaxParallelPrehandleSize (job_type);
360388 if (total_concurrency + 1 > ongoing_count)
361389 {
362390 // Current thread takes 1 which is in `ongoing_count`.
@@ -385,7 +413,7 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
385413 if (split_keys.size () + 1 < want_split_parts)
386414 {
387415 // If there are too few split keys, the `split_keys` itself may be not be uniformly distributed,
388- // it is even better that we still handle it sequantially .
416+ // it is even better that we still handle it sequentially .
389417 split_keys.clear ();
390418 LOG_INFO (
391419 log,
@@ -717,8 +745,11 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
717745 };
718746
719747 // `split_keys` do not begin with 'z'.
720- auto [split_keys, approx_bytes] = getSplitKey (log, this , new_region, sst_stream);
721- prehandling_trace.waitForSubtaskResources (region_id, split_keys.size () + 1 , getMaxParallelPrehandleSize ());
748+ auto [split_keys, approx_bytes] = getSplitKey (log, this , new_region, sst_stream, job_type);
749+ prehandling_trace.waitForSubtaskResources (
750+ region_id,
751+ split_keys.size () + 1 ,
752+ getMaxPrehandleSubtaskSize (job_type));
722753 ReadFromStreamResult result;
723754 if (split_keys.empty ())
724755 {
0 commit comments