Skip to content

Commit 0ef82b4

Browse files
authored
Version 0.0.9 (#35)
* addition of task objects * result refactor * executor refactor + optimizations * tests were completly re-written * move to MSVC 18.6.2 and clang 11 * move to standard coroutines on MSVC * awaitables are uncopiable and unmovable * when timer is cancelled/destructed, spawned tasks that are not yet executed are cancelled. Note: CI/CD fail as Clang 11 is still not supported on Github Actions. Tests were run locally on Window, Linux and Mac. Note: This version breaks ABI if applications implemented their own executors.
1 parent 547c557 commit 0ef82b4

80 files changed

Lines changed: 6139 additions & 3258 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
# Others
4141
.vs
42+
CMakeSettings.json
4243

4344
# Specific directories
4445
build/

CMakeLists.txt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 3.16)
22

33
project(concurrencpp
4-
VERSION 0.0.8
4+
VERSION 0.0.9
55
LANGUAGES CXX)
66

77
include(cmake/coroutineOptions.cmake)
@@ -17,13 +17,15 @@ endif()
1717
# ---- Declare library ----
1818

1919
set(concurrencpp_sources
20+
source/task.cpp
2021
source/executors/executor.cpp
2122
source/executors/manual_executor.cpp
2223
source/executors/thread_executor.cpp
2324
source/executors/thread_pool_executor.cpp
2425
source/executors/worker_thread_executor.cpp
26+
source/results/impl/consumer_context.cpp
27+
source/results/impl/result_state.cpp
2528
source/results/promises.cpp
26-
source/results/result_core.cpp
2729
source/runtime/runtime.cpp
2830
source/threads/thread.cpp
2931
source/timers/timer.cpp
@@ -32,8 +34,10 @@ set(concurrencpp_sources
3234
set(concurrencpp_headers
3335
include/concurrencpp/concurrencpp.h
3436
include/concurrencpp/errors.h
37+
include/concurrencpp/task.h
3538
include/concurrencpp/forward_declerations.h
3639
include/concurrencpp/platform_defs.h
40+
include/concurrencpp/coroutines/coroutine.h
3741
include/concurrencpp/executors/constants.h
3842
include/concurrencpp/executors/derivable_executor.h
3943
include/concurrencpp/executors/executor.h
@@ -43,13 +47,14 @@ set(concurrencpp_headers
4347
include/concurrencpp/executors/thread_executor.h
4448
include/concurrencpp/executors/thread_pool_executor.h
4549
include/concurrencpp/executors/worker_thread_executor.h
50+
include/concurrencpp/results/impl/consumer_context.h
51+
include/concurrencpp/results/impl/producer_context.h
52+
include/concurrencpp/results/impl/result_state.h
4653
include/concurrencpp/results/constants.h
47-
include/concurrencpp/results/executor_exception.h
4854
include/concurrencpp/results/make_result.h
4955
include/concurrencpp/results/promises.h
5056
include/concurrencpp/results/result.h
5157
include/concurrencpp/results/result_awaitable.h
52-
include/concurrencpp/results/result_core.h
5358
include/concurrencpp/results/result_fwd_declerations.h
5459
include/concurrencpp/results/when_result.h
5560
include/concurrencpp/runtime/constants.h

README.md

Lines changed: 201 additions & 117 deletions
Large diffs are not rendered by default.

cmake/coroutineOptions.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44
function(target_coroutine_options TARGET)
55
if(MSVC)
6-
target_compile_options(${TARGET} PUBLIC /await /permissive-)
6+
target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-)
77
return()
88
endif()
99

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef CONCURRENCPP_COROUTINE_H
2+
#define CONCURRENCPP_COROUTINE_H
3+
4+
#include "../platform_defs.h"
5+
6+
#ifdef CRCPP_MSVC_COMPILER
7+
8+
# include <coroutine>
9+
10+
namespace concurrencpp::details {
11+
template<class promise_type>
12+
using coroutine_handle = std::coroutine_handle<promise_type>;
13+
using suspend_never = std::suspend_never;
14+
using suspend_always = std::suspend_always;
15+
} // namespace concurrencpp::details
16+
17+
#elif defined(CRCPP_CLANG_COMPILER)
18+
19+
# include <experimental/coroutine>
20+
21+
namespace concurrencpp::details {
22+
template<class promise_type>
23+
using coroutine_handle = std::experimental::coroutine_handle<promise_type>;
24+
using suspend_never = std::experimental::suspend_never;
25+
using suspend_always = std::experimental::suspend_always;
26+
} // namespace concurrencpp::details
27+
28+
#endif
29+
30+
#endif

include/concurrencpp/executors/constants.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef CONCURRENCPP_EXECUTORS_CONSTS_H
22
#define CONCURRENCPP_EXECUTORS_CONSTS_H
33

4+
#include <numeric>
5+
46
namespace concurrencpp::details::consts {
57
inline const char* k_inline_executor_name = "concurrencpp::inline_executor";
68
constexpr int k_inline_executor_max_concurrency_level = 0;

include/concurrencpp/executors/derivable_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef CONCURRENCPP_DERIVABLE_EXECUTOR_H
22
#define CONCURRENCPP_DERIVABLE_EXECUTOR_H
33

4+
#include "concurrencpp/utils/bind.h"
45
#include "concurrencpp/executors/executor.h"
56

67
namespace concurrencpp {

include/concurrencpp/executors/executor.h

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#ifndef CONCURRENCPP_EXECUTOR_H
22
#define CONCURRENCPP_EXECUTOR_H
33

4+
#include "concurrencpp/task.h"
45
#include "concurrencpp/results/result.h"
56

67
#include <span>
8+
#include <vector>
79
#include <string>
810
#include <string_view>
911

@@ -16,27 +18,13 @@ namespace concurrencpp {
1618
class executor {
1719

1820
private:
19-
template<class executor_type, class callable_type, class... argument_types>
20-
static null_result post_bridge(executor_tag, executor_type*, callable_type callable, argument_types... arguments) {
21-
callable(arguments...);
22-
co_return;
23-
}
24-
25-
template<class callable_type>
26-
static null_result bulk_post_bridge(details::executor_bulk_tag, std::vector<std::experimental::coroutine_handle<>>* accumulator, callable_type callable) {
27-
callable();
28-
co_return;
29-
}
30-
3121
template<class return_type, class executor_type, class callable_type, class... argument_types>
3222
static result<return_type> submit_bridge(executor_tag, executor_type*, callable_type callable, argument_types... arguments) {
3323
co_return callable(arguments...);
3424
}
3525

3626
template<class callable_type, class return_type = typename std::invoke_result_t<callable_type>>
37-
static result<return_type> bulk_submit_bridge(details::executor_bulk_tag,
38-
std::vector<std::experimental::coroutine_handle<>>* accumulator,
39-
callable_type callable) {
27+
static result<return_type> bulk_submit_bridge(details::executor_bulk_tag, std::vector<concurrencpp::task>* accumulator, callable_type callable) {
4028
co_return callable();
4129
}
4230

@@ -46,7 +34,8 @@ namespace concurrencpp {
4634
static_assert(std::is_invocable_v<callable_type, argument_types...>,
4735
"concurrencpp::executor::post - <<callable_type>> is not invokable with <<argument_types...>>");
4836

49-
post_bridge({}, executor_ptr, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
37+
assert(executor_ptr != nullptr);
38+
executor_ptr->enqueue(details::bind_with_try_catch(std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...));
5039
}
5140

5241
template<class executor_type, class callable_type, class... argument_types>
@@ -55,37 +44,40 @@ namespace concurrencpp {
5544
"concurrencpp::executor::submit - <<callable_type>> is not invokable with <<argument_types...>>");
5645

5746
using return_type = typename std::invoke_result_t<callable_type, argument_types...>;
58-
5947
return submit_bridge<return_type>({}, executor_ptr, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
6048
}
6149

6250
template<class executor_type, class callable_type>
6351
static void do_bulk_post(executor_type* executor_ptr, std::span<callable_type> callable_list) {
64-
std::vector<std::experimental::coroutine_handle<>> accumulator;
65-
accumulator.reserve(callable_list.size());
52+
assert(executor_ptr != nullptr);
53+
assert(!callable_list.empty());
54+
55+
std::vector<task> tasks;
56+
tasks.reserve(callable_list.size());
6657

6758
for (auto& callable : callable_list) {
68-
bulk_post_bridge<callable_type>({}, &accumulator, std::move(callable));
59+
tasks.emplace_back(details::bind_with_try_catch(std::move(callable)));
6960
}
7061

71-
assert(!accumulator.empty());
72-
executor_ptr->enqueue(accumulator);
62+
std::span<task> span = tasks;
63+
executor_ptr->enqueue(span);
7364
}
7465

7566
template<class executor_type, class callable_type, class return_type = std::invoke_result_t<callable_type>>
7667
static std::vector<concurrencpp::result<return_type>> do_bulk_submit(executor_type* executor_ptr, std::span<callable_type> callable_list) {
77-
std::vector<std::experimental::coroutine_handle<>> accumulator;
68+
std::vector<task> accumulator;
7869
accumulator.reserve(callable_list.size());
7970

80-
std::vector<concurrencpp::result<return_type>> results;
71+
std::vector<result<return_type>> results;
8172
results.reserve(callable_list.size());
8273

8374
for (auto& callable : callable_list) {
8475
results.emplace_back(bulk_submit_bridge<callable_type>({}, &accumulator, std::move(callable)));
8576
}
8677

8778
assert(!accumulator.empty());
88-
executor_ptr->enqueue(accumulator);
79+
std::span<task> span = accumulator;
80+
executor_ptr->enqueue(span);
8981
return results;
9082
}
9183

@@ -96,8 +88,8 @@ namespace concurrencpp {
9688

9789
const std::string name;
9890

99-
virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
100-
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;
91+
virtual void enqueue(concurrencpp::task task) = 0;
92+
virtual void enqueue(std::span<concurrencpp::task> tasks) = 0;
10193

10294
virtual int max_concurrency_level() const noexcept = 0;
10395

include/concurrencpp/executors/inline_executor.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@ namespace concurrencpp {
2121
public:
2222
inline_executor() noexcept : executor(details::consts::k_inline_executor_name), m_abort(false) {}
2323

24-
void enqueue(std::experimental::coroutine_handle<> task) override {
24+
void enqueue(concurrencpp::task task) override {
2525
throw_if_aborted();
2626
task();
2727
}
2828

29-
void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) override {
29+
void enqueue(std::span<concurrencpp::task> tasks) override {
3030
throw_if_aborted();
31-
3231
for (auto& task : tasks) {
3332
task();
3433
}

include/concurrencpp/executors/manual_executor.h

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,42 @@
22
#define CONCURRENCPP_MANUAL_EXECUTOR_H
33

44
#include "concurrencpp/executors/derivable_executor.h"
5-
#include "concurrencpp/executors/constants.h"
65

76
#include <deque>
7+
#include <chrono>
88

99
namespace concurrencpp {
1010
class alignas(64) manual_executor final : public derivable_executor<manual_executor> {
1111

1212
private:
1313
mutable std::mutex m_lock;
14-
std::deque<std::experimental::coroutine_handle<>> m_tasks;
14+
std::deque<task> m_tasks;
1515
std::condition_variable m_condition;
1616
bool m_abort;
1717
std::atomic_bool m_atomic_abort;
1818

19-
void destroy_tasks(std::unique_lock<std::mutex>& lock) noexcept;
19+
template<class clock_type, class duration_type>
20+
static std::chrono::system_clock::time_point to_system_time_point(std::chrono::time_point<clock_type, duration_type> time_point) {
21+
const auto src_now = clock_type::now();
22+
const auto dst_now = std::chrono::system_clock::now();
23+
return dst_now + std::chrono::duration_cast<std::chrono::milliseconds>(time_point - src_now);
24+
}
25+
26+
static std::chrono::system_clock::time_point time_point_from_now(std::chrono::milliseconds ms) {
27+
return std::chrono::system_clock::now() + ms;
28+
}
29+
30+
size_t loop_impl(size_t max_count);
31+
size_t loop_until_impl(size_t max_count, std::chrono::time_point<std::chrono::system_clock> deadline);
32+
33+
void wait_for_tasks_impl(size_t count);
34+
size_t wait_for_tasks_impl(size_t count, std::chrono::time_point<std::chrono::system_clock> deadline);
2035

2136
public:
2237
manual_executor();
2338

24-
void enqueue(std::experimental::coroutine_handle<> task) override;
25-
void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) override;
39+
void enqueue(task task) override;
40+
void enqueue(std::span<task> tasks) override;
2641

2742
int max_concurrency_level() const noexcept override;
2843

@@ -32,15 +47,43 @@ namespace concurrencpp {
3247
size_t size() const noexcept;
3348
bool empty() const noexcept;
3449

50+
size_t clear();
51+
3552
bool loop_once();
36-
bool loop_once(std::chrono::milliseconds max_waiting_time);
53+
54+
bool loop_once_for(std::chrono::milliseconds max_waiting_time);
55+
56+
template<class clock_type, class duration_type>
57+
bool loop_once_until(std::chrono::time_point<clock_type, duration_type> timeout_time) {
58+
return loop_until_impl(1, to_system_time_point(timeout_time));
59+
}
3760

3861
size_t loop(size_t max_count);
3962

40-
size_t clear() noexcept;
63+
size_t loop_for(size_t max_count, std::chrono::milliseconds max_waiting_time);
64+
65+
template<class clock_type, class duration_type>
66+
size_t loop_until(size_t max_count, std::chrono::time_point<clock_type, duration_type> timeout_time) {
67+
return loop_until_impl(max_count, to_system_time_point(timeout_time));
68+
}
4169

4270
void wait_for_task();
43-
bool wait_for_task(std::chrono::milliseconds max_waiting_time);
71+
72+
bool wait_for_task_for(std::chrono::milliseconds max_waiting_time);
73+
74+
template<class clock_type, class duration_type>
75+
bool wait_for_task_until(std::chrono::time_point<clock_type, duration_type> timeout_time) {
76+
return wait_for_tasks_impl(1, to_system_time_point(timeout_time)) == 1;
77+
}
78+
79+
void wait_for_tasks(size_t count);
80+
81+
size_t wait_for_tasks_for(size_t count, std::chrono::milliseconds max_waiting_time);
82+
83+
template<class clock_type, class duration_type>
84+
size_t wait_for_tasks_until(size_t count, std::chrono::time_point<clock_type, duration_type> timeout_time) {
85+
return wait_for_tasks_impl(count, to_system_time_point(timeout_time));
86+
}
4487
};
4588
} // namespace concurrencpp
4689

0 commit comments

Comments
 (0)