Skip to content

Commit a32d90a

Browse files
committed
feat: add rayon-compat crate
1 parent 96104b2 commit a32d90a

11 files changed

Lines changed: 411 additions & 105 deletions

File tree

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@ This project is currently in early [pre-release], and there may be arbitrary bre
1313

1414
## [Unreleased]
1515

16+
### Added
17+
18+
- `rayon-compat` crate, for running rayon on top of forte.
19+
- `Worker::migrated()` which is `true` when the current job has moved between threads.
20+
21+
### Changed
22+
23+
- Heartbeat interval is now 100 microseconds.
24+
- Heartbeats are now more evenly distributed between workers.
25+
- The heartbeat thread goes to sleep when the pool is not in use.
26+
- Shared work is now queued and claimed in the order it was shared.
27+
- The `Scope` API is now identical to that of `rayon-core` and does not use `Pin<T>`.
28+
- `Scope::new`, `Scope::add_reference` and `Scope::remove_reference` are now private.
29+
30+
### Security
31+
32+
- Forte now implements exception safety. Panics can no longer cause UB.
33+
1634
## [1.0.0-alpha.3]
1735

1836
### Added

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ repository = "https://github.com/NthTensor/Forte"
88

99
[workspace]
1010
resolver = "2"
11-
members = ["ci"]
12-
exclude = ["coz"]
11+
members = ["ci", "rayon-compat"]
1312

1413
[dependencies]
1514
async-task = "4.7.1"

rayon-compat/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[package]
2+
name = "rayon-compat"
3+
version = "1.12.1"
4+
edition = "2024"
5+
6+
[dependencies]
7+
forte = { path = ".." }
8+
9+
[features]
10+
web_spin_lock = []

rayon-compat/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Rayon Compat
2+
3+
This is a way to run `rayon` on top of `forte`! The `rayon-compat` crate mocks the important parts of the `rayon_core` API in a simple and crude way, which is nonetheless enough to support most of what `rayon` needs.
4+
5+
To use this crate, apply one of the following cargo patches:
6+
```
7+
# If you want to clone forte and use it locally
8+
[patch.crates-io]
9+
rayon-core = { path = "path to this repo", package = "rayon-compat" }
10+
11+
# If you want to use the latest version of forte from the git repository
12+
[patch.crates-io]
13+
rayon-core = { git = "https://github.com/NthTensor/Forte", package = "rayon-compat" }
14+
```

rayon-compat/src/lib.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
2+
3+
pub static THREAD_POOL: forte::ThreadPool = const { forte::ThreadPool::new() };
4+
5+
pub static STARTED: AtomicBool = const { AtomicBool::new(false) };
6+
7+
#[inline(always)]
8+
fn ensure_started() {
9+
if !STARTED.load(Ordering::Relaxed) && !STARTED.swap(true, Ordering::Relaxed) {
10+
THREAD_POOL.resize_to_available();
11+
}
12+
}
13+
14+
#[inline(always)]
15+
pub fn current_num_threads() -> usize {
16+
64 // Forte prefers smaller tasks, so it's better to lie to rayon about the size of the pool
17+
}
18+
19+
#[inline(always)]
20+
pub fn current_thread_index() -> Option<usize> {
21+
forte::Worker::map_current(|worker| worker.index())
22+
}
23+
24+
#[inline(always)]
25+
pub fn max_num_threads() -> usize {
26+
usize::MAX // The number of forte workers is only bounded by the size of a vector.
27+
}
28+
29+
// -----------------------------------------------------------------------------
30+
// Join
31+
32+
#[derive(Debug)]
33+
pub struct FnContext {
34+
/// True if the task was migrated.
35+
migrated: bool,
36+
}
37+
38+
impl FnContext {
39+
#[inline(always)]
40+
pub fn migrated(&self) -> bool {
41+
self.migrated
42+
}
43+
}
44+
45+
#[inline(always)]
46+
pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
47+
where
48+
A: FnOnce(FnContext) -> RA + Send,
49+
B: FnOnce(FnContext) -> RB + Send,
50+
RA: Send,
51+
RB: Send,
52+
{
53+
ensure_started();
54+
THREAD_POOL.join(
55+
|worker| {
56+
let migrated = worker.migrated();
57+
let ctx = FnContext { migrated };
58+
oper_a(ctx)
59+
},
60+
|worker| {
61+
let migrated = worker.migrated();
62+
let ctx = FnContext { migrated };
63+
oper_b(ctx)
64+
},
65+
)
66+
}
67+
68+
#[inline(always)]
69+
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
70+
where
71+
A: FnOnce() -> RA + Send,
72+
B: FnOnce() -> RB + Send,
73+
RA: Send,
74+
RB: Send,
75+
{
76+
ensure_started();
77+
THREAD_POOL.join(|_| oper_a(), |_| oper_b())
78+
}
79+
80+
// -----------------------------------------------------------------------------
81+
// Scope
82+
83+
pub use forte::Scope;
84+
85+
#[inline(always)]
86+
pub fn scope<'scope, OP, R>(op: OP) -> R
87+
where
88+
OP: FnOnce(&Scope<'scope>) -> R + Send,
89+
R: Send,
90+
{
91+
ensure_started();
92+
forte::scope(op)
93+
}
94+
95+
#[inline(always)]
96+
pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
97+
where
98+
OP: FnOnce(&Scope<'scope>) -> R,
99+
{
100+
ensure_started();
101+
forte::scope(op)
102+
}
103+
104+
// -----------------------------------------------------------------------------
105+
// Spawn
106+
107+
#[inline(always)]
108+
pub fn spawn<F>(func: F)
109+
where
110+
F: FnOnce() + Send + 'static,
111+
{
112+
ensure_started();
113+
THREAD_POOL.spawn(|_| func())
114+
}
115+
116+
// -----------------------------------------------------------------------------
117+
// Yield
118+
119+
pub use forte::Yield;
120+
121+
pub fn yield_local() -> Yield {
122+
let result = forte::Worker::map_current(forte::Worker::yield_local);
123+
match result {
124+
Some(status) => status,
125+
_ => Yield::Idle,
126+
}
127+
}
128+
129+
pub fn yield_now() -> Yield {
130+
let result = forte::Worker::map_current(forte::Worker::yield_now);
131+
match result {
132+
Some(status) => status,
133+
_ => Yield::Idle,
134+
}
135+
}
136+
137+
// -----------------------------------------------------------------------------
138+
// Fake stuff that doesn't work. These are here only so that rayon can export
139+
// them.
140+
141+
pub struct ThreadBuilder;
142+
143+
pub struct ThreadPool;
144+
145+
pub struct ThreadPoolBuildError;
146+
147+
pub struct ThreadPoolBuilder;
148+
149+
pub struct BroadcastContext;
150+
151+
pub struct ScopeFifo;
152+
153+
pub fn broadcast() {
154+
unimplemented!()
155+
}
156+
157+
pub fn spawn_broadcast() {
158+
unimplemented!()
159+
}
160+
161+
pub fn scope_fifo() {
162+
unimplemented!()
163+
}
164+
165+
pub fn in_place_scope_fifo() {
166+
unimplemented!()
167+
}
168+
169+
pub fn spawn_fifo() {
170+
unimplemented!()
171+
}

src/job.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ use alloc::collections::VecDeque;
1616
use core::cell::UnsafeCell;
1717
use core::mem::ManuallyDrop;
1818
use core::ptr::NonNull;
19+
use std::thread::Result as ThreadResult;
1920

2021
use crate::signal::Signal;
2122
use crate::thread_pool::Worker;
23+
use crate::unwind;
2224

2325
// -----------------------------------------------------------------------------
2426
// Runnable
@@ -152,7 +154,7 @@ impl JobQueue {
152154
/// This is analogous to the chili type `JobStack` and the rayon type `StackJob`.
153155
pub struct StackJob<F, T> {
154156
f: UnsafeCell<ManuallyDrop<F>>,
155-
signal: Signal<T>,
157+
signal: Signal<ThreadResult<T>>,
156158
}
157159

158160
impl<F, T> StackJob<F, T>
@@ -210,7 +212,7 @@ where
210212
/// closure's return value is sent over this signal after the job is
211213
/// executed.
212214
#[inline(always)]
213-
pub fn signal(&self) -> &Signal<T> {
215+
pub fn signal(&self) -> &Signal<ThreadResult<T>> {
214216
&self.signal
215217
}
216218
}
@@ -235,20 +237,25 @@ where
235237
// SAFETY: The caller ensures `this` can be converted into an immutable
236238
// reference.
237239
let this = unsafe { this.cast::<Self>().as_ref() };
240+
// Create an abort guard. If the closure panics, this will convert the
241+
// panic into an abort. Doing so prevents use-after-free for other elements of the stack.
242+
let abort_guard = unwind::AbortOnDrop;
238243
// SAFETY: This memory location is accessed only in this function and in
239244
// `unwrap`. The latter cannot have been called, because it drops the
240245
// stack job, so, since this function is called only once, we can
241246
// guarantee that we have exclusive access.
242247
let f_ref = unsafe { &mut *this.f.get() };
243248
// SAFETY: The caller ensures this function is called only once.
244249
let f = unsafe { ManuallyDrop::take(f_ref) };
245-
// Run the job.
246-
let result = f(worker);
250+
// Run the job. If the job panics, we propagate the panic back to the main thread.
251+
let result = unwind::halt_unwinding(|| f(worker));
247252
// SAFETY: This is valid for the access used by `send` because
248253
// `&this.signal` is an immutable reference to a `Signal`. Because
249254
// `send` is only called in this function, and this function is never
250255
// called again, `send` is never called again.
251256
unsafe { Signal::send(&this.signal, result) }
257+
// Forget the abort guard, re-enabling panics.
258+
core::mem::forget(abort_guard);
252259
}
253260
}
254261

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ mod job;
4040
mod scope;
4141
mod signal;
4242
mod thread_pool;
43+
mod unwind;
4344

4445
// -----------------------------------------------------------------------------
4546
// Top-level exports

0 commit comments

Comments
 (0)