Skip to content

Commit 4edc623

Browse files
Preemptive cancellation of TaskContext jobs
Tasks could be stuck on the cancellation, if task queue is already handling IO bound tasks: usual example is bad network connection, which forces tasks to wait for the timeout. This prevents tasks from being cancelled in time, and clients forced to wait, which in turn may cause ANR in unexpected places. - Enable pre-execution cancellation in TaskContext by registering a cancellation token that invokes PreExecuteCancel() - Schedule pre-execution cancellation on the separate cancellation lane in the task scheduler when it is available. Relates-To: HERESDK-12253 Signed-off-by: Mykhailo Diachenko <ext-mykhailo.z.diachenko@here.com>
1 parent b400ee0 commit 4edc623

File tree

13 files changed

+501
-66
lines changed

13 files changed

+501
-66
lines changed

olp-cpp-sdk-core/include/olp/core/client/CancellationContext.inl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include <utility>
2223
namespace olp {
2324
namespace client {
2425

@@ -58,9 +59,10 @@ inline void CancellationContext::CancelOperation() {
5859
return;
5960
}
6061

61-
impl_->sub_operation_cancel_token_.Cancel();
62-
impl_->sub_operation_cancel_token_ = CancellationToken();
62+
auto token = CancellationToken();
63+
std::swap(token, impl_->sub_operation_cancel_token_);
6364
impl_->is_cancelled_ = true;
65+
token.Cancel();
6466
}
6567

6668
inline bool CancellationContext::IsCancelled() const {

olp-cpp-sdk-core/include/olp/core/client/OlpClientSettingsFactory.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,17 @@ class CORE_API OlpClientSettingsFactory final {
4747
* operations.
4848
*
4949
* Defaulted to `olp::thread::ThreadPoolTaskScheduler` with one worker
50-
* thread spawned by default.
50+
* thread spawned by default. The default scheduler can optionally create a
51+
* dedicated cancellation lane backed by an extra worker thread.
52+
*
53+
* @param[in] thread_count The number of regular worker threads.
54+
* @param[in] enable_cancellation_lane When true, enables the dedicated
55+
* cancellation lane on the default `ThreadPoolTaskScheduler`.
5156
*
5257
* @return The `TaskScheduler` instance.
5358
*/
5459
static std::unique_ptr<thread::TaskScheduler> CreateDefaultTaskScheduler(
55-
size_t thread_count = 1u);
60+
size_t thread_count = 1u, bool enable_cancellation_lane = false);
5661

5762
/**
5863
* @brief Creates the `Network` instance used for all the non-local requests.

olp-cpp-sdk-core/include/olp/core/client/TaskContext.h

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <olp/core/client/CancellationContext.h>
2929
#include <olp/core/client/CancellationToken.h>
3030
#include <olp/core/client/Condition.h>
31+
#include <olp/core/thread/TaskScheduler.h>
3132

3233
namespace olp {
3334
namespace client {
@@ -56,10 +57,11 @@ class CORE_API TaskContext {
5657
template <typename Exec, typename Callback>
5758
static TaskContext Create(
5859
Exec execute_func, Callback callback,
59-
client::CancellationContext context = client::CancellationContext()) {
60+
client::CancellationContext context = client::CancellationContext(),
61+
std::shared_ptr<thread::TaskScheduler> task_scheduler = nullptr) {
6062
TaskContext task;
6163
task.SetExecutors(std::move(execute_func), std::move(callback),
62-
std::move(context));
64+
std::move(context), std::move(task_scheduler));
6365
return task;
6466
}
6567

@@ -126,9 +128,32 @@ class CORE_API TaskContext {
126128
* @param context The `CancellationContext` instance.
127129
*/
128130
void SetExecutors(Exec execute_func, Callback callback,
129-
client::CancellationContext context) {
130-
impl_ = std::make_shared<TaskContextImpl<ExecResult>>(
131-
std::move(execute_func), std::move(callback), std::move(context));
131+
client::CancellationContext context,
132+
std::shared_ptr<thread::TaskScheduler> task_scheduler) {
133+
auto impl = std::make_shared<TaskContextImpl<ExecResult>>(
134+
std::move(execute_func), std::move(callback), context);
135+
136+
if (task_scheduler) {
137+
std::weak_ptr<TaskContextImpl<ExecResult>> weak_impl = impl;
138+
auto cancellation_scheduler = task_scheduler;
139+
context.ExecuteOrCancelled(
140+
[weak_impl, cancellation_scheduler]() -> CancellationToken {
141+
return CancellationToken([weak_impl, cancellation_scheduler]() {
142+
auto impl = weak_impl.lock();
143+
if (impl && cancellation_scheduler) {
144+
cancellation_scheduler->ScheduleCancellationTask([weak_impl]() {
145+
auto impl = weak_impl.lock();
146+
if (impl) {
147+
impl->PreExecuteCancel();
148+
}
149+
});
150+
return;
151+
}
152+
});
153+
},
154+
[]() {});
155+
}
156+
impl_ = std::move(impl);
132157
}
133158

134159
/**
@@ -249,6 +274,40 @@ class CORE_API TaskContext {
249274
state_.store(State::COMPLETED);
250275
}
251276

277+
void PreExecuteCancel() {
278+
State expected_state = State::PENDING;
279+
280+
if (!state_.compare_exchange_strong(expected_state, State::IN_PROGRESS)) {
281+
return;
282+
}
283+
284+
// Moving the user callback and function guarantee that they are
285+
// executed exactly once
286+
ExecuteFunc function = nullptr;
287+
UserCallback callback = nullptr;
288+
289+
{
290+
std::lock_guard<std::mutex> lock(mutex_);
291+
function = std::move(execute_func_);
292+
callback = std::move(callback_);
293+
}
294+
295+
Response user_response =
296+
client::ApiError(client::ErrorCode::Cancelled, "Cancelled");
297+
298+
if (callback) {
299+
callback(std::move(user_response));
300+
}
301+
302+
// Resources need to be released before the notification, else lambas
303+
// would have captured resources like network or `TaskScheduler`.
304+
function = nullptr;
305+
callback = nullptr;
306+
307+
condition_.Notify();
308+
state_.store(State::COMPLETED);
309+
}
310+
252311
/**
253312
* @brief Cancels the operation and waits for the notification.
254313
*
@@ -330,8 +389,8 @@ struct CORE_API TaskContextHash {
330389
*/
331390
size_t operator()(const TaskContext& task_context) const {
332391
return std::hash<std::shared_ptr<TaskContext::Impl>>()(task_context.impl_);
333-
}
334-
};
392+
} // namespace client
393+
}; // namespace olp
335394

