Skip to content

Commit 56eedda

Browse files
authored
disagg: Limit the compute node cache download IOPS and bandwidth (#10558)
close #10557 * Prior to this PR, both the number of threads handling foreground reads and the number of background threads downloading files for FileCache were controlled by the same parameter, profiles.default.io_thread_count_scale, making it difficult to fine-tune these settings independently. This PR introduces two separate parameters to control the concurrency and queue length of FileCache downloads performed by Compute Node background threads, so that background thread settings can be adjusted without affecting the foreground read thread pool configuration: - Added profiles.default.dt_filecache_downloading_count_scale, with a default value of 2.0. This means the concurrency for FileCache background downloads is set to logical_cores * profiles.default.dt_filecache_downloading_count_scale. - Changed the default value of profiles.default.dt_filecache_max_downloading_count_scale from 1.0 to 10.0. This parameter now defines the maximum queue length for FileCache background downloads as logical_cores * profiles.default.dt_filecache_max_downloading_count_scale. Before this PR, the queue length was logical_cores * profiles.default.io_thread_count_scale * 2, so the new default value simply decouples the queue length from io_thread_count_scale. * FileCache downloads files from S3 to local storage, and this process is rate-limited by the rate_limiter parameter. * After persisting data received from Write Nodes into the LocalPageCache, Compute Nodes no longer invoke fsync, reducing their IOPS requirements. * Optimized the lock contention scope in `FileCache::get`. Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent fb92e9f commit 56eedda

18 files changed

Lines changed: 466 additions & 103 deletions

File tree

dbms/src/Common/TiFlashMetrics.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,6 +845,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
845845
Counter, \
846846
F(type_dtfile_hit, {"type", "dtfile_hit"}), \
847847
F(type_dtfile_miss, {"type", "dtfile_miss"}), \
848+
F(type_dtfile_not_cache_type, {"type", "dtfile_not_cache_type"}), \
849+
F(type_dtfile_too_many_download, {"type", "dtfile_too_many_download"}), \
848850
F(type_dtfile_evict, {"type", "dtfile_evict"}), \
849851
F(type_dtfile_full, {"type", "dtfile_full"}), \
850852
F(type_dtfile_download, {"type", "dtfile_download"}), \

dbms/src/IO/BaseFile/RateLimiter.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,12 @@ WriteLimiterPtr IORateLimiter::getWriteLimiter()
470470
return is_background_thread ? bg_write_limiter : fg_write_limiter;
471471
}
472472

