Skip to content

Commit 05ec537

Browse files
authored
Bugfix: bthread_worker_usage could exceed bthread_worker_count (#3009)
* Bugfix: bthread_worker_usage could exceed bthread_worker_count
* Use atomic 128-bit aligned CPU loads and stores
* Encapsulate an AtomicInteger128 class
* Remove unused header files
1 parent 1ff5f3f commit 05ec537

9 files changed

Lines changed: 240 additions & 82 deletions

src/brpc/load_balancer.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,6 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
184184
return Extension<const LoadBalancer>::instance();
185185
}
186186

187-
inline uint32_t GenRandomStride() {
188-
uint32_t prime_offset[] = {
189-
#include "bthread/offset_inl.list"
190-
};
191-
return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
192-
}
193-
194187
} // namespace brpc
195188

196189

src/brpc/policy/randomized_load_balancer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "butil/macros.h"
2020
#include "butil/fast_rand.h"
21+
#include "bthread/prime_offset.h"
2122
#include "brpc/socket.h"
2223
#include "brpc/policy/randomized_load_balancer.h"
2324
#include "butil/strings/string_number_conversions.h"
@@ -118,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
118119
return 0;
119120
}
120121
if (stride == 0) {
121-
stride = GenRandomStride();
122+
stride = bthread::prime_offset();
122123
}
123124
// If `Address' failed, use `offset+stride' to retry so that
124125
// this failed server won't be visited again inside for

src/brpc/policy/round_robin_load_balancer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "butil/macros.h"
2020
#include "butil/fast_rand.h"
21+
#include "bthread/prime_offset.h"
2122
#include "brpc/socket.h"
2223
#include "brpc/policy/round_robin_load_balancer.h"
2324

@@ -108,7 +109,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
108109
}
109110
TLS tls = s.tls();
110111
if (tls.stride == 0) {
111-
tls.stride = GenRandomStride();
112+
tls.stride = bthread::prime_offset();
112113
// use random at first time, for the case of
113114
// use rr lb every time in new thread
114115
tls.offset = butil::fast_rand_less_than(n);

src/brpc/policy/weighted_randomized_load_balancer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <algorithm>
2020

2121
#include "butil/fast_rand.h"
22+
#include "bthread/prime_offset.h"
2223
#include "brpc/socket.h"
2324
#include "brpc/policy/weighted_randomized_load_balancer.h"
2425
#include "butil/strings/string_number_conversions.h"
@@ -152,7 +153,7 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
152153
if (random_traversed.size() == n) {
153154
// Try to traverse the remaining servers to find an available server.
154155
uint32_t offset = butil::fast_rand_less_than(n);
155-
uint32_t stride = GenRandomStride();
156+
uint32_t stride = bthread::prime_offset();
156157
for (size_t i = 0; i < n; ++i) {
157158
offset = (offset + stride) % n;
158159
SocketId id = s->server_list[offset].id;

src/bthread/prime_offset.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#ifndef BTHREAD_PRIME_OFFSET_H
#define BTHREAD_PRIME_OFFSET_H

#include "butil/fast_rand.h"
#include "butil/macros.h"

namespace bthread {

// Returns a prime number picked from a fixed table, selected by `seed`.
// Callers use it as a stride for pseudo-random traversal of lists
// (presumably chosen so the stride is co-prime with typical list sizes —
// see the load balancers for usage).
inline size_t prime_offset(size_t seed) {
    // `static const`: initialize the table once instead of rebuilding the
    // whole array from the included list on every call.
    static const uint32_t offsets[] = {
#include "bthread/offset_inl.list"
    };
    return offsets[seed % ARRAY_SIZE(offsets)];
}

// Convenience overload: pick a prime offset with a random seed.
inline size_t prime_offset() {
    return prime_offset(butil::fast_rand());
}

} // namespace bthread

#endif // BTHREAD_PRIME_OFFSET_H

src/bthread/task_control.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
152152
auto a = static_cast<CumulatedWithTagArgs*>(arg);
153153
auto c = a->c;
154154
auto t = a->t;
155-
return c->get_cumulated_worker_time_with_tag(t);
155+
return c->get_cumulated_worker_time(t);
156156
}
157157

158158
static int64_t get_cumulated_switch_count_from_this(void *arg) {
@@ -526,22 +526,18 @@ double TaskControl::get_cumulated_worker_time() {
526526
int64_t cputime_ns = 0;
527527
BAIDU_SCOPED_LOCK(_modify_group_mutex);
528528
for_each_task_group([&](TaskGroup* g) {
529-
if (g) {
530-
cputime_ns += g->_cumulated_cputime_ns;
531-
}
529+
cputime_ns += g->cumulated_cputime_ns();
532530
});
533531
return cputime_ns / 1000000000.0;
534532
}
535533

536-
double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) {
534+
double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
537535
int64_t cputime_ns = 0;
538536
BAIDU_SCOPED_LOCK(_modify_group_mutex);
539537
const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
540538
auto& groups = tag_group(tag);
541539
for (size_t i = 0; i < ngroup; ++i) {
542-
if (groups[i]) {
543-
cputime_ns += groups[i]->_cumulated_cputime_ns;
544-
}
540+
cputime_ns += groups[i]->cumulated_cputime_ns();
545541
}
546542
return cputime_ns / 1000000000.0;
547543
}

src/bthread/task_control.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ friend bthread_t init_for_pthread_stack_trace();
7979
void print_rq_sizes(std::ostream& os);
8080

8181
double get_cumulated_worker_time();
82-
double get_cumulated_worker_time_with_tag(bthread_tag_t tag);
82+
double get_cumulated_worker_time(bthread_tag_t tag);
8383
int64_t get_cumulated_switch_count();
8484
int64_t get_cumulated_signal_count();
8585

src/bthread/task_group.cpp

Lines changed: 72 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@
3737
#include "bthread/task_group.h"
3838
#include "bthread/timer_thread.h"
3939

40+
#ifdef __x86_64__
41+
#include <x86intrin.h>
42+
#endif // __x86_64__
43+
44+
#ifdef __ARM_NEON
45+
#include <arm_neon.h>
46+
#endif // __ARM_NEON
47+
4048
namespace bthread {
4149

4250
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
@@ -69,10 +77,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
6977

7078
const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
7179

72-
const size_t OFFSET_TABLE[] = {
73-
#include "bthread/offset_inl.list"
74-
};
75-
7680
void* (*g_create_span_func)() = NULL;
7781

7882
void* run_create_span_func() {
@@ -82,6 +86,39 @@ void* run_create_span_func() {
8286
return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
8387
}
8488

89+
// Loads the 128-bit value as a single unit.
// On x86-64 and ARM NEON this uses a 16-byte vector load instead of taking
// the mutex. NOTE(review): plain SSE2/NEON 128-bit loads are not formally
// guaranteed atomic by the ISA specs — confirm this is acceptable for the
// intended (statistics) use. All other platforms fall back to `_mutex`.
AtomicInteger128::Value AtomicInteger128::load() const {
#if __x86_64__ || __ARM_NEON
    // `_mutex` is only touched by the fallback path below; silence the
    // unused-member warning on vectorized builds.
    (void)_mutex;
#ifdef __x86_64__
    __m128i value = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value));
#else // __ARM_NEON
    int64x2_t value = vld1q_s64(reinterpret_cast<const int64_t*>(&_value));
#endif // __x86_64__
    // Subscripting a vector type (value[0], value[1]) is a GCC/Clang
    // extension; both lanes are extracted into the returned Value.
    return {value[0], value[1]};
#else // __x86_64__ || __ARM_NEON
    BAIDU_SCOPED_LOCK(_mutex);
    return _value;
#endif // __x86_64__ || __ARM_NEON
}
107+
108+
// Stores `value` into the shared slot as one 128-bit unit.
// Vectorized single 16-byte store on x86-64/NEON; mutex-guarded
// assignment on every other platform.
void AtomicInteger128::store(Value value) {
#if __x86_64__
    // Round-trip through an SSE register so the write reaches memory as a
    // single 16-byte store.
    __m128i tmp = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
    _mm_store_si128(reinterpret_cast<__m128i*>(&_value), tmp);
#elif __ARM_NEON
    int64x2_t tmp = vld1q_s64(reinterpret_cast<int64_t*>(&value));
    vst1q_s64(reinterpret_cast<int64_t*>(&_value), tmp);
#else
    // Portable fallback: serialize against load() with the same mutex.
    BAIDU_SCOPED_LOCK(_mutex);
    _value = value;
#endif // __x86_64__ || __ARM_NEON
}
120+
121+
85122
int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
86123
TaskMeta* const m = address_meta(tid);
87124
if (m != NULL) {
@@ -148,6 +185,16 @@ static double get_cumulated_cputime_from_this(void* arg) {
148185
return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
149186
}
150187

188+
int64_t TaskGroup::cumulated_cputime_ns() const {
189+
CPUTimeStat cpu_time_stat = _cpu_time_stat.load();
190+
// Add the elapsed time of running bthread.
191+
int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns();
192+
if (!cpu_time_stat.is_main_task()) {
193+
cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns();
194+
}
195+
return cumulated_cputime_ns;
196+
}
197+
151198
void TaskGroup::run_main_task() {
152199
bvar::PassiveStatus<double> cumulated_cputime(
153200
get_cumulated_cputime_from_this, this);
@@ -156,11 +203,11 @@ void TaskGroup::run_main_task() {
156203
TaskGroup* dummy = this;
157204
bthread_t tid;
158205
while (wait_task(&tid)) {
159-
TaskGroup::sched_to(&dummy, tid);
206+
sched_to(&dummy, tid);
160207
DCHECK_EQ(this, dummy);
161208
DCHECK_EQ(_cur_meta->stack, _main_stack);
162209
if (_cur_meta->tid != _main_tid) {
163-
TaskGroup::task_runner(1/*skip remained*/);
210+
task_runner(1/*skip remained*/);
164211
}
165212
if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
166213
char name[32];
@@ -176,31 +223,12 @@ void TaskGroup::run_main_task() {
176223
}
177224
}
178225
// Don't forget to add elapse of last wait_task.
179-
current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
226+
current_task()->stat.cputime_ns +=
227+
butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns();
180228
}
181229

182230
TaskGroup::TaskGroup(TaskControl* c)
183-
: _cur_meta(NULL)
184-
, _control(c)
185-
, _num_nosignal(0)
186-
, _nsignaled(0)
187-
, _last_run_ns(butil::cpuwide_time_ns())
188-
, _cumulated_cputime_ns(0)
189-
, _nswitch(0)
190-
, _last_context_remained(NULL)
191-
, _last_context_remained_arg(NULL)
192-
, _pl(NULL)
193-
, _main_stack(NULL)
194-
, _main_tid(0)
195-
, _remote_num_nosignal(0)
196-
, _remote_nsignaled(0)
197-
#ifndef NDEBUG
198-
, _sched_recursive_guard(0)
199-
#endif
200-
, _tag(BTHREAD_TAG_DEFAULT)
201-
, _tid(-1) {
202-
_steal_seed = butil::fast_rand();
203-
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
231+
: _control(c) {
204232
CHECK(c);
205233
}
206234

@@ -292,8 +320,12 @@ int TaskGroup::init(size_t runqueue_capacity) {
292320
_cur_meta = m;
293321
_main_tid = m->tid;
294322
_main_stack = stk;
295-
_last_run_ns = butil::cpuwide_time_ns();
323+
324+
CPUTimeStat cpu_time_stat;
325+
cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true);
326+
_cpu_time_stat.store(cpu_time_stat);
296327
_last_cpu_clock_ns = 0;
328+
297329
return 0;
298330
}
299331

@@ -414,7 +446,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
414446

415447
g->_control->_nbthreads << -1;
416448
g->_control->tag_nbthreads(g->tag()) << -1;
417-
g->set_remained(TaskGroup::_release_last_context, m);
449+
g->set_remained(_release_last_context, m);
418450
ending_sched(&g);
419451

420452
} while (g->_cur_meta->tid != g->_main_tid);
@@ -491,9 +523,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
491523
fn = ready_to_run_in_worker;
492524
}
493525
ReadyToRunArgs args = {
494-
g->tag(),
495-
g->_cur_meta,
496-
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
526+
g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
497527
};
498528
g->set_remained(fn, &args);
499529
sched_to(pg, m->tid);
@@ -678,14 +708,18 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
678708
}
679709
#endif
680710
// Save errno so that errno is bthread-specific.
681-
const int saved_errno = errno;
711+
int saved_errno = errno;
682712
void* saved_unique_user_ptr = tls_unique_user_ptr;
683713

684714
TaskMeta* const cur_meta = g->_cur_meta;
685-
const int64_t now = butil::cpuwide_time_ns();
686-
const int64_t elp_ns = now - g->_last_run_ns;
687-
g->_last_run_ns = now;
715+
int64_t now = butil::cpuwide_time_ns();
716+
CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe();
717+
int64_t elp_ns = now - cpu_time_stat.last_run_ns();
688718
cur_meta->stat.cputime_ns += elp_ns;
719+
// Update cpu_time_stat.
720+
cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid));
721+
cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid));
722+
g->_cpu_time_stat.store(cpu_time_stat);
689723

690724
if (FLAGS_bthread_enable_cpu_clock_stat) {
691725
const int64_t cpu_thread_time = butil::cputhread_time_ns();
@@ -696,10 +730,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
696730
} else {
697731
g->_last_cpu_clock_ns = 0;
698732
}
699-
700-
if (cur_meta->tid != g->main_tid()) {
701-
g->_cumulated_cputime_ns += elp_ns;
702-
}
733+
703734
++cur_meta->stat.nswitch;
704735
++ g->_nswitch;
705736
// Switch to the task

0 commit comments

Comments
 (0)