336395
} // namespace client
337396
} // namespace olp

olp-cpp-sdk-core/include/olp/core/thread/TaskScheduler.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,20 @@ class CORE_API TaskScheduler {
7171
EnqueueTask(std::move(func), priority);
7272
}
7373

74+
/**
75+
* @brief Schedules cancellation work.
76+
*
77+
* By default, cancellation work falls back to the regular task queue. Custom
78+
* schedulers can override `EnqueueCancellationTask` to dispatch cancellation
79+
* work differently.
80+
*
81+
* @param[in] func The callable target that should be added to the scheduling
82+
* pipeline for cancellation work.
83+
*/
84+
void ScheduleCancellationTask(CallFuncType&& func) {
85+
EnqueueCancellationTask(std::move(func));
86+
}
87+
7488
/**
7589
* @brief Schedules the asynchronous cancellable task.
7690
*
@@ -136,6 +150,19 @@ class CORE_API TaskScheduler {
136150
OLP_SDK_CORE_UNUSED(priority);
137151
EnqueueTask(std::forward<CallFuncType>(func));
138152
}
153+
154+
/**
155+
* @brief The enqueue cancellation task interface that is implemented by the
156+
* subclass when cancellation work should use a dedicated dispatch path.
157+
*
158+
* By default, cancellation work falls back to the regular task queue.
159+
*
160+
* @param[in] func The rvalue reference of the cancellation task that should
161+
* be enqueued.
162+
*/
163+
virtual void EnqueueCancellationTask(CallFuncType&& func) {
164+
EnqueueTask(std::forward<CallFuncType>(func));
165+
}
139166
};
140167

