1616#include < future>
1717#include < chrono>
1818#include < iomanip>
19+ #include < sstream>
1920
2021#include " client.h"
2122#include " core_workload.h"
2930void UsageMessage (const char *command);
3031bool StrStartWith (const char *str, const char *pre );
3132void ParseCommandLine (int argc, const char *argv[], ycsbc::utils::Properties &props);
33+ std::vector<std::pair<int64_t , int64_t >> LoadRateSchedule (const std::string &rate_file);
3234
3335void StatusThread (ycsbc::Measurements *measurements, ycsbc::utils::CountDownLatch *latch, int interval) {
3436 using namespace std ::chrono;
@@ -51,27 +53,52 @@ void StatusThread(ycsbc::Measurements *measurements, ycsbc::utils::CountDownLatc
5153 };
5254}
5355
54- void RateLimitThread (std::string rate_file, std::vector<ycsbc::utils::RateLimiter *> rate_limiters,
55- ycsbc::utils::CountDownLatch *latch) {
56- std::ifstream ifs;
57- ifs.open (rate_file);
58-
56+ std::vector<std::pair<int64_t , int64_t >> LoadRateSchedule (const std::string &rate_file) {
57+ std::ifstream ifs (rate_file);
5958 if (!ifs.is_open ()) {
60- ycsbc::utils::Exception (" failed to open: " + rate_file);
59+ throw ycsbc::utils::Exception (" failed to open: " + rate_file);
6160 }
6261
63- int64_t num_threads = rate_limiters.size ();
64-
62+ std::vector<std::pair<int64_t , int64_t >> rate_schedule;
6563 int64_t last_time = 0 ;
66- while (!ifs.eof ()) {
64+ std::string line;
65+ while (std::getline (ifs, line)) {
66+ std::string trimmed = ycsbc::utils::Trim (line);
67+ if (trimmed.empty () || trimmed[0 ] == ' #' ) {
68+ continue ;
69+ }
70+
71+ std::istringstream iss (trimmed);
6772 int64_t next_time;
6873 int64_t next_rate;
69- ifs >> next_time >> next_rate;
70-
74+ std::string trailing;
75+ if (!(iss >> next_time >> next_rate) || (iss >> trailing)) {
76+ throw ycsbc::utils::Exception (" invalid rate file" );
77+ }
7178 if (next_time <= last_time) {
72- ycsbc::utils::Exception (" invalid rate file" );
79+ throw ycsbc::utils::Exception (" invalid rate file" );
7380 }
81+ rate_schedule.emplace_back (next_time, next_rate);
82+ last_time = next_time;
83+ }
84+
85+ if (!ifs.eof () && ifs.fail ()) {
86+ throw ycsbc::utils::Exception (" invalid rate file" );
87+ }
88+ if (rate_schedule.empty ()) {
89+ throw ycsbc::utils::Exception (" invalid rate file" );
90+ }
91+
92+ return rate_schedule;
93+ }
94+
95+ void RateLimitThread (std::vector<std::pair<int64_t , int64_t >> rate_schedule,
96+ std::vector<ycsbc::utils::RateLimiter *> rate_limiters,
97+ ycsbc::utils::CountDownLatch *latch) {
98+ int64_t num_threads = rate_limiters.size ();
7499
100+ int64_t last_time = 0 ;
101+ for (const auto &[next_time, next_rate] : rate_schedule) {
75102 bool done = latch->AwaitFor (next_time - last_time);
76103 if (done) {
77104 break ;
@@ -171,6 +198,16 @@ int main(const int argc, const char *argv[]) {
171198 const int64_t ops_limit = std::stoi (props.GetProperty (" limit.ops" , " 0" ));
172199 // rate file path for dynamic rate limiting, format "time_stamp_sec new_ops_per_second" per line
173200 std::string rate_file = props.GetProperty (" limit.file" , " " );
201+ std::vector<std::pair<int64_t , int64_t >> rate_schedule;
202+ if (rate_file != " " ) {
203+ try {
204+ rate_schedule = LoadRateSchedule (rate_file);
205+ } catch (const std::exception &e) {
206+ std::cerr << " Failed to load rate schedule from " << rate_file << " : "
207+ << e.what () << std::endl;
208+ exit (1 );
209+ }
210+ }
174211
175212 const int total_ops = stoi (props[ycsbc::CoreWorkload::OPERATION_COUNT_PROPERTY]);
176213
@@ -202,7 +239,7 @@ int main(const int argc, const char *argv[]) {
202239
203240 std::future<void > rlim_future;
204241 if (rate_file != " " ) {
205- rlim_future = std::async (std::launch::async, RateLimitThread, rate_file , rate_limiters, &latch);
242+ rlim_future = std::async (std::launch::async, RateLimitThread, rate_schedule , rate_limiters, &latch);
206243 }
207244
208245 assert ((int )client_threads.size () == num_threads);
@@ -214,6 +251,10 @@ int main(const int argc, const char *argv[]) {
214251 }
215252 double runtime = timer.End ();
216253
254+ if (rate_file != " " ) {
255+ rlim_future.get ();
256+ }
257+
217258 if (show_status) {
218259 status_future.wait ();
219260 }
@@ -266,9 +307,10 @@ void ParseCommandLine(int argc, const char *argv[], ycsbc::utils::Properties &pr
266307 std::ifstream input (argv[argindex]);
267308 try {
268309 props.Load (input);
269- } catch (const std::string &message) {
270- std::cerr << message << std::endl;
271- exit (0 );
310+ } catch (const std::exception &e) {
311+ std::cerr << " Failed to load properties from " << filename << " : "
312+ << e.what () << std::endl;
313+ exit (1 );
272314 }
273315 input.close ();
274316 argindex++;
@@ -326,4 +368,3 @@ void UsageMessage(const char *command) {
326368inline bool StrStartWith (const char *str, const char *pre ) {
327369 return strncmp (str, pre , strlen (pre )) == 0 ;
328370}
329-
0 commit comments