Skip to content

Commit 9a115c9

Browse files
author
Kuankuan Guo
committed
feat: add worker idle task's init and register
- Allow application init and register new logics - The init function will only runs once
1 parent 2635ef6 commit 9a115c9

7 files changed

Lines changed: 554 additions & 4 deletions

File tree

src/bthread/bthread.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "bthread/timer_thread.h"
3131
#include "bthread/list_of_abafree_id.h"
3232
#include "bthread/bthread.h"
33+
#include "bthread/worker_idle.h"
3334

3435
namespace bthread {
3536
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
@@ -597,6 +598,17 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
597598
return 0;
598599
}
599600

601+
int bthread_register_worker_idle_function(int (*init_fn)(void),
602+
bool (*idle_fn)(void),
603+
uint64_t timeout_us,
604+
int* handle) {
605+
return bthread::register_worker_idle_function(init_fn, idle_fn, timeout_us, handle);
606+
}
607+
608+
int bthread_unregister_worker_idle_function(int handle) {
609+
return bthread::unregister_worker_idle_function(handle);
610+
}
611+
600612
int bthread_set_create_span_func(void* (*func)()) {
601613
if (func == NULL) {
602614
return EINVAL;

src/bthread/parking_lot.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,15 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
6464

6565
// Wait for tasks.
6666
// If the `expected_state' does not match, wait() may finish directly.
67-
void wait(const State& expected_state) {
67+
void wait(const State& expected_state, const timespec* timeout = NULL) {
6868
if (get_state().val != expected_state.val) {
6969
// Fast path, no need to futex_wait.
7070
return;
7171
}
7272
if (_no_signal_when_no_waiter) {
7373
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
7474
}
75-
futex_wait_private(&_pending_signal, expected_state.val, NULL);
75+
futex_wait_private(&_pending_signal, expected_state.val, timeout);
7676
if (_no_signal_when_no_waiter) {
7777
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
7878
}

src/bthread/task_group.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "bthread/task_group.h"
3838
#include "bthread/timer_thread.h"
3939
#include "bthread/bthread.h"
40+
#include "bthread/worker_idle.h"
4041

4142
#ifdef __x86_64__
4243
#include <x86intrin.h>
@@ -167,7 +168,19 @@ bool TaskGroup::wait_task(bthread_t* tid) {
167168
if (_last_pl_state.stopped()) {
168169
return false;
169170
}
170-
_pl->wait(_last_pl_state);
171+
if (has_worker_idle_functions()) {
172+
if (run_worker_idle_functions()) {
173+
return true;
174+
}
175+
const timespec timeout = get_worker_idle_timeout();
176+
if (timeout.tv_sec == 0 && timeout.tv_nsec == 0) {
177+
_pl->wait(_last_pl_state);
178+
} else {
179+
_pl->wait(_last_pl_state, &timeout);
180+
}
181+
} else {
182+
_pl->wait(_last_pl_state);
183+
}
171184
if (steal_task(tid)) {
172185
return true;
173186
}
@@ -179,7 +192,19 @@ bool TaskGroup::wait_task(bthread_t* tid) {
179192
if (steal_task(tid)) {
180193
return true;
181194
}
182-
_pl->wait(st);
195+
if (has_worker_idle_functions()) {
196+
if (run_worker_idle_functions()) {
197+
return true;
198+
}
199+
const timespec timeout = get_worker_idle_timeout();
200+
if (timeout.tv_sec == 0 && timeout.tv_nsec == 0) {
201+
_pl->wait(st);
202+
} else {
203+
_pl->wait(st, &timeout);
204+
}
205+
} else {
206+
_pl->wait(st);
207+
}
183208
#endif
184209
} while (true);
185210
}

src/bthread/unstable.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,46 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
9292
// Add a startup function with tag
9393
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
9494

95+
// Registers a per-worker init function and an idle function.
96+
//
97+
// The init function is called at most once per worker thread, before the first
98+
// invocation of idle_fn in that worker.
99+
//
100+
// The idle function is called when a worker has no task to run. If idle_fn
101+
// returns true, it means some work is done and the worker should check the
102+
// runqueue again immediately. If no registered idle function reports work, the
103+
// worker waits for at most the minimal timeout among registered functions
104+
// before trying again.
105+
//
106+
// This function is thread-safe.
107+
//
108+
// Args:
109+
// init_fn: Optional. Called once per worker thread. Return 0 on success. A
110+
// non-zero return value disables idle_fn for that worker thread.
111+
// idle_fn: Required. Must not be NULL. Return true if any work is done.
112+
// timeout_us: Required. Must be > 0. Maximum waiting time when worker is idle.
113+
// handle: Optional output. On success, set to a positive handle for later
114+
// unregistration.
115+
//
116+
// Returns:
117+
// 0 on success, error code otherwise.
118+
extern int bthread_register_worker_idle_function(int (*init_fn)(void),
119+
bool (*idle_fn)(void),
120+
uint64_t timeout_us,
121+
int* handle);
122+
123+
// Unregisters an idle function by handle returned by
124+
// bthread_register_worker_idle_function().
125+
//
126+
// This function is thread-safe.
127+
//
128+
// Args:
129+
// handle: Handle returned by bthread_register_worker_idle_function().
130+
//
131+
// Returns:
132+
// 0 on success, error code otherwise.
133+
extern int bthread_unregister_worker_idle_function(int handle);
134+
95135
// Add a create span function
96136
extern int bthread_set_create_span_func(void* (*func)());
97137

src/bthread/worker_idle.cpp

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "bthread/worker_idle.h"
19+
20+
#include <errno.h>
21+
22+
#include <algorithm>
23+
#include <new>
24+
#include <vector>
25+
26+
#include "butil/atomicops.h"
27+
#include "butil/containers/doubly_buffered_data.h"
28+
#include "butil/thread_local.h"
29+
30+
namespace bthread {
31+
namespace {
32+
33+
enum InitState : uint8_t {
34+
INIT_STATE_NOT_RUN = 0,
35+
INIT_STATE_OK = 1,
36+
INIT_STATE_FAILED = 2,
37+
};
38+
39+
struct WorkerIdleEntry {
40+
int id;
41+
int (*init_fn)(void);
42+
bool (*idle_fn)(void);
43+
uint64_t timeout_us;
44+
};
45+
46+
typedef std::vector<WorkerIdleEntry> WorkerIdleEntryList;
47+
48+
static butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true> g_entries;
49+
static butil::atomic<int> g_next_id(1);
50+
51+
struct WorkerIdleTLS {
52+
std::vector<uint8_t> init_states;
53+
};
54+
55+
BAIDU_THREAD_LOCAL WorkerIdleTLS* tls_worker_idle = NULL;
56+
57+
static WorkerIdleTLS* get_or_create_tls() {
58+
if (tls_worker_idle) {
59+
return tls_worker_idle;
60+
}
61+
tls_worker_idle = new (std::nothrow) WorkerIdleTLS;
62+
return tls_worker_idle;
63+
}
64+
65+
static timespec to_timespec(uint64_t timeout_us) {
66+
timespec ts;
67+
ts.tv_sec = timeout_us / 1000000;
68+
ts.tv_nsec = (timeout_us % 1000000) * 1000;
69+
return ts;
70+
}
71+
72+
} // namespace
73+
74+
int register_worker_idle_function(int (*init_fn)(void),
75+
bool (*idle_fn)(void),
76+
uint64_t timeout_us,
77+
int* handle) {
78+
if (idle_fn == NULL) {
79+
return EINVAL;
80+
}
81+
if (timeout_us == 0) {
82+
return EINVAL;
83+
}
84+
const int id = g_next_id.fetch_add(1, butil::memory_order_relaxed);
85+
WorkerIdleEntry e;
86+
e.id = id;
87+
e.init_fn = init_fn;
88+
e.idle_fn = idle_fn;
89+
e.timeout_us = timeout_us;
90+
g_entries.Modify([&](WorkerIdleEntryList& bg) {
91+
bg.push_back(e);
92+
return static_cast<size_t>(1);
93+
});
94+
if (handle) {
95+
*handle = id;
96+
}
97+
return 0;
98+
}
99+
100+
int unregister_worker_idle_function(int handle) {
101+
if (handle <= 0) {
102+
return EINVAL;
103+
}
104+
size_t removed = g_entries.Modify([&](WorkerIdleEntryList& bg) {
105+
const size_t old_size = bg.size();
106+
bg.erase(std::remove_if(bg.begin(), bg.end(),
107+
[&](const WorkerIdleEntry& e) {
108+
return e.id == handle;
109+
}),
110+
bg.end());
111+
return old_size - bg.size();
112+
});
113+
return removed ? 0 : EINVAL;
114+
}
115+
116+
bool has_worker_idle_functions() {
117+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
118+
if (g_entries.Read(&p) != 0) {
119+
return false;
120+
}
121+
return !p->empty();
122+
}
123+
124+
bool run_worker_idle_functions() {
125+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
126+
if (g_entries.Read(&p) != 0) {
127+
return false;
128+
}
129+
if (p->empty()) {
130+
return false;
131+
}
132+
133+
WorkerIdleTLS* tls = get_or_create_tls();
134+
if (tls == NULL) {
135+
return false;
136+
}
137+
138+
bool did_work = false;
139+
for (const auto& e : *p) {
140+
if (e.id <= 0 || e.idle_fn == NULL) {
141+
continue;
142+
}
143+
if (tls->init_states.size() <= static_cast<size_t>(e.id)) {
144+
tls->init_states.resize(static_cast<size_t>(e.id) + 1, INIT_STATE_NOT_RUN);
145+
}
146+
uint8_t& st = tls->init_states[static_cast<size_t>(e.id)];
147+
if (st == INIT_STATE_NOT_RUN) {
148+
// Run the init callback function once.
149+
if (e.init_fn) {
150+
const int rc = e.init_fn();
151+
st = (rc == 0) ? INIT_STATE_OK : INIT_STATE_FAILED;
152+
} else {
153+
st = INIT_STATE_OK;
154+
}
155+
}
156+
if (st != INIT_STATE_OK) {
157+
continue;
158+
}
159+
// Run the idle callback function.
160+
did_work |= e.idle_fn();
161+
}
162+
return did_work;
163+
}
164+
165+
timespec get_worker_idle_timeout() {
166+
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
167+
if (g_entries.Read(&p) != 0) {
168+
return {0, 0};
169+
}
170+
if (p->empty()) {
171+
return {0, 0};
172+
}
173+
uint64_t min_us = 0;
174+
for (const auto& e : *p) {
175+
if (e.timeout_us == 0) {
176+
continue;
177+
}
178+
if (min_us == 0 || e.timeout_us < min_us) {
179+
min_us = e.timeout_us;
180+
}
181+
}
182+
if (min_us == 0) {
183+
return {0, 0};
184+
}
185+
return to_timespec(min_us);
186+
}
187+
188+
} // namespace bthread
189+
190+

0 commit comments

Comments
 (0)