141168
/**

olp-cpp-sdk-core/include/olp/core/thread/ThreadPoolTaskScheduler.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,21 @@ namespace thread {
3131
* @brief An implementation of the `TaskScheduler` instance that uses a thread
3232
* pool.
3333
*
34+
* The scheduler can optionally expose a dedicated cancellation lane backed by a
35+
* separate worker thread. When disabled, cancellation work falls back to the
36+
* regular task queue.
3437
*/
3538
class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler {
3639
public:
3740
/**
3841
* @brief Creates the `ThreadPoolTaskScheduler` object with one thread.
3942
*
4043
* @param thread_count The number of threads initialized in the thread pool.
44+
* @param enable_cancellation_lane When true, creates a dedicated worker
45+
* thread for cancellation work scheduled via `ScheduleCancellationTask`.
4146
*/
42-
explicit ThreadPoolTaskScheduler(size_t thread_count = 1u);
47+
explicit ThreadPoolTaskScheduler(size_t thread_count = 1u,
48+
bool enable_cancellation_lane = false);
4349

4450
/**
4551
* @brief Closes the `SyncQueue` instance and joins threads.
@@ -81,13 +87,22 @@ class CORE_API ThreadPoolTaskScheduler final : public TaskScheduler {
8187
void EnqueueTask(TaskScheduler::CallFuncType&& func,
8288
uint32_t priority) override;
8389

90+
void EnqueueCancellationTask(TaskScheduler::CallFuncType&& func) override;
91+
8492
private:
8593
class QueueImpl;
94+
class CancellationQueueImpl;
8695

8796
/// Thread pool created in constructor.
8897
std::vector<std::thread> thread_pool_;
8998
/// SyncQueue used to manage tasks.
9099
std::unique_ptr<QueueImpl> queue_;
100+
/// Dedicated cancellation worker thread.
101+
std::thread cancellation_thread_;
102+
/// SyncQueue used to manage cancellation tasks when enabled.
103+
std::unique_ptr<CancellationQueueImpl> cancellation_queue_;
104+
/// Indicates whether the dedicated cancellation lane is enabled.
105+
bool cancellation_lane_enabled_;
91106
};
92107

93108
} // namespace thread

olp-cpp-sdk-core/src/client/OlpClientSettingsFactory.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ namespace olp {
3737
namespace client {
3838

3939
std::unique_ptr<thread::TaskScheduler>
40-
OlpClientSettingsFactory::CreateDefaultTaskScheduler(size_t thread_count) {
41-
return std::make_unique<thread::ThreadPoolTaskScheduler>(thread_count);
40+
OlpClientSettingsFactory::CreateDefaultTaskScheduler(
41+
size_t thread_count, bool enable_cancellation_lane) {
42+
return std::make_unique<thread::ThreadPoolTaskScheduler>(
43+
thread_count, enable_cancellation_lane);
4244
}
4345

4446
std::shared_ptr<http::Network>

olp-cpp-sdk-core/src/thread/ThreadPoolTaskScheduler.cpp

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ namespace thread {
4242

4343
namespace {
4444
constexpr auto kLogTag = "ThreadPoolTaskScheduler";
45+
constexpr auto kCancellationExecutorName = "OLPSDKCANCEL";
4546

4647
struct PrioritizedTask {
4748
TaskScheduler::CallFuncType function;
@@ -61,6 +62,34 @@ void SetExecutorName(size_t idx) {
6162
OLP_SDK_LOG_INFO_F(kLogTag, "Starting thread '%s'", thread_name.c_str());
6263
}
6364

65+
void SetCancellationExecutorName() {
66+
olp::utils::Thread::SetCurrentThreadName(kCancellationExecutorName);
67+
OLP_SDK_LOG_INFO_F(kLogTag, "Starting thread '%s'",
68+
kCancellationExecutorName);
69+
}
70+
71+
TaskScheduler::CallFuncType WrapWithLogContext(
72+
TaskScheduler::CallFuncType&& func) {
73+
auto log_context = logging::GetContext();
74+
75+
#if __cplusplus >= 201402L
76+
// At least C++14, use generalized lambda capture
77+
return [log_context = std::move(log_context), func = std::move(func)]() {
78+
olp::logging::ScopedLogContext scoped_context(log_context);
79+
func();
80+
};
81+
#else
82+
// C++11 does not support generalized lambda capture :(
83+
return std::bind(
84+
[](std::shared_ptr<const olp::logging::LogContext>& log_context,
85+
TaskScheduler::CallFuncType& func) {
86+
olp::logging::ScopedLogContext scoped_context(log_context);
87+
func();
88+
},
89+
std::move(log_context), std::move(func));
90+
#endif
91+
}
92+
6493
} // namespace
6594

6695
class ThreadPoolTaskScheduler::QueueImpl {
@@ -77,10 +106,39 @@ class ThreadPoolTaskScheduler::QueueImpl {
77106
SyncQueue<ElementType, PriorityQueue> sync_queue_;
78107
};
79108

80-
ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count)
81-
: queue_{std::make_unique<QueueImpl>()} {
109+
class ThreadPoolTaskScheduler::CancellationQueueImpl {
110+
public:
111+
using ElementType = TaskScheduler::CallFuncType;
112+
113+
bool Pull(ElementType& element) { return sync_queue_.Pull(element); }
114+
void Push(ElementType&& element) { sync_queue_.Push(std::move(element)); }
115+
void Close() { sync_queue_.Close(); }
116+
117+
private:
118+
SyncQueueFifo<ElementType> sync_queue_;
119+
};
120+
121+
ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count,
122+
bool enable_cancellation_lane)
123+
: queue_{std::make_unique<QueueImpl>()},
124+
cancellation_lane_enabled_{enable_cancellation_lane} {
82125
thread_pool_.reserve(thread_count);
83126

127+
if (cancellation_lane_enabled_) {
128+
cancellation_queue_ = std::make_unique<CancellationQueueImpl>();
129+
cancellation_thread_ = std::thread([this]() {
130+
SetCancellationExecutorName();
131+
132+
for (;;) {
133+
TaskScheduler::CallFuncType task;
134+
if (!cancellation_queue_->Pull(task)) {
135+
return;
136+
}
137+
task();
138+
}
139+
});
140+
}
141+
84142
for (size_t idx = 0; idx < thread_count; ++idx) {
85143
std::thread executor([this, idx]() {
86144
// Set thread name for easy profiling and debugging
@@ -100,40 +158,38 @@ ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(size_t thread_count)
100158
}
101159

102160
ThreadPoolTaskScheduler::~ThreadPoolTaskScheduler() {
161+
if (cancellation_queue_) {
162+
cancellation_queue_->Close();
163+
}
103164
queue_->Close();
165+
if (cancellation_thread_.joinable()) {
166+
cancellation_thread_.join();
167+
}
104168
for (auto& thread : thread_pool_) {
105169
thread.join();
106170
}
107171
thread_pool_.clear();
108172
}
109173

174+
void ThreadPoolTaskScheduler::EnqueueCancellationTask(
175+
TaskScheduler::CallFuncType&& func) {
176+
auto task = WrapWithLogContext(std::move(func));
177+
178+
if (!cancellation_lane_enabled_ || !cancellation_queue_) {
179+
queue_->Push({std::move(task), thread::NORMAL});
180+
return;
181+
}
182+
183+
cancellation_queue_->Push(std::move(task));
184+
}
185+
110186
void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func) {
111187
EnqueueTask(std::move(func), thread::NORMAL);
112188
}
113189

114190
void ThreadPoolTaskScheduler::EnqueueTask(TaskScheduler::CallFuncType&& func,
115191
uint32_t priority) {
116-
auto logContext = logging::GetContext();
117-
118-
#if __cplusplus >= 201402L
119-
// At least C++14, use generalized lambda capture
120-
auto funcWithCapturedLogContext = [logContext = std::move(logContext),
121-
func = std::move(func)]() {
122-
olp::logging::ScopedLogContext scopedContext(logContext);
123-
func();
124-
};
125-
#else
126-
// C++11 does not support generalized lambda capture :(
127-
auto funcWithCapturedLogContext = std::bind(
128-
[](std::shared_ptr<const olp::logging::LogContext>& logContext,
129-
TaskScheduler::CallFuncType& func) {
130-
olp::logging::ScopedLogContext scopedContext(logContext);
131-
func();
132-
},
133-
std::move(logContext), std::move(func));
134-
#endif
135-
136-
queue_->Push({std::move(funcWithCapturedLogContext), priority});
192+
queue_->Push({WrapWithLogContext(std::move(func)), priority});
137193
}
138194

139195
} // namespace thread

0 commit comments

Comments
 (0)