473+
/// Returns `bg_write_limiter` unconditionally, i.e. without selecting between
/// the foreground and background limiter. The pointer is read while holding
/// `limiter_mtx`.
WriteLimiterPtr IORateLimiter::getBgWriteLimiter()
474+
{
475+
std::lock_guard lock(limiter_mtx);
476+
return bg_write_limiter;
477+
}
478+
473479
ReadLimiterPtr IORateLimiter::getReadLimiter()
474480
{
475481
std::lock_guard lock(limiter_mtx);

dbms/src/IO/BaseFile/RateLimiter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ class IORateLimiter
213213
~IORateLimiter();
214214

215215
WriteLimiterPtr getWriteLimiter();
216+
WriteLimiterPtr getBgWriteLimiter();
217+
216218
ReadLimiterPtr getReadLimiter();
217219
void init(Poco::Util::AbstractConfiguration & config_);
218220
void updateConfig(Poco::Util::AbstractConfiguration & config_);

dbms/src/Interpreters/Settings.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,8 @@ struct Settings
244244
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
245245
/* Disagg arch reading settings */ \
246246
M(SettingUInt64, dt_write_page_cache_limit_size, 2 * 1024 * 1024, "Limit size per write batch when compute node writing to PageStorage cache") \
247-
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
247+
M(SettingDouble, dt_filecache_downloading_count_scale, 2.0, "Max concurrency of download task count of FileCache = number of logical cpu cores * dt_filecache_downloading_count_scale.") \
248+
M(SettingDouble, dt_filecache_max_downloading_count_scale, 10.0, "Max queue size of download task count of FileCache = number of logical cpu cores * dt_filecache_max_downloading_count_scale.") \
248249
M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
249250
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
250251
M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \

dbms/src/Interpreters/SharedContexts/Disagg.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,27 @@ void SharedContextDisagg::initReadNodePageCache(
3737
PSDiskDelegatorPtr delegator;
3838
if (!cache_dir.empty())
3939
{
40+
{
41+
// Compute node disable fsync to improve performance, so we need to
42+
// clear the cache dir before use to avoid corrupted data.
43+
auto dir = Poco::File(cache_dir);
44+
if (dir.exists())
45+
{
46+
LOG_INFO(
47+
Logger::get(),
48+
"Local cache directory will be cleared before setting up LocalPageCache",
49+
cache_dir);
50+
dir.remove(true);
51+
}
52+
else
53+
{
54+
LOG_INFO(
55+
Logger::get(),
56+
"Local cache directory not exist when setting up LocalPageCache",
57+
cache_dir);
58+
}
59+
}
60+
4061
delegator = path_pool.getPSDiskDelegatorFixedDirectory(cache_dir);
4162
LOG_INFO(
4263
Logger::get(),

dbms/src/Server/Server.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,11 @@ void adjustThreadPoolSize(const Settings & settings, size_t logical_cores)
328328
}
329329
if (S3FileCachePool::instance)
330330
{
331-
S3FileCachePool::instance->setMaxThreads(max_io_thread_count);
332-
S3FileCachePool::instance->setMaxFreeThreads(max_io_thread_count / 2);
333-
S3FileCachePool::instance->setQueueSize(max_io_thread_count * 2);
331+
auto concurrency = logical_cores * settings.dt_filecache_downloading_count_scale;
332+
auto queue_size = logical_cores * settings.dt_filecache_max_downloading_count_scale;
333+
S3FileCachePool::instance->setMaxThreads(concurrency);
334+
S3FileCachePool::instance->setMaxFreeThreads(concurrency / 2);
335+
S3FileCachePool::instance->setQueueSize(queue_size);
334336
}
335337
if (RNWritePageCachePool::instance)
336338
{
@@ -691,7 +693,11 @@ try
691693
if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_disagg_compute_mode)
692694
{
693695
config.initCacheDir();
694-
FileCache::initialize(global_context->getPathCapacity(), config);
696+
FileCache::initialize(
697+
global_context->getPathCapacity(),
698+
config,
699+
server_info.cpu_info.logical_cores,
700+
global_context->getIORateLimiter());
695701
}
696702

697703
/// Determining PageStorage run mode based on current files on disk and storage config.

dbms/src/Storages/DeltaMerge/File/tests/gtest_dm_meta_version.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,12 @@ try
473473
.dir = fmt::format("{}/fs_cache", getTemporaryPath()),
474474
.capacity = 1 * 1000 * 1000 * 1000,
475475
};
476-
FileCache::initialize(db_context->getGlobalContext().getPathCapacity(), file_cache_config);
476+
UInt16 vcores = 8;
477+
FileCache::initialize(
478+
db_context->getGlobalContext().getPathCapacity(),
479+
file_cache_config,
480+
vcores,
481+
db_context->getGlobalContext().getIORateLimiter());
477482

478483
auto dm_file = prepareDMFileRemote(/* file_id= */ 1);
479484
ASSERT_TRUE(dm_file->path().starts_with("s3://"));

dbms/src/Storages/DeltaMerge/Index/VectorIndex/tests/gtest_dm_vector_index.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1926,7 +1926,13 @@ class VectorIndexSegmentOnS3Test
19261926
.dir = fmt::format("{}/fs_cache", getTemporaryPath()),
19271927
.capacity = 1 * 1000 * 1000 * 1000,
19281928
};
1929-
FileCache::initialize(global_context.getPathCapacity(), file_cache_config);
1929+
1930+
UInt16 vcores = 8;
1931+
FileCache::initialize(
1932+
global_context.getPathCapacity(),
1933+
file_cache_config,
1934+
vcores,
1935+
global_context.getIORateLimiter());
19301936

19311937
auto cols = DMTestEnv::getDefaultColumns();
19321938
cols->emplace_back(cdVec());

dbms/src/Storages/DeltaMerge/Remote/RNLocalPageCache.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ void RNLocalPageCache::write(
8383
}
8484

8585
UniversalWriteBatch cache_wb;
86+
cache_wb.disableFSync(); // RNLocalPageCache doesn't need fsync. Disable it to reduce IOPS requirements.
8687
cache_wb.putPage(key, 0, read_buffer, size, field_sizes);
8788
GET_METRIC(tiflash_storage_remote_cache, type_page_download).Increment();
8889
GET_METRIC(tiflash_storage_remote_cache_bytes, type_page_download_bytes).Increment(size);
@@ -113,6 +114,7 @@ void RNLocalPageCache::write(UniversalWriteBatch && wb)
113114
itr->second.size);
114115
}
115116
}
117+
wb.disableFSync(); // RNLocalPageCache doesn't need fsync. Disable it to reduce IOPS requirements.
116118
GET_METRIC(tiflash_storage_remote_cache, type_page_download).Increment(wb.getWrites().size());
117119
GET_METRIC(tiflash_storage_remote_cache_bytes, type_page_download_bytes).Increment(wb.getTotalDataSize());
118120
storage->write(std::move(wb));

dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,12 @@ class SegmentReplaceStableDataDisaggregated
426426
.dir = fmt::format("{}/fs_cache", getTemporaryPath()),
427427
.capacity = 1 * 1000 * 1000 * 1000,
428428
};
429-
FileCache::initialize(global_context.getPathCapacity(), file_cache_config);
429+
UInt16 vcores = 8;
430+
FileCache::initialize(
431+
global_context.getPathCapacity(),
432+
file_cache_config,
433+
vcores,
434+
global_context.getIORateLimiter());
430435
}
431436

432437
table_columns = DMTestEnv::getDefaultColumns();

0 commit comments

Comments
 (0)