Skip to content

Commit 0c612ae

Browse files
authored
Version 0.1.5 (#84)
* `async_lock` implementation * `when_any` data race fix * parallel coroutine to throw `std::invalid_argument` on null executor * tests compilation time optimization * `result` optimizations
1 parent 580f430 commit 0c612ae

40 files changed

Lines changed: 1664 additions & 259 deletions

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ jobs:
4242
cxx: clang++
4343
tsan: NO
4444

45-
- name: Windows (Visual Studio Enterprise 2019)
46-
os: windows-latest
45+
- name: Windows (Visual Studio Enterprise 2022)
46+
os: windows-2022
4747
cc: cl
4848
cxx: cl
4949
tsan: NO

CMakeLists.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
cmake_minimum_required(VERSION 3.16)
22

33
project(concurrencpp
4-
VERSION 0.1.4
4+
VERSION 0.1.5
55
LANGUAGES CXX)
66

77
include(cmake/coroutineOptions.cmake)
@@ -29,6 +29,7 @@ set(concurrencpp_sources
2929
source/results/impl/shared_result_state.cpp
3030
source/results/promises.cpp
3131
source/runtime/runtime.cpp
32+
source/threads/async_lock.cpp
3233
source/threads/binary_semaphore.cpp
3334
source/threads/thread.cpp
3435
source/timers/timer.cpp
@@ -71,6 +72,7 @@ set(concurrencpp_headers
7172
include/concurrencpp/results/generator.h
7273
include/concurrencpp/runtime/constants.h
7374
include/concurrencpp/runtime/runtime.h
75+
include/concurrencpp/threads/async_lock.h
7476
include/concurrencpp/threads/binary_semaphore.h
7577
include/concurrencpp/threads/thread.h
7678
include/concurrencpp/threads/cache_line.h
@@ -91,6 +93,9 @@ target_compile_features(concurrencpp PUBLIC cxx_std_20)
9193

9294
target_coroutine_options(concurrencpp)
9395

96+
find_package(Threads REQUIRED)
97+
target_link_libraries(concurrencpp PUBLIC Threads::Threads)
98+
9499
find_library(LIBRT NAMES rt DOC "Path to the Real Time shared library")
95100
target_link_libraries(concurrencpp PUBLIC "$<$<BOOL:${LIBRT}>:${LIBRT}>")
96101

README.md

Lines changed: 219 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
2-
31
# concurrencpp, the C++ concurrency library
42

53
![Latest Release](https://img.shields.io/github/v/release/David-Haim/concurrencpp.svg) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
@@ -57,6 +55,9 @@ concurrencpp main advantages are:
5755
* [Delay object example](#delay-object-example)
5856
* [Generators](#generators)
5957
* [`generator` API](#generator-api)
58+
* [Asynchronous locks](#asynchronous-locks)
59+
* [`async_lock` API](#async_lock-api)
60+
* [`scoped_async_lock` API](#scoped_async_lock-api)
6061
* [The runtime object](#the-runtime-object)
6162
* [`runtime` API](#runtime-api)
6263
* [Creating user-defined executors](#creating-user-defined-executors)
@@ -766,14 +767,16 @@ concurrencpp also provides parallel coroutines, which start to run inside a give
766767
Every parallel coroutine must meet the following preconditions:
767768
768769
1. Returns any of `result` / `null_result` .
769-
1. Gets `executor_tag` as its first argument .
770-
1. Gets any of `type*` / `type&` / `std::shared_ptr<type>`, where `type` is a concrete class of `executor` as its second argument.
771-
1. Contains any of `co_await` or `co_return` in its body.
770+
2. Gets `executor_tag` as its first argument .
771+
3. Gets any of `type*` / `type&` / `std::shared_ptr<type>`, where `type` is a concrete class of `executor` as its second argument.
772+
4. Contains any of `co_await` or `co_return` in its body.
773+
5. Is not a member function or a lambda function
772774
773775
If all the above applies, the function is a parallel coroutine:
774776
concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor.
775777
`concurrencpp::executor_tag` is a dummy placeholder to tell the concurrencpp runtime that this function is not a regular function, it needs to start running inside the given executor.
776-
Applications can then consume the result of the parallel coroutine by using the returned result object.
778+
If the executor passed to the parallel coroutine is null, the coroutine will not start to run and an `std::invalid_argument` exception will be thrown synchronously.
779+
If all preconditions are met, Applications can consume the result of the parallel coroutine by using the returned result object.
777780
778781
#### *Parallel Fibonacci example:*
779782
```cpp
@@ -1650,6 +1653,210 @@ class generator_iterator {
16501653
};
16511654
```
16521655
1656+
### Asynchronous locks
1657+
Regular synchronous locks cannot be used safely inside coroutines for a number of reasons:
1658+
1659+
- Synchronous locks, such as `std::mutex`, are expected to be locked and unlocked in the same thread of execution. Unlocking a synchronous lock in a thread which had not locked it is undefined behavior. Since tasks can be suspended and resumed in any thread of execution, synchronous locks will break when used inside tasks.
1660+
- Synchronous locks were created to work with *threads* and not with *coroutines*. If a synchronous lock is already locked by one thread, then when another thread tries to lock it, the entire thread of execution will be blocked and will be unblocked when the lock is released. This mechanism works well for traditional multi-threading paradigms but not for coroutines: with coroutines, we want *tasks* to be *suspended and resumed* without blocking or interfering with the execution of underlying threads and executors.
1661+
1662+
`concurrencpp::async_lock` solves those issues by providing a similar API to `std::mutex`, with the main difference that calls to `concurrencpp::async_lock` will return a lazy-result that can be `co_awaited` safely inside tasks. If one task tries to lock an async-lock and fails, the task will be suspended, and will be resumed when the lock is unlocked and acquired by the suspended task. This allows executors to process a huge amount of tasks waiting to acquire a lock without expensive context-switching and expensive kernel calls.
1663+
1664+
Similar to how `std::mutex` works, only one task can acquire `async_lock` at any given time, and a *read barrier* is place at the moment of acquiring. Releasing an async lock places a *write barrier* and allows the next task to acquire it, creating a chain of one-modifier at a time who sees the changes other modifiers had done and posts its modifications for the next modifiers to see.
1665+
1666+
Like `std::mutex`, `concurrencpp::async_lock` ***is not recursive***. Extra attention must be given when acquiring such lock - A lock must not be acquired again in a task that has been spawned by another task which had already acquired the lock. In such case, an unavoidable dead-lock will occur. Unlike other objects in concurrencpp, `async_lock` is neither copiable nor movable.
1667+
1668+
Like standard locks, `concurrencpp::async_lock` is meant to be used with scoped wrappers which leverage C++ RAII idiom to ensure locks are always unlocked upon function return or thrown exception. `async_lock::lock` returns a lazy-result of a scoped wrapper that calls `async_lock::unlock` on destruction. Raw uses of `async_lock::unlock` are discouraged. `concurrencpp::scoped_async_lock` acts as the scoped wrapper and provides an API which is almost identical to `std::unique_lock`. `concurrencpp::scoped_async_lock` is movable, but not copiable.
1669+
1670+
`async_lock::lock` and `scoped_async_lock::lock` require a resume-executor as their parameter. Upon calling those methods, if the lock is available for locking, then it is locked and the current task is resumed immediately. If not, then the current task is suspended, and will be resumed inside the given resume-executor when the lock is finally acquired by it.
1671+
1672+
`concurrencpp::scoped_async_lock` wraps an `async_lock` and ensure it's properly unlocked. like `std::unique_lock`, there are cases it does not wrap any lock, and in this case it's considered to be empty. An empty `scoped_async_lock` can happen when it's defaultly constructed, moved, or `scoped_async_lock::release` method is called. An empty scoped-async-lock will not unlock any lock on destruction.
1673+
1674+
Even if the scoped-async-lock is not empty, it does not mean that it owns the underlying async-lock and it will unlock it on destruction. Non-empty and non-owning scoped-async locks can happen if `scoped_async_lock::unlock` was called or the scoped-async-lock was constructed using `scoped_async_lock(async_lock&, std::defer_lock_t)` constructor.
1675+
1676+
#### `async_lock` *example:*
1677+
1678+
```cpp
1679+
#include "concurrencpp/concurrencpp.h"
1680+
1681+
#include <vector>
1682+
#include <iostream>
1683+
1684+
std::vector<size_t> numbers;
1685+
concurrencpp::async_lock lock;
1686+
1687+
concurrencpp::result<void> add_numbers(concurrencpp::executor_tag,
1688+
std::shared_ptr<concurrencpp::executor> executor,
1689+
size_t begin,
1690+
size_t end) {
1691+
for (auto i = begin; i < end; i++) {
1692+
concurrencpp::scoped_async_lock raii_wrapper = co_await lock.lock(executor);
1693+
numbers.push_back(i);
1694+
}
1695+
}
1696+
1697+
int main() {
1698+
concurrencpp::runtime runtime;
1699+
constexpr size_t range = 10'000'000;
1700+
constexpr size_t sections = 4;
1701+
concurrencpp::result<void> results[sections];
1702+
1703+
for (size_t i = 0; i < 4; i++) {
1704+
const auto range_start = i * range / sections;
1705+
const auto range_end = (i + 1) * range / sections;
1706+
1707+
results[i] = add_numbers({}, runtime.thread_pool_executor(), range_start, range_end);
1708+
}
1709+
1710+
for (auto& result : results) {
1711+
result.get();
1712+
}
1713+
1714+
std::cout << "vector size is " << numbers.size() << std::endl;
1715+
1716+
// make sure the vector state has not been corrupted by unprotected concurrent accesses
1717+
std::sort(numbers.begin(), numbers.end());
1718+
for (size_t i = 0; i < range; i++) {
1719+
if (numbers[i] != i) {
1720+
std::cerr << "vector state is corrupted." << std::endl;
1721+
return -1;
1722+
}
1723+
}
1724+
1725+
std::cout << "succeeded pushing range [0 - 10,000,000] concurrently to the vector!" << std::endl;
1726+
return 0;
1727+
}
1728+
```
1729+
#### `async_lock` API
1730+
```cpp
1731+
class async_lock {
1732+
/*
1733+
Constructs an async lock object.
1734+
*/
1735+
async_lock() noexcept;
1736+
1737+
/*
1738+
Destructs an async lock object.
1739+
*this is not automatically unlocked at the moment of destruction.
1740+
*/
1741+
~async_lock() noexcept;
1742+
1743+
/*
1744+
Asynchronously acquires the async lock.
1745+
If *this has already been locked by another non-parent task, the current task will be suspended
1746+
and will be resumed when *this is acquired, inside resume_executor.
1747+
If *this has not been locked by another task, then *this will be acquired and the current task will be resumed
1748+
immediately in the calling thread of execution.
1749+
If *this has already been locked by a parent task, then unavoidable dead-lock will occur.
1750+
Throws std::invalid_argument if resume_executor is null.
1751+
Throws std::system error if one of the underlying synhchronization primitives throws.
1752+
*/
1753+
lazy_result<scoped_async_lock> lock(std::shared_ptr<executor> resume_executor);
1754+
1755+
/*
1756+
Tries to acquire *this in the calling thread of execution.
1757+
Returns true if *this is acquired, false otherwise.
1758+
In any case, the current task is resumed immediately in the calling thread of execution.
1759+
Throws std::system error if one of the underlying synhchronization primitives throws.
1760+
*/
1761+
lazy_result<bool> try_lock();
1762+
1763+
/*
1764+
Releases *this and allows other tasks (including suspended tasks waiting for *this) to acquire it.
1765+
Throws std::system error if *this is not locked at the moment of calling this method.
1766+
Throws std::system error if one of the underlying synhchronization primitives throws.
1767+
*/
1768+
void unlock();
1769+
};
1770+
```
1771+
#### `scoped_async_lock` API
1772+
1773+
```cpp
1774+
class scoped_async_lock {
1775+
/*
1776+
Constructs an async lock wrapper that does not wrap any async lock.
1777+
*/
1778+
scoped_async_lock() noexcept = default;
1779+
1780+
/*
1781+
If *this wraps async_lock, this method releases the wrapped lock.
1782+
*/
1783+
~scoped_async_lock() noexcept;
1784+
1785+
/*
1786+
Moves rhs to *this.
1787+
After this call, *rhs does not wrap any async lock.
1788+
*/
1789+
scoped_async_lock(scoped_async_lock&& rhs) noexcept;
1790+
1791+
/*
1792+
Wrapps unlocked lock.
1793+
lock must not be in acquired mode when calling this method.
1794+
*/
1795+
scoped_async_lock(async_lock& lock, std::defer_lock_t) noexcept;
1796+
1797+
/*
1798+
Wrapps locked lock.
1799+
lock must be already acquired when calling this method.
1800+
*/
1801+
scoped_async_lock(async_lock& lock, std::adopt_lock_t) noexcept;
1802+
1803+
/*
1804+
Calls async_lock::lock on the wrapped locked, using resume_executor as a parameter.
1805+
Throws std::invalid_argument if resume_executor is nulll.
1806+
Throws std::system_error if *this does not wrap any lock.
1807+
Throws std::system_error if wrapped lock is already locked.
1808+
Throws any exception async_lock::lock throws.
1809+
*/
1810+
lazy_result<void> lock(std::shared_ptr<executor> resume_executor);
1811+
1812+
/*
1813+
Calls async_lock::try_lock on the wrapped lock.
1814+
Throws std::system_error if *this does not wrap any lock.
1815+
Throws std::system_error if wrapped lock is already locked.
1816+
Throws any exception async_lock::try_lock throws.
1817+
*/
1818+
lazy_result<bool> try_lock();
1819+
1820+
/*
1821+
Calls async_lock::unlock on the wrapped lock.
1822+
If *this does not wrap any lock, this method does nothing.
1823+
Throws std::system_error if *this wraps a lock and it is not locked.
1824+
*/
1825+
void unlock();
1826+
1827+
/*
1828+
Checks whether *this wraps a locked mutex or not.
1829+
Returns true if wrapped locked is in acquired state, false otherwise.
1830+
*/
1831+
bool owns_lock() const noexcept;
1832+
1833+
/*
1834+
Equivalent to owns_lock.
1835+
*/
1836+
explicit operator bool() const noexcept;
1837+
1838+
/*
1839+
Swaps the contents of *this and rhs.
1840+
*/
1841+
void swap(scoped_async_lock& rhs) noexcept;
1842+
1843+
/*
1844+
Empties *this and returns a pointer to the previously wrapped lock.
1845+
After a call to this method, *this doesn't wrap any lock.
1846+
The previously wrapped lock is not released,
1847+
it must be released by either unlocking it manually through the returned pointer or by
1848+
capturing the pointer with another scoped_async_lock which will take ownerwhip over it.
1849+
*/
1850+
async_lock* release() noexcept;
1851+
1852+
/*
1853+
Returns a pointer to the wrapped async_lock, or a null pointer if there is no wrapped async_lock.
1854+
*/
1855+
async_lock* mutex() const noexcept;
1856+
};
1857+
1858+
```
1859+
16531860
### The runtime object
16541861

16551862
The concurrencpp runtime object is the agent used to acquire, store and create new executors.
@@ -1988,14 +2195,17 @@ $ cd build/test
19882195
$ ctest . -V
19892196
```
19902197

1991-
##### Via vcpkg on Windows and *nix platforms
2198+
##### Via package managers on Windows and *nix platforms
19922199

1993-
Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp as [vcpkg](https://vcpkg.io/) packages:
2200+
Alternatively to building and installing the library manually, developers may get stable releases of concurrencpp via the [vcpkg](https://vcpkg.io/) and [Conan](https://conan.io/) package managers:
19942201

2202+
vcpkg:
19952203
```shell
19962204
$ vcpkg install concurrencpp
19972205
```
19982206

2207+
Conan: [concurrencpp on ConanCenter](https://conan.io/center/concurrencpp)
2208+
19992209
##### Experimenting with the built-in sandbox
20002210
concurrencpp comes with a built-in sandbox program which developers can modify and experiment, without having to install or link the compiled library to a different code-base. In order to play with the sandbox, developers can modify `sandbox/main.cpp` and compile the application using the following commands:
20012211

@@ -2012,4 +2222,4 @@ $ cmake -S sandbox -B build/sandbox
20122222
#for release mode: cmake -DCMAKE_BUILD_TYPE=Release -S sandbox -B build/sandbox
20132223
$ cmake --build build/sandbox
20142224
$ ./build/sandbox #runs the sandbox
2015-
```
2225+
```

cmake/concurrencppConfig.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1+
include(CMakeFindDependencyMacro)
2+
find_dependency(Threads)
3+
14
include("${CMAKE_CURRENT_LIST_DIR}/concurrencppTargets.cmake")

cmake/coroutineOptions.cmake

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,7 @@
44
function(target_coroutine_options TARGET)
55
if(MSVC)
66
target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-)
7-
return()
8-
endif()
9-
10-
find_package(Threads REQUIRED)
11-
target_link_libraries(${TARGET} PRIVATE Threads::Threads)
12-
13-
if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
7+
elseif(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
148
target_compile_options(${TARGET} PUBLIC -stdlib=libc++ -fcoroutines-ts)
159
target_link_options(${TARGET} PUBLIC -stdlib=libc++)
1610
set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO)

cmake/setCiVars.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
if (os MATCHES "^windows")
22
execute_process(
3-
COMMAND "C:/Program Files (x86)/Microsoft Visual Studio/2019/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set
3+
COMMAND "C:/Program Files/Microsoft Visual Studio/2022/Enterprise/VC/Auxiliary/Build/vcvars64.bat" && set
44
OUTPUT_FILE environment_script_output.txt
55
)
66
file(STRINGS environment_script_output.txt output_lines)

include/concurrencpp/concurrencpp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@
1717
#include "concurrencpp/results/resume_on.h"
1818
#include "concurrencpp/results/generator.h"
1919
#include "concurrencpp/executors/executor_all.h"
20+
#include "concurrencpp/threads/async_lock.h"
2021

2122
#endif

include/concurrencpp/coroutines/coroutine.h

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,23 @@
33

44
#include "../platform_defs.h"
55

6-
#ifdef CRCPP_MSVC_COMPILER
6+
#if !__has_include(<coroutine>) && __has_include(<experimental/coroutine>)
77

8-
# include <coroutine>
8+
# include <experimental/coroutine>
9+
# define CRCPP_COROUTINE_NAMESPACE std::experimental
910

10-
namespace concurrencpp::details {
11-
template<class promise_type>
12-
using coroutine_handle = std::coroutine_handle<promise_type>;
13-
using suspend_never = std::suspend_never;
14-
using suspend_always = std::suspend_always;
15-
} // namespace concurrencpp::details
11+
#else
1612

17-
#elif defined(CRCPP_CLANG_COMPILER)
13+
# include <coroutine>
14+
# define CRCPP_COROUTINE_NAMESPACE std
1815

19-
# include <experimental/coroutine>
16+
#endif
2017

2118
namespace concurrencpp::details {
2219
template<class promise_type>
23-
using coroutine_handle = std::experimental::coroutine_handle<promise_type>;
24-
using suspend_never = std::experimental::suspend_never;
25-
using suspend_always = std::experimental::suspend_always;
20+
using coroutine_handle = CRCPP_COROUTINE_NAMESPACE::coroutine_handle<promise_type>;
21+
using suspend_never = CRCPP_COROUTINE_NAMESPACE::suspend_never;
22+
using suspend_always = CRCPP_COROUTINE_NAMESPACE::suspend_always;
2623
} // namespace concurrencpp::details
2724

28-
#endif
29-
3025
#endif

0 commit comments

Comments
 (0)