Skip to content

Commit 54fff1b

Browse files
authored
[Enhancement](client) Supports dynamically changing the rate limiter config (apache#59465)
This PR adds support for dynamically updating S3 rate limiter configuration at runtime. It adds getter methods to S3RateLimiter and S3RateLimiterHolder, and attempts to trigger rate limiter updates when S3 clients are created.
1 parent 99446ca commit 54fff1b

3 files changed

Lines changed: 102 additions & 0 deletions

File tree

be/src/util/s3_util.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
#include <aws/s3/S3Client.h>
3131
#include <aws/sts/STSClient.h>
3232
#include <bvar/reducer.h>
33+
#include <cpp/s3_rate_limiter.h>
3334
#include <util/string_util.h>
3435

3536
#include <atomic>
37+
3638
#ifdef USE_AZURE
3739
#include <azure/core/diagnostics/logger.hpp>
3840
#include <azure/storage/blobs/blob_container_client.hpp>
@@ -131,11 +133,66 @@ bvar::Adder<int64_t> get_rate_limit_exceed_req_num("get_rate_limit_exceed_req_nu
131133
bvar::Adder<int64_t> put_rate_limit_ns("put_rate_limit_ns");
132134
bvar::Adder<int64_t> put_rate_limit_exceed_req_num("put_rate_limit_exceed_req_num");
133135

136+
static std::atomic<int64_t> last_s3_get_token_bucket_tokens {0};
137+
static std::atomic<int64_t> last_s3_get_token_limit {0};
138+
static std::atomic<int64_t> last_s3_get_token_per_second {0};
139+
static std::atomic<int64_t> last_s3_put_token_per_second {0};
140+
static std::atomic<int64_t> last_s3_put_token_bucket_tokens {0};
141+
static std::atomic<int64_t> last_s3_put_token_limit {0};
142+
143+
static std::atomic<bool> updating_get_limiter {false};
144+
static std::atomic<bool> updating_put_limiter {false};
145+
134146
S3RateLimiterHolder* S3ClientFactory::rate_limiter(S3RateLimitType type) {
135147
CHECK(type == S3RateLimitType::GET || type == S3RateLimitType::PUT) << to_string(type);
136148
return _rate_limiters[static_cast<size_t>(type)].get();
137149
}
138150

151+
template <S3RateLimitType LimiterType>
152+
void update_rate_limiter_if_changed(int64_t current_tps, int64_t current_bucket,
153+
int64_t current_limit, std::atomic<int64_t>& last_tps,
154+
std::atomic<int64_t>& last_bucket,
155+
std::atomic<int64_t>& last_limit,
156+
std::atomic<bool>& updating_flag, const char* limiter_name) {
157+
if (last_tps.load(std::memory_order_relaxed) != current_tps ||
158+
last_bucket.load(std::memory_order_relaxed) != current_bucket ||
159+
last_limit.load(std::memory_order_relaxed) != current_limit) {
160+
bool expected = false;
161+
if (!updating_flag.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
162+
return;
163+
}
164+
if (last_tps.load(std::memory_order_acquire) != current_tps ||
165+
last_bucket.load(std::memory_order_acquire) != current_bucket ||
166+
last_limit.load(std::memory_order_acquire) != current_limit) {
167+
int ret =
168+
reset_s3_rate_limiter(LimiterType, current_tps, current_bucket, current_limit);
169+
170+
if (ret == 0) {
171+
last_tps.store(current_tps, std::memory_order_release);
172+
last_bucket.store(current_bucket, std::memory_order_release);
173+
last_limit.store(current_limit, std::memory_order_release);
174+
} else {
175+
LOG(WARNING) << "Failed to reset S3 " << limiter_name
176+
<< " rate limiter, error code: " << ret;
177+
}
178+
}
179+
180+
updating_flag.store(false, std::memory_order_release);
181+
}
182+
}
183+
184+
void check_s3_rate_limiter_config_changed() {
185+
update_rate_limiter_if_changed<S3RateLimitType::GET>(
186+
config::s3_get_token_per_second, config::s3_get_bucket_tokens,
187+
config::s3_get_token_limit, last_s3_get_token_per_second,
188+
last_s3_get_token_bucket_tokens, last_s3_get_token_limit, updating_get_limiter, "GET");
189+
190+
update_rate_limiter_if_changed<S3RateLimitType::PUT>(
191+
config::s3_put_token_per_second, config::s3_put_bucket_tokens,
192+
config::s3_put_token_limit, last_s3_put_token_per_second,
193+
last_s3_put_token_bucket_tokens, last_s3_put_token_limit, updating_put_limiter, "PUT");
194+
}
195+
139196
int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit) {
140197
if (type == S3RateLimitType::UNKNOWN) {
141198
return -1;
@@ -204,6 +261,8 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::create(const S3ClientConf
204261
return nullptr;
205262
}
206263

264+
check_s3_rate_limiter_config_changed();
265+
207266
#ifdef BE_TEST
208267
{
209268
std::lock_guard l(_lock);

be/test/io/client/s3_file_system_test.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2414,4 +2414,35 @@ TEST_F(S3FileSystemTest, AzureRateLimiterDeleteDirectoryExceptionHandlingTest) {
24142414
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << status.to_string();
24152415
}
24162416

2417+
TEST_F(S3FileSystemTest, DynamicUpdateRateLimiterConfig) {
2418+
// Save original config values
2419+
int64_t original_get_bucket_tokens = config::s3_get_bucket_tokens;
2420+
int64_t original_get_token_per_second = config::s3_get_token_per_second;
2421+
int64_t original_get_token_limit = config::s3_get_token_limit;
2422+
2423+
std::cout << "Original GET config: bucket_tokens=" << original_get_bucket_tokens
2424+
<< ", token_per_second=" << original_get_token_per_second
2425+
<< ", limit=" << original_get_token_limit << std::endl;
2426+
2427+
int64_t new_s3_get_bucket_tokens_val = 50;
2428+
int64_t new_s3_get_token_per_second_val = 1;
2429+
2430+
auto [success1, msg7] = config::set_config(
2431+
"s3_get_bucket_tokens", std::to_string(new_s3_get_bucket_tokens_val), false, false);
2432+
ASSERT_EQ(success1, 0) << "Failed to set s3_get_bucket_tokens: " << msg7;
2433+
auto [success2, msg8] =
2434+
config::set_config("s3_get_token_per_second",
2435+
std::to_string(new_s3_get_token_per_second_val), false, false);
2436+
ASSERT_EQ(success2, 0) << "Failed to set s3_get_token_per_second: " << msg8;
2437+
2438+
auto st = create_client();
2439+
ASSERT_TRUE(st.ok());
2440+
2441+
// Verify restoration
2442+
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_burst(),
2443+
new_s3_get_bucket_tokens_val);
2444+
EXPECT_EQ(S3ClientFactory::instance().rate_limiter(S3RateLimitType::GET)->get_max_speed(),
2445+
new_s3_get_token_per_second_val);
2446+
}
2447+
24172448
} // namespace doris

common/cpp/s3_rate_limiter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ class S3RateLimiter {
5252
// Returns duration of sleep in nanoseconds (to distinguish sleeping on different kinds of S3RateLimiters for metrics)
5353
int64_t add(size_t amount);
5454

55+
size_t get_max_speed() const { return _max_speed; }
56+
57+
size_t get_max_burst() const { return _max_burst; }
58+
59+
size_t get_limit() const { return _limit; }
60+
5561
private:
5662
std::pair<size_t, double> _update_remain_token(long now, size_t amount);
5763
size_t _count {0};
@@ -75,6 +81,12 @@ class S3RateLimiterHolder {
7581

7682
int reset(size_t max_speed, size_t max_burst, size_t limit);
7783

84+
size_t get_max_speed() const { return rate_limiter->get_max_speed(); }
85+
86+
size_t get_max_burst() const { return rate_limiter->get_max_burst(); }
87+
88+
size_t get_limit() const { return rate_limiter->get_limit(); }
89+
7890
private:
7991
std::shared_mutex rate_limiter_rw_lock;
8092
std::unique_ptr<S3RateLimiter> rate_limiter;

0 commit comments

Comments
 (0)