Skip to content

Commit 2065661

Browse files
committed
Add rate limiting
1 parent daf6f55 commit 2065661

3 files changed

Lines changed: 168 additions & 28 deletions

File tree

core/client.h

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,43 @@
1515
#include "db.h"
1616
#include "core_workload.h"
1717
#include "utils/countdown_latch.h"
18+
#include "utils/rate_limit.h"
1819
#include "utils/utils.h"
1920

2021
namespace ycsbc {
2122

2223
inline int ClientThread(ycsbc::DB *db, ycsbc::CoreWorkload *wl, const int num_ops, bool is_loading,
23-
bool init_db, bool cleanup_db, CountDownLatch *latch) {
24-
try {
25-
if (init_db) {
26-
db->Init();
27-
}
28-
29-
int ops = 0;
30-
for (int i = 0; i < num_ops; ++i) {
31-
if (is_loading) {
32-
wl->DoInsert(*db);
33-
} else {
34-
wl->DoTransaction(*db);
35-
}
36-
ops++;
37-
}
38-
39-
if (cleanup_db) {
40-
db->Cleanup();
41-
}
42-
43-
latch->CountDown();
44-
return ops;
45-
} catch(const utils::Exception& e) {
46-
std::cerr<<"Caught exception: "<<e.what()<<std::endl;
47-
exit(1);
24+
bool init_db, bool cleanup_db, utils::CountDownLatch *latch, utils::RateLimiter *rlim) {
25+
26+
try {
27+
if (init_db) {
28+
db->Init();
29+
}
30+
31+
int ops = 0;
32+
for (int i = 0; i < num_ops; ++i) {
33+
if (rlim) {
34+
rlim->Consume(1);
35+
}
36+
37+
if (is_loading) {
38+
wl->DoInsert(*db);
39+
} else {
40+
wl->DoTransaction(*db);
41+
}
42+
ops++;
4843
}
44+
45+
if (cleanup_db) {
46+
db->Cleanup();
47+
}
48+
49+
latch->CountDown();
50+
return ops;
51+
} catch (const utils::Exception &e) {
52+
std::cerr << "Caught exception: " << e.what() << std::endl;
53+
exit(1);
54+
}
4955
}
5056

5157
} // ycsbc

core/ycsbc.cc

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
#include "db_factory.h"
2323
#include "measurements.h"
2424
#include "utils/countdown_latch.h"
25+
#include "utils/rate_limit.h"
2526
#include "utils/timer.h"
2627
#include "utils/utils.h"
2728

2829
void UsageMessage(const char *command);
2930
bool StrStartWith(const char *str, const char *pre);
3031
void ParseCommandLine(int argc, const char *argv[], ycsbc::utils::Properties &props);
3132

32-
void StatusThread(ycsbc::Measurements *measurements, CountDownLatch *latch, int interval) {
3333
void StatusThread(ycsbc::Measurements *measurements, ycsbc::utils::CountDownLatch *latch, int interval) {
3434
using namespace std::chrono;
3535
time_point<system_clock> start = system_clock::now();
@@ -51,6 +51,39 @@ void StatusThread(ycsbc::Measurements *measurements, ycsbc::utils::CountDownLatc
5151
};
5252
}
5353

54+
void RateLimitThread(std::string rate_file, std::vector<ycsbc::utils::RateLimiter *> rate_limiters,
55+
ycsbc::utils::CountDownLatch *latch) {
56+
std::ifstream ifs;
57+
ifs.open(rate_file);
58+
59+
if (!ifs.is_open()) {
60+
ycsbc::utils::Exception("failed to open: " + rate_file);
61+
}
62+
63+
int64_t num_threads = rate_limiters.size();
64+
65+
int64_t last_time = 0;
66+
while (!ifs.eof()) {
67+
int64_t next_time;
68+
int64_t next_rate;
69+
ifs >> next_time >> next_rate;
70+
71+
if (next_time <= last_time) {
72+
ycsbc::utils::Exception("invalid rate file");
73+
}
74+
75+
bool done = latch->AwaitFor(next_time - last_time);
76+
if (done) {
77+
break;
78+
}
79+
last_time = next_time;
80+
81+
for (auto x : rate_limiters) {
82+
x->SetRate(next_rate / num_threads);
83+
}
84+
}
85+
}
86+
5487
int main(const int argc, const char *argv[]) {
5588
ycsbc::utils::Properties props;
5689
ParseCommandLine(argc, argv, props);
@@ -83,6 +116,7 @@ int main(const int argc, const char *argv[]) {
83116
ycsbc::CoreWorkload wl;
84117
wl.Init(props);
85118

119+
// print status periodically
86120
const bool show_status = (props.GetProperty("status", "false") == "true");
87121
const int status_interval = std::stoi(props.GetProperty("status.interval", "10"));
88122

@@ -105,8 +139,9 @@ int main(const int argc, const char *argv[]) {
105139
if (i < total_ops % num_threads) {
106140
thread_ops++;
107141
}
142+
108143
client_threads.emplace_back(std::async(std::launch::async, ycsbc::ClientThread, dbs[i], &wl,
109-
thread_ops, true, true, !do_transaction, &latch));
144+
thread_ops, true, true, !do_transaction, &latch, nullptr));
110145
}
111146
assert((int)client_threads.size() == num_threads);
112147

@@ -129,8 +164,14 @@ int main(const int argc, const char *argv[]) {
129164
measurements->Reset();
130165
std::this_thread::sleep_for(std::chrono::seconds(stoi(props.GetProperty("sleepafterload", "0"))));
131166

167+
132168
// transaction phase
133169
if (do_transaction) {
170+
// initial ops per second, unlimited if <= 0
171+
const int64_t ops_limit = std::stoi(props.GetProperty("limit.ops", "0"));
172+
// rate file path for dynamic rate limiting, format "time_stamp_sec new_ops_per_second" per line
173+
std::string rate_file = props.GetProperty("limit.file", "");
174+
134175
const int total_ops = stoi(props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]);
135176

136177
ycsbc::utils::CountDownLatch latch(num_threads);
@@ -143,14 +184,27 @@ int main(const int argc, const char *argv[]) {
143184
measurements, &latch, status_interval);
144185
}
145186
std::vector<std::future<int>> client_threads;
187+
std::vector<ycsbc::utils::RateLimiter *> rate_limiters;
146188
for (int i = 0; i < num_threads; ++i) {
147189
int thread_ops = total_ops / num_threads;
148190
if (i < total_ops % num_threads) {
149191
thread_ops++;
150192
}
193+
ycsbc::utils::RateLimiter *rlim = nullptr;
194+
if (ops_limit > 0 || rate_file != "") {
195+
int64_t per_thread_ops = ops_limit / num_threads;
196+
rlim = new ycsbc::utils::RateLimiter(per_thread_ops, per_thread_ops);
197+
}
198+
rate_limiters.push_back(rlim);
151199
client_threads.emplace_back(std::async(std::launch::async, ycsbc::ClientThread, dbs[i], &wl,
152-
thread_ops, false, !do_load, true, &latch));
200+
thread_ops, false, !do_load, true, &latch, rlim));
153201
}
202+
203+
std::future<void> rlim_future;
204+
if (rate_file != "") {
205+
rlim_future = std::async(std::launch::async, RateLimitThread, rate_file, rate_limiters, &latch);
206+
}
207+
154208
assert((int)client_threads.size() == num_threads);
155209

156210
int sum = 0;

utils/rate_limit.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//
2+
// rate_limit.h
3+
// YCSB-cpp
4+
//
5+
// Copyright (c) 2023 Youngjae Lee <ls4154.lee@gmail.com>.
6+
//
7+
8+
#ifndef YCSB_C_RATE_LIMIT_H_
9+
#define YCSB_C_RATE_LIMIT_H_
10+
11+
#include <algorithm>
12+
#include <chrono>
13+
#include <cstdio>
14+
#include <mutex>
15+
#include <ratio>
16+
#include <thread>
17+
18+
namespace ycsbc {
19+
20+
namespace utils {
21+
22+
// Token bucket rate limiter for single client
23+
class RateLimiter {
24+
public:
25+
RateLimiter(int64_t r, int64_t b) : r_(r * TOKEN_PRECISION), b_(b * TOKEN_PRECISION), tokens_(0), last_(Clock::now()) {}
26+
27+
inline void Consume(int64_t n) {
28+
std::unique_lock<std::mutex> lock(mutex_);
29+
30+
if (r_ <= 0) {
31+
return;
32+
}
33+
34+
// refill tokens
35+
auto now = Clock::now();
36+
auto diff = std::chrono::duration_cast<Duration>(now - last_);
37+
tokens_ = std::min(b_, tokens_ + diff.count() * r_ / 1000000000);
38+
last_ = now;
39+
40+
// check tokens
41+
tokens_ -= n * TOKEN_PRECISION;
42+
43+
// sleep
44+
if (tokens_ < 0) {
45+
lock.unlock();
46+
int64_t wait_time = -tokens_ * 1000000000 / r_;
47+
std::this_thread::sleep_for(std::chrono::nanoseconds(wait_time));
48+
}
49+
}
50+
51+
inline void SetRate(int64_t r) {
52+
std::lock_guard<std::mutex> lock(mutex_);
53+
54+
// refill tokens
55+
auto now = Clock::now();
56+
auto diff = std::chrono::duration_cast<Duration>(now - last_);
57+
tokens_ = std::min(b_, tokens_ + diff.count() * r_ * TOKEN_PRECISION / 1000000000);
58+
last_ = now;
59+
60+
// set rate
61+
r_ = r * TOKEN_PRECISION;
62+
}
63+
64+
private:
65+
using Clock = std::chrono::steady_clock;
66+
using Duration = std::chrono::nanoseconds;
67+
static constexpr int64_t TOKEN_PRECISION = 10000;
68+
69+
std::mutex mutex_;
70+
int64_t r_;
71+
int64_t b_;
72+
int64_t tokens_;
73+
Clock::time_point last_;
74+
};
75+
76+
} // utils
77+
78+
} // ycsbc
79+
80+
#endif // YCSB_C_RATE_LIMIT_H_

0 commit comments

Comments
 (0)