Skip to content

Commit 16a9f13

Browse files
lucteoericniebler
andauthored
System scheduler R6 work (#1464)
* Minor fixes * Use generic functions for replaceability. One can replace different specialization of `query_system_context`. The runtime setter for system context (`set_system_context_backend`) is visible and can be specialized. * Add support for runtime environment to be passed to the backend. * Allow inplace_stop_token to be passed to system context backend. * Implement replaceability with a Phoenix singleton. * Use libdispatch backend if available. * Improve handling of type-erased receiver environment. Instead of having a dedicated parameter of type `env`, the receiver environment is now exposed thorough `receiver`. We don't need to store any extra data in the operation state, and the receiver can be queried at any time for receiver environment properties. * Fix include problem. * Use a spin lock instead of a mutex in `__instance_data` Hoping that msvc will allow it to be `constinit`. --------- Co-authored-by: Eric Niebler <eniebler@nvidia.com>
1 parent dc8f168 commit 16a9f13

7 files changed

Lines changed: 373 additions & 134 deletions

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ include(CheckIncludeFileCXX)
472472
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
473473
CHECK_INCLUDE_FILE_CXX("dispatch/dispatch.h" STDEXEC_FOUND_LIBDISPATCH)
474474
option(STDEXEC_ENABLE_LIBDISPATCH "Enable the tests for the Grand Central Dispatch scheduler" ${STDEXEC_FOUND_LIBDISPATCH})
475+
target_compile_definitions(stdexec INTERFACE STDEXEC_ENABLE_LIBDISPATCH)
475476
endif()
476477

477478
option (STDEXEC_ENABLE_NUMA "Enable NUMA affinity for static_thread_pool" OFF)

include/exec/__detail/__system_context_default_impl.hpp

Lines changed: 79 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,20 @@
1818
#include "__system_context_replaceability_api.hpp"
1919
#include "stdexec/execution.hpp"
2020
#include "exec/static_thread_pool.hpp"
21+
#if STDEXEC_ENABLE_LIBDISPATCH
22+
# include "exec/libdispatch_queue.hpp"
23+
#endif
24+
25+
#include <thread>
26+
#include <atomic>
2127

2228
namespace exec::__system_context_default_impl {
2329
using namespace stdexec::tags;
2430
using system_context_replaceability::receiver;
2531
using system_context_replaceability::bulk_item_receiver;
2632
using system_context_replaceability::storage;
2733
using system_context_replaceability::system_scheduler;
28-
using system_context_replaceability::__system_context_replaceability;
29-
30-
using __pool_scheduler_t = decltype(std::declval<exec::static_thread_pool>().get_scheduler());
34+
using system_context_replaceability::__system_context_backend_factory;
3135

3236
/// Receiver that calls the callback when the operation completes.
3337
template <class _Sender>
@@ -53,6 +57,8 @@ namespace exec::__system_context_default_impl {
5357
---------------------
5458
Total: 152; extra 24 bytes compared to internal operation state.
5559
60+
Using libdispatch backend, the operation sizes are 48 (down from 80) and 128 (down from 160).
61+
5662
[*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations.
5763
*/
5864

@@ -89,6 +95,12 @@ namespace exec::__system_context_default_impl {
8995
__op->__destruct(); // destroys the operation, including `this`.
9096
__r->set_stopped();
9197
}
98+
99+
decltype(auto) get_env() const noexcept {
100+
auto __o = __r_->try_query<stdexec::inplace_stop_token>();
101+
stdexec::inplace_stop_token __st = __o ? *__o : stdexec::inplace_stop_token{};
102+
return stdexec::prop{stdexec::get_stop_token, __st};
103+
}
92104
};
93105

94106
/// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment.
@@ -145,13 +157,16 @@ namespace exec::__system_context_default_impl {
145157
}
146158
};
147159

148-
struct __system_scheduler_impl : system_scheduler {
149-
__system_scheduler_impl()
160+
template <typename _BaseSchedulerContext>
161+
struct __system_scheduler_generic_impl : system_scheduler {
162+
__system_scheduler_generic_impl()
150163
: __pool_scheduler_(__pool_.get_scheduler()) {
151164
}
152165
private:
166+
using __pool_scheduler_t = decltype(std::declval<_BaseSchedulerContext>().get_scheduler());
167+
153168
/// The underlying thread pool.
154-
exec::static_thread_pool __pool_;
169+
_BaseSchedulerContext __pool_;
155170
__pool_scheduler_t __pool_scheduler_;
156171

157172
//! Functor called by the `bulk` operation; sends a `start` signal to the frontend.
@@ -184,7 +199,8 @@ namespace exec::__system_context_default_impl {
184199
}
185200

186201
void
187-
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept override {
202+
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept
203+
override {
188204
try {
189205
auto __sndr =
190206
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
@@ -197,51 +213,74 @@ namespace exec::__system_context_default_impl {
197213
}
198214
};
199215

200-
/// Keeps track of the object implementing the system context interfaces.
201-
struct __instance_holder {
216+
/// Keeps track of the backends for the system context interfaces.
217+
template <typename _Interface, typename _Impl>
218+
struct __instance_data {
219+
/// Gets the current instance; if there is no instance, uses the current factory to create one.
220+
std::shared_ptr<_Interface> __get_current_instance() {
221+
// If we have a valid instance, return it.
222+
__acquire_instance_lock();
223+
auto __r = __instance_;
224+
__release_instance_lock();
225+
if (__r) {
226+
return __r;
227+
}
202228

203-
/// Get the only instance of this class.
204-
static __instance_holder& __singleton() {
205-
static __instance_holder __this_instance_;
206-
return __this_instance_;
207-
}
229+
// Otherwise, create a new instance using the factory.
230+
// Note: we are lazy-loading the instance to avoid creating it if it is not needed.
231+
auto __new_instance = __factory_.load(std::memory_order_relaxed)();
208232

209-
/// Get the currently selected system context object.
210-
system_scheduler* __get_current_instance() const noexcept {
211-
return __current_instance_;
233+
// Store the newly created instance.
234+
__acquire_instance_lock();
235+
__instance_ = __new_instance;
236+
__release_instance_lock();
237+
return __new_instance;
212238
}
213239

214-
/// Allows changing the currently selected system context object; used for testing.
215-
void __set_current_instance(system_scheduler* __instance) noexcept {
216-
__current_instance_ = __instance;
240+
/// Set `__new_factory` as the new factory for `_Interface` and return the old one.
241+
__system_context_backend_factory<_Interface>
242+
__set_backend_factory(__system_context_backend_factory<_Interface> __new_factory) {
243+
// Replace the factory, keeping track of the old one.
244+
auto __old_factory = __factory_.exchange(__new_factory);
245+
// Create a new instance with the new factory.
246+
auto __new_instance = __new_factory();
247+
// Replace the current instance with the new one.
248+
__acquire_instance_lock();
249+
auto __old_instance = std::exchange(__instance_, __new_instance);
250+
__release_instance_lock();
251+
// Make sure to delete the old instance after releasing the lock.
252+
__old_instance.reset();
253+
return __old_factory;
217254
}
218255

219256
private:
220-
__instance_holder() {
221-
static __system_scheduler_impl __default_instance_;
222-
__current_instance_ = &__default_instance_;
223-
}
257+
std::atomic<bool> __instance_locked_{false};
258+
std::shared_ptr<_Interface> __instance_{nullptr};
259+
std::atomic<__system_context_backend_factory<_Interface>> __factory_{__default_factory};
224260

225-
system_scheduler* __current_instance_;
226-
};
261+
/// The default factory returns an instance of `_Impl`.
262+
static std::shared_ptr<_Interface> __default_factory() {
263+
return std::make_shared<_Impl>();
264+
}
227265

228-
struct __system_context_replaceability_impl : __system_context_replaceability {
229-
//! Globally replaces the system scheduler backend.
230-
//! This needs to be called within `main()` and before the system scheduler is accessed.
231-
void __set_system_scheduler(system_scheduler* __backend) noexcept override {
232-
__instance_holder::__singleton().__set_current_instance(__backend);
266+
void __acquire_instance_lock() {
267+
while (__instance_locked_.exchange(true, std::memory_order_acquire)) {
268+
// Spin until we acquire the lock.
269+
}
270+
}
271+
void __release_instance_lock() {
272+
__instance_locked_.store(false, std::memory_order_release);
233273
}
234274
};
235275

236-
inline void* __default_query_system_context_interface(const __uuid& __id) noexcept {
237-
if (__id == system_scheduler::__interface_identifier) {
238-
return __instance_holder::__singleton().__get_current_instance();
239-
} else if (__id == __system_context_replaceability::__interface_identifier) {
240-
static __system_context_replaceability_impl __impl;
241-
return &__impl;
242-
}
276+
#if STDEXEC_ENABLE_LIBDISPATCH
277+
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::libdispatch_queue>;
278+
#else
279+
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::static_thread_pool>;
280+
#endif
243281

244-
return nullptr;
245-
}
282+
/// The singleton to hold the `system_scheduler` instance.
283+
inline constinit __instance_data<system_scheduler, __system_scheduler_impl>
284+
__system_scheduler_singleton{};
246285

247286
} // namespace exec::__system_context_default_impl

include/exec/__detail/__system_context_default_impl_entry.hpp

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,36 @@
2323

2424
#include "__system_context_default_impl.hpp" // IWYU pragma: keep
2525

26-
STDEXEC_PRAGMA_PUSH()
27-
STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes") // warning: inline function '[...]' declared weak
26+
#define __STDEXEC_SYSTEM_CONTEXT_API extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak))
2827

29-
/// Gets the default system context implementation.
30-
extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) void*
31-
__query_system_context_interface(const __uuid& __id) noexcept {
32-
return exec::__system_context_default_impl::__default_query_system_context_interface(__id);
33-
}
28+
namespace exec::system_context_replaceability {
29+
/// The default implementation of the `query_system_context` function template.
30+
template <__queryable_interface _Interface>
31+
__STDEXEC_SYSTEM_CONTEXT_API std::shared_ptr<_Interface> query_system_context() {
32+
return {};
33+
}
3434

35-
STDEXEC_PRAGMA_POP()
35+
/// The default specialization of `query_system_context` for `system_scheduler`.
36+
template <>
37+
std::shared_ptr<system_scheduler> query_system_context<system_scheduler>() {
38+
return __system_context_default_impl::__system_scheduler_singleton.__get_current_instance();
39+
}
40+
41+
/// The default implementation of the `set_system_context_backend_factory` function template.
42+
template <__queryable_interface _Interface>
43+
__STDEXEC_SYSTEM_CONTEXT_API __system_context_backend_factory<_Interface>
44+
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory) {
45+
return nullptr;
46+
}
47+
48+
/// The default specialization of `set_system_context_backend_factory` for `system_scheduler`.
49+
template <>
50+
__system_context_backend_factory<system_scheduler>
51+
set_system_context_backend_factory<system_scheduler>(
52+
__system_context_backend_factory<system_scheduler> __new_factory) {
53+
return __system_context_default_impl::__system_scheduler_singleton.__set_backend_factory(
54+
__new_factory);
55+
}
56+
57+
58+
} // namespace exec::system_context_replaceability

include/exec/__detail/__system_context_replaceability_api.hpp

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717
#ifndef STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H
1818
#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H
1919

20+
#include "stdexec/__detail/__execution_fwd.hpp"
21+
2022
#include <cstdint>
2123
#include <exception>
24+
#include <optional>
25+
#include <memory>
2226

2327
struct __uuid {
2428
std::uint64_t __parts1;
@@ -27,39 +31,78 @@ struct __uuid {
2731
friend bool operator==(__uuid, __uuid) noexcept = default;
2832
};
2933

30-
/// Implementation-defined mechanism of querying a system context interface identified by `__id`.
31-
extern void* __query_system_context_interface(const __uuid& __id) noexcept;
32-
3334
namespace exec::system_context_replaceability {
3435

35-
//! Helper for the `__queryable_interface` concept.
36+
/// Helper for the `__queryable_interface` concept.
3637
template <__uuid X>
3738
using __check_constexpr_uuid = void;
3839

39-
//! Concept for a queryable interface. Ensures that the interface has a `__interface_identifier`
40-
//! member.
40+
/// Concept for a queryable interface. Ensures that the interface has a `__interface_identifier` member.
4141
template <typename _T>
4242
concept __queryable_interface =
4343
requires() { typename __check_constexpr_uuid<_T::__interface_identifier>; };
4444

45+
/// The details for making `_T` a runtime property.
46+
template <typename _T>
47+
struct __runtime_property_helper {
48+
/// Is `_T` a property?
49+
static constexpr bool __is_property = false;
50+
/// The unique identifier for the property.
51+
static constexpr __uuid __property_identifier{0, 0};
52+
};
53+
54+
/// `inplace_stope_token` is a runtime property.
55+
template <>
56+
struct __runtime_property_helper<stdexec::inplace_stop_token> {
57+
static constexpr bool __is_property = true;
58+
static constexpr __uuid __property_identifier{0x8779c09d8aa249df, 0x867db0e653202604};
59+
};
60+
61+
/// Concept for a runtime property.
62+
template <typename _T>
63+
concept __runtime_property = __runtime_property_helper<_T>::__is_property;
64+
4565
/// Query the system context for an interface of type `_Interface`.
4666
template <__queryable_interface _Interface>
47-
inline _Interface* query_system_context() {
48-
return static_cast<_Interface*>(
49-
__query_system_context_interface(_Interface::__interface_identifier));
50-
}
67+
extern std::shared_ptr<_Interface> query_system_context();
68+
69+
/// The type of a factory that can create interfaces of type `_Interface`.
70+
template <__queryable_interface _Interface>
71+
using __system_context_backend_factory = std::shared_ptr<_Interface> (*)();
72+
73+
/// Sets the factory that creates the system context backend for an interface of type `_Interface`.
74+
template <__queryable_interface _Interface>
75+
extern __system_context_backend_factory<_Interface>
76+
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory);
5177

5278
/// Interface for completing a sender operation. Backend will call frontend though this interface
5379
/// for completing the `schedule` and `schedule_bulk` operations.
5480
struct receiver {
5581
virtual ~receiver() = default;
5682

83+
protected:
84+
virtual bool __query_env(__uuid, void*) noexcept = 0;
85+
86+
public:
5787
/// Called when the system scheduler completes successfully.
5888
virtual void set_value() noexcept = 0;
5989
/// Called when the system scheduler completes with an error.
6090
virtual void set_error(std::exception_ptr) noexcept = 0;
6191
/// Called when the system scheduler was stopped.
6292
virtual void set_stopped() noexcept = 0;
93+
94+
/// Query the receiver for a property of type `_P`.
95+
template <typename _P>
96+
std::optional<std::decay_t<_P>> try_query() noexcept {
97+
if constexpr (__runtime_property<_P>) {
98+
std::decay_t<_P> __p;
99+
bool __success =
100+
__query_env(__runtime_property_helper<std::decay_t<_P>>::__property_identifier, &__p);
101+
return __success ? std::make_optional(std::move(__p)) : std::nullopt;
102+
} else {
103+
return std::nullopt;
104+
}
105+
}
63106
};
64107

65108
/// Receiver for bulk sheduling operations.
@@ -90,15 +133,6 @@ namespace exec::system_context_replaceability {
90133
bulk_schedule(std::uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0;
91134
};
92135

93-
/// Implementation-defined mechanism for replacing the system scheduler backend at run-time.
94-
struct __system_context_replaceability {
95-
static constexpr __uuid __interface_identifier{0xc008a3be3bb9284b, 0xb98edb3a740ee02c};
96-
97-
/// Globally replaces the system scheduler backend.
98-
/// This needs to be called within `main()` and before the system scheduler is accessed.
99-
virtual void __set_system_scheduler(system_scheduler*) noexcept = 0;
100-
};
101-
102136
} // namespace exec::system_context_replaceability
103137

104138
#endif

0 commit comments

Comments
 (0)