Replace Rayon with our own, non-workstealing thread pool#910
Conversation
|
Starting off as a draft as there are some more cleanups I'd like to look into |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #910 +/- ##
==========================================
- Coverage 86.68% 86.35% -0.33%
==========================================
Files 90 92 +2
Lines 27088 27587 +499
==========================================
+ Hits 23480 23822 +342
- Misses 3608 3765 +157 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Merging this PR will degrade performance by 16.41%
|
| Mode | Benchmark | BASE |
HEAD |
Efficiency | |
|---|---|---|---|---|---|
| ❌ | Simulation | rust_rule_match_with_serialize[rule_run_1] |
287.8 ms | 389.9 ms | -26.19% |
| ❌ | Simulation | rust_rule_tableaction_hot_path[facts50000_funcs200] |
24.7 ms | 26.1 ms | -5.35% |
Tip
Investigate this regression by commenting @codspeedbot fix this regression on this PR, or directly use the CodSpeed MCP with your agent.
Comparing ezr-replace-rayon (b4b64ce) with main (42a4fb6)
Footnotes
-
226 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports. ↩
|
I made progress here and got some but not all of the overhead down. I think that the remaining overhead can be fixed by changing how materialization works: I think not using work-stealing is causing more contention over the DashMap when doing a scoped materialization. I think we can probably switch away from dashmap for this step entirely though. Thoughts on submitting this for now and then continuing to optimize the concurrency of the tree decomp stuff later? |
Add a Divan microbenchmark comparing single-threaded summation, Rayon parallel iterator summation, and the new scoped ThreadPool with varying chunk sizes. On a 256M-element vector with 8 worker threads, the thread pool is much slower for tiny chunks due to channel/task overhead, but catches up once chunks are large enough. Around 64K elements per chunk it approaches Rayon performance, and around 256K-1M elements per chunk it can match or slightly exceed the Rayon baseline on the measured run.
3b0d35d to
af324bc
Compare
| fn default_vec<T: Default>(len: usize) -> Vec<T> { | ||
| vec_with(len, T::default) | ||
| } | ||
|
|
||
| fn vec_with<T>(len: usize, init: impl FnMut() -> T) -> Vec<T> { | ||
| let mut values = Vec::with_capacity(len); | ||
| values.resize_with(len, init); | ||
| values |
There was a problem hiding this comment.
These aren't parallel specific just helpers?
| /// Return a copy of this e-graph configured with a different thread count. | ||
| pub fn with_num_threads(mut self, threads: usize) -> Self { | ||
| self.set_num_threads(threads); | ||
| self | ||
| } | ||
|
|
||
| /// Set the number of worker threads used by this e-graph. | ||
| /// | ||
| /// Passing `1` disables the pool. Passing `0` uses available parallelism. | ||
| pub fn set_num_threads(&mut self, threads: usize) { | ||
| let threads = normalize_thread_count(threads); | ||
| self.threads = threads; | ||
| self.thread_pool = (threads > 1).then(|| Arc::new(ThreadPool::new(threads))); | ||
| } |
There was a problem hiding this comment.
Cool so we can change the number of threads on the fly? That's nice
|
@coderabbitai full review |
✅ Action performedFull review finished. |
📝 WalkthroughSummary by CodeRabbit
WalkthroughThis PR replaces global Rayon thread-pool usage with a new custom ChangesRayon → egglog_concurrency ThreadPool migration
Sequence Diagram(s)sequenceDiagram
participant Caller
participant EGraph
participant ThreadPool
participant Scope
participant WorkerThread
Caller->>EGraph: new(num_threads) or set_num_threads()
Caller->>EGraph: run_rules() or flush_updates()
EGraph->>ThreadPool: install_thread_pool(f)
ThreadPool->>Scope: scope(f)
Scope->>WorkerThread: enqueue jobs via crossbeam channel
WorkerThread->>Scope: execute and record completion in AtomicCounts
alt inline help depth within limit
Scope->>Scope: drain queued jobs inline while waiting
else depth exceeded
Scope->>WorkerThread: spawn BackupWorker thread
end
Scope-->>ThreadPool: signal done channel
ThreadPool-->>EGraph: return result
EGraph-->>Caller: return
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
egglog-bridge/src/lib.rs (1)
1017-1029: 🚀 Performance & Scalability | 🟠 Major | ⚡ Quick winWrap
with_execution_state_trackedin the configured pool too.
with_execution_statenow installs the pool, but the tracked variant still bypasses it. PublicEGraph::readandupdate_uncheckeduse the tracked path, so those callbacks ignore the per-instance thread setting.Proposed fix
pub fn with_execution_state_tracked<R>( &self, f: impl FnOnce(&mut ExecutionState<'_>) -> R, ) -> (R, bool) { - self.db.with_execution_state_tracked(f) + let thread_pool = self.thread_pool(); + install_thread_pool(thread_pool, || self.db.with_execution_state_tracked(f)) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@egglog-bridge/src/lib.rs` around lines 1017 - 1029, `with_execution_state_tracked` in `EGraph` still bypasses the instance thread pool, unlike `with_execution_state`, so callbacks from `read` and `update_unchecked` ignore the configured pool. Update `with_execution_state_tracked` to install the thread pool around the `self.db.with_execution_state_tracked(f)` call, using the same `self.thread_pool()` and `install_thread_pool` pattern as `with_execution_state`, so both execution paths honor the per-instance thread setting.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@CHANGELOG.md`:
- Line 6: Update the CHANGELOG entry for the new `egglog-concurrency` scoped
`ThreadPool` support to also mention the thread-configuration migration from
global Rayon setup to per-`EGraph` `with_num_threads` / `set_num_threads`. Keep
the bullet concise but make the upgrade path explicit so users can see that
thread control now belongs on `EGraph` rather than global configuration.
In `@concurrency/benches/sum_vector.rs`:
- Around line 82-93: The `threadpool_chunked_sum` benchmark is still using a
shared `AtomicU64`, which adds contention and makes the comparison with
`rayon_sum` unfair. Update the benchmark to avoid shared atomic accumulation in
the `pool.scope`/`scope.spawn` path and use the same reduction-style aggregation
approach as the Rayon benchmark. Keep the existing symbols like
`threadpool_chunked_sum`, `pool.scope`, and `sum_slice` so the benchmark
isolates scheduler overhead instead of atomic contention.
In `@core-relations/src/hash_index/mod.rs`:
- Around line 948-952: The global INDEX_THREAD_POOL in run_in_index_thread_pool
currently freezes its worker count on first use, which can override later
EGraph-specific thread settings and keep worker-local pools alive too long.
Change the helper so the pool is created per installed thread count or tied to
the owning EGraph lifecycle instead of a single global Lazy<ThreadPool>. Keep
the thread-count source in parallel::current_num_threads() or the EGraph-owned
configuration, and update run_in_index_thread_pool and INDEX_THREAD_POOL
accordingly.
In `@core-relations/src/parallel.rs`:
- Around line 84-118: The parallel helpers are eagerly initializing every result
slot before worker threads overwrite them, which adds unnecessary setup cost and
forces extra trait bounds. Update `map_mut_with` and the related `map`/`map_mut`
path to store uninitialized or optional result slots and write each output
exactly once during processing, then collect the finalized values after the
parallel scope completes. Keep the fix centered around the existing
`map_mut_with` helper and any shared result-allocation logic so callers like
`parallel_rehash` and `parallel_insert` no longer pay for discarded initial
values, and remove the need for `R: Default` where it is only used for prefill.
In `@core-relations/src/row_buffer/mod.rs`:
- Around line 388-392: The TaggedRowBuffer::par_iter alias is misleading because
it now behaves sequentially, not in parallel. Update the API in TaggedRowBuffer
by either deprecating par_iter and steering callers to iter, or renaming the
method to iter so the name matches the behavior. Keep the existing iter
implementation as the single source of truth and adjust any related
docs/comments around TaggedRowBuffer::par_iter accordingly.
In `@egglog-bridge/src/lib.rs`:
- Around line 163-168: The None branch in install_thread_pool currently just
calls f(), which leaves any outer CURRENT_POOL in place and can make serial
EGraphs observe the wrong thread count. Update install_thread_pool so the None
case runs f() under a cleared/no-current-pool context rather than inheriting an
ambient pool, while keeping the Some(thread_pool) path using
ThreadPool::install.
---
Outside diff comments:
In `@egglog-bridge/src/lib.rs`:
- Around line 1017-1029: `with_execution_state_tracked` in `EGraph` still
bypasses the instance thread pool, unlike `with_execution_state`, so callbacks
from `read` and `update_unchecked` ignore the configured pool. Update
`with_execution_state_tracked` to install the thread pool around the
`self.db.with_execution_state_tracked(f)` call, using the same
`self.thread_pool()` and `install_thread_pool` pattern as
`with_execution_state`, so both execution paths honor the per-instance thread
setting.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 265ca948-4bb6-4ece-9adf-a4e00593a87a
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.locktests/cykjson.eggis excluded by!**/*.egg
📒 Files selected for processing (29)
CHANGELOG.mdCargo.tomlbenches/common.rsbenches/rust_api_benchmarking.rsconcurrency/Cargo.tomlconcurrency/benches/sum_vector.rsconcurrency/src/lib.rsconcurrency/src/threadpool/mod.rsconcurrency/src/threadpool/tests.rscore-relations/Cargo.tomlcore-relations/src/containers/mod.rscore-relations/src/free_join/execute.rscore-relations/src/free_join/mod.rscore-relations/src/hash_index/mod.rscore-relations/src/lib.rscore-relations/src/parallel.rscore-relations/src/parallel_heuristics.rscore-relations/src/row_buffer/mod.rscore-relations/src/table/mod.rscore-relations/src/table/rebuild.rscore-relations/src/table/sharded_hash_table.rscore-relations/src/tests.rsegglog-bridge/Cargo.tomlegglog-bridge/src/lib.rsnumeric-id/Cargo.tomlnumeric-id/src/lib.rssrc/cli.rssrc/lib.rstests/files.rs
💤 Files with no reviewable changes (2)
- numeric-id/Cargo.toml
- Cargo.toml
📜 Review details
⚠️ CI failures not shown inline (29)
GitHub Actions: Build / 0_benchmark (ubuntu-latest, rust_rule_tableaction_hot_path).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 26_wasm.txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 3_benchmark (ubuntu-latest, proof_testing_math).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 4_benchmark (ubuntu-latest, proof_testing_unify).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 1_benchmark (ubuntu-latest, proof_testing_typecheck).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 2_benchmark (ubuntu-latest, rust_rule_fib).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 5_benchmark (ubuntu-latest, rust_rule_match_overhead).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 6_benchmark (ubuntu-latest, rectangle).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 7_benchmark (ubuntu-latest, factoring-multisets).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 9_benchmark (ubuntu-latest, proof_testing_eqsat-basic).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 14_benchmark (ubuntu-latest, math-microbenchmark).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 11_benchmark (ubuntu-latest, eggcc-2mm).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 15_benchmark (ubuntu-latest, conv1d_32).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 12_benchmark (ubuntu-latest, conv1d_128).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 13_benchmark (ubuntu-latest, math_normal).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 10_benchmark (ubuntu-latest, rust_rule_match_with_serialize).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 16_benchmark (ubuntu-latest, stresstest_large_expr).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 18_benchmark (ubuntu-latest, typeinfer).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 17_benchmark (ubuntu-latest, luminal-llama).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 19_benchmark (ubuntu-latest, python_array_optimize).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 20_benchmark (ubuntu-latest, repro-665-set-union).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 21_benchmark (ubuntu-latest, extract-vec-bench).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 22_benchmark (ubuntu-latest, cykjson).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 25_benchmark (ubuntu-latest, taylor51).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 23_benchmark (ubuntu-latest, herbie).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 24_benchmark (ubuntu-latest, eggcc-extraction).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 8_benchmark (ubuntu-latest, rust_rule_insert_loop).txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 28_coverage.txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
GitHub Actions: Build / 29_test.txt: Replace Rayon with our own, non-workstealing thread pool
Conclusion: failure
##[group]Run bail() {
�[36;1mbail() {�[0m
�[36;1m printf '::error::install-action: %s\n' "$*"�[0m
🧰 Additional context used
📓 Path-based instructions (1)
CHANGELOG.md
📄 CodeRabbit inference engine (CLAUDE.md)
Update
CHANGELOG.mdwith a concise bullet when making major changes such as breaking changes or new features.
Files:
CHANGELOG.md
🪛 LanguageTool
CHANGELOG.md
[grammar] ~6-~6: Use a hyphen to join words.
Context: ...push/pop. - Add egglog-concurrency scoped ThreadPool support backed by a ...
(QB_NEW_EN_HYPHEN)
🔇 Additional comments (23)
egglog-bridge/Cargo.toml (1)
13-13: LGTM!egglog-bridge/src/lib.rs (1)
26-26: LGTM!Also applies to: 137-161, 170-250, 252-274, 606-614, 660-661, 700-700, 1035-1039
src/lib.rs (1)
500-507: LGTM!Also applies to: 532-532, 541-541, 554-573
src/cli.rs (1)
7-7: LGTM!Also applies to: 94-95
benches/common.rs (1)
2-6: LGTM!Also applies to: 36-37, 79-80, 108-110
benches/rust_api_benchmarking.rs (1)
21-24: LGTM!Also applies to: 116-120, 156-160, 291-302, 331-334
tests/files.rs (1)
104-113: LGTM!Also applies to: 194-194
concurrency/Cargo.toml (1)
14-14: LGTM!Also applies to: 27-29
concurrency/src/lib.rs (1)
10-10: LGTM!Also applies to: 20-20
concurrency/src/threadpool/tests.rs (1)
1-585: LGTM!core-relations/src/lib.rs (1)
13-13: LGTM!core-relations/Cargo.toml (1)
39-39: LGTM!core-relations/src/parallel.rs (1)
12-37: LGTM!Also applies to: 49-73, 120-144
numeric-id/src/lib.rs (1)
163-165: LGTM!Also applies to: 405-407
core-relations/src/containers/mod.rs (1)
26-26: LGTM!Also applies to: 177-190, 582-679
core-relations/src/table/mod.rs (1)
30-30: LGTM!Also applies to: 595-628, 826-961, 1066-1110, 1145-1175
core-relations/src/table/rebuild.rs (1)
13-13: LGTM!Also applies to: 142-163, 198-224
core-relations/src/table/sharded_hash_table.rs (1)
16-20: LGTM!core-relations/src/parallel_heuristics.rs (1)
1-114: LGTM!core-relations/src/free_join/execute.rs (1)
19-19: LGTM!Also applies to: 34-34, 363-363, 440-440, 645-652, 990-990, 1069-1069, 1808-1808, 1903-1914, 1946-1958, 1983-1997, 2179-2179
core-relations/src/free_join/mod.rs (1)
27-27: LGTM!Also applies to: 317-318, 420-426, 563-568, 835-836
core-relations/src/hash_index/mod.rs (1)
6-23: LGTM!Also applies to: 293-310, 635-651
core-relations/src/tests.rs (1)
32-37: LGTM!
| ## [Unreleased] - ReleaseDate | ||
|
|
||
| - Add typed `EGraph` extension state that clones with `EGraph` and is restored by `push`/`pop`. | ||
| - Add `egglog-concurrency` scoped `ThreadPool` support backed by a crossbeam channel. |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Document the thread-configuration migration here.
This entry mentions the new pool, but the user-facing change is also that thread control moved off the global Rayon setup and onto per-EGraph with_num_threads / set_num_threads. Please fold that into the bullet so the upgrade path is visible from the changelog.
Suggested wording
-- Add `egglog-concurrency` scoped `ThreadPool` support backed by a crossbeam channel.
+- Replace the global Rayon thread pool with an `egglog-concurrency` scoped `ThreadPool`; configure parallelism per `EGraph` via `with_num_threads` / `set_num_threads`.As per coding guidelines, CHANGELOG.md: Update CHANGELOG.md with a concise bullet when making major changes such as breaking changes or new features.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| - Add `egglog-concurrency` scoped `ThreadPool` support backed by a crossbeam channel. | |
| - Replace the global Rayon thread pool with an `egglog-concurrency` scoped `ThreadPool`; configure parallelism per `EGraph` via `with_num_threads` / `set_num_threads`. |
🧰 Tools
🪛 LanguageTool
[grammar] ~6-~6: Use a hyphen to join words.
Context: ...push/pop. - Add egglog-concurrency scoped ThreadPool support backed by a ...
(QB_NEW_EN_HYPHEN)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@CHANGELOG.md` at line 6, Update the CHANGELOG entry for the new
`egglog-concurrency` scoped `ThreadPool` support to also mention the
thread-configuration migration from global Rayon setup to per-`EGraph`
`with_num_threads` / `set_num_threads`. Keep the bullet concise but make the
upgrade path explicit so users can see that thread control now belongs on
`EGraph` rather than global configuration.
Source: Coding guidelines
| let total = AtomicU64::new(0); | ||
| pool.scope(|scope| { | ||
| for chunk in divan::black_box(data).chunks(CHUNK) { | ||
| let total = &total; | ||
| scope.spawn(move |_| { | ||
| total.fetch_add(sum_slice(chunk), Ordering::Relaxed); | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| let sum = total.load(Ordering::Relaxed); | ||
| assert_eq!(sum, expected); |
There was a problem hiding this comment.
🚀 Performance & Scalability | 🟠 Major | ⚡ Quick win
Remove the shared atomic from this comparison.
threadpool_chunked_sum is measuring queueing and contention on a single AtomicU64, while rayon_sum uses a reduction. That makes the custom pool look slower even when the scheduler cost is identical, so this benchmark is not isolating the Rayon→ThreadPool change.
♻️ Suggested rewrite
- let total = AtomicU64::new(0);
+ let mut partials = vec![0_u64; data.len().div_ceil(CHUNK)];
pool.scope(|scope| {
- for chunk in divan::black_box(data).chunks(CHUNK) {
- let total = &total;
+ for (slot, chunk) in partials
+ .iter_mut()
+ .zip(divan::black_box(data).chunks(CHUNK))
+ {
scope.spawn(move |_| {
- total.fetch_add(sum_slice(chunk), Ordering::Relaxed);
+ *slot = sum_slice(chunk);
});
}
});
- let sum = total.load(Ordering::Relaxed);
+ let sum: u64 = partials.into_iter().sum();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let total = AtomicU64::new(0); | |
| pool.scope(|scope| { | |
| for chunk in divan::black_box(data).chunks(CHUNK) { | |
| let total = &total; | |
| scope.spawn(move |_| { | |
| total.fetch_add(sum_slice(chunk), Ordering::Relaxed); | |
| }); | |
| } | |
| }); | |
| let sum = total.load(Ordering::Relaxed); | |
| assert_eq!(sum, expected); | |
| let mut partials = vec![0_u64; data.len().div_ceil(CHUNK)]; | |
| pool.scope(|scope| { | |
| for (slot, chunk) in partials | |
| .iter_mut() | |
| .zip(divan::black_box(data).chunks(CHUNK)) | |
| { | |
| scope.spawn(move |_| { | |
| *slot = sum_slice(chunk); | |
| }); | |
| } | |
| }); | |
| let sum: u64 = partials.into_iter().sum(); | |
| assert_eq!(sum, expected); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@concurrency/benches/sum_vector.rs` around lines 82 - 93, The
`threadpool_chunked_sum` benchmark is still using a shared `AtomicU64`, which
adds contention and makes the comparison with `rayon_sum` unfair. Update the
benchmark to avoid shared atomic accumulation in the `pool.scope`/`scope.spawn`
path and use the same reduction-style aggregation approach as the Rayon
benchmark. Keep the existing symbols like `threadpool_chunked_sum`,
`pool.scope`, and `sum_slice` so the benchmark isolates scheduler overhead
instead of atomic contention.
| static INDEX_THREAD_POOL: Lazy<ThreadPool> = | ||
| Lazy::new(|| ThreadPool::new(parallel::current_num_threads().max(1))); | ||
|
|
||
| fn run_in_index_thread_pool<R>(f: impl FnOnce() -> R) -> R { | ||
| INDEX_THREAD_POOL.install(f) |
There was a problem hiding this comment.
🚀 Performance & Scalability | 🟠 Major | 🏗️ Heavy lift
Do not freeze the index pool size on first use.
INDEX_THREAD_POOL is global, but its size is derived from the first caller’s installed pool. That can make later EGraphs ignore their configured thread count, and the global workers also keep worker-local pools alive for the process lifetime. Size this helper per installed thread count or attach it to the EGraph-owned pool lifecycle.
Possible minimal direction
-static INDEX_THREAD_POOL: Lazy<ThreadPool> =
- Lazy::new(|| ThreadPool::new(parallel::current_num_threads().max(1)));
-
fn run_in_index_thread_pool<R>(f: impl FnOnce() -> R) -> R {
- INDEX_THREAD_POOL.install(f)
+ let thread_count = parallel::current_num_threads().max(1);
+ let pool = ThreadPool::new(thread_count);
+ pool.install(f)
}A cached per-thread-count or per-EGraph pool would avoid repeated thread creation while preserving the per-instance contract.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| static INDEX_THREAD_POOL: Lazy<ThreadPool> = | |
| Lazy::new(|| ThreadPool::new(parallel::current_num_threads().max(1))); | |
| fn run_in_index_thread_pool<R>(f: impl FnOnce() -> R) -> R { | |
| INDEX_THREAD_POOL.install(f) | |
| fn run_in_index_thread_pool<R>(f: impl FnOnce() -> R) -> R { | |
| let thread_count = parallel::current_num_threads().max(1); | |
| let pool = ThreadPool::new(thread_count); | |
| pool.install(f) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core-relations/src/hash_index/mod.rs` around lines 948 - 952, The global
INDEX_THREAD_POOL in run_in_index_thread_pool currently freezes its worker count
on first use, which can override later EGraph-specific thread settings and keep
worker-local pools alive too long. Change the helper so the pool is created per
installed thread count or tied to the owning EGraph lifecycle instead of a
single global Lazy<ThreadPool>. Keep the thread-count source in
parallel::current_num_threads() or the EGraph-owned configuration, and update
run_in_index_thread_pool and INDEX_THREAD_POOL accordingly.
| pub(crate) fn map_mut_with<T, R, Init, F>(items: &mut [T], init: Init, f: F) -> Vec<R> | ||
| where | ||
| T: Send, | ||
| R: Send, | ||
| Init: FnMut() -> R, | ||
| F: Fn(usize, &mut T) -> R + Sync, | ||
| { | ||
| if !enabled_for_len(items.len()) { | ||
| return items | ||
| .iter_mut() | ||
| .enumerate() | ||
| .map(|(index, item)| f(index, item)) | ||
| .collect(); | ||
| } | ||
|
|
||
| let chunk_len = chunk_len(items.len()); | ||
| let mut results = vec_with(items.len(), init); | ||
| egglog_concurrency::scope(|scope| { | ||
| let f = &f; | ||
| for (chunk_index, (chunk, out)) in items | ||
| .chunks_mut(chunk_len) | ||
| .zip(results.chunks_mut(chunk_len)) | ||
| .enumerate() | ||
| { | ||
| let base = chunk_index * chunk_len; | ||
| scope.spawn(move |_| { | ||
| for (offset, item) in chunk.iter_mut().enumerate() { | ||
| out[offset] = f(base + offset, item); | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
|
|
||
| results | ||
| } |
There was a problem hiding this comment.
🚀 Performance & Scalability | 🔵 Trivial | 🏗️ Heavy lift
Avoid pre-initializing parallel result slots that are immediately overwritten.
map_mut_with and map allocate one initialized result per input before workers overwrite every slot. That makes current callsites pay unnecessary setup costs, e.g. default pooled histograms in parallel_rehash and discarded checker clones in parallel_insert. Consider using Vec<Option<R>> or MaybeUninit slots so results are initialized exactly once and map/map_mut no longer require R: Default.
Also applies to: 146-179
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core-relations/src/parallel.rs` around lines 84 - 118, The parallel helpers
are eagerly initializing every result slot before worker threads overwrite them,
which adds unnecessary setup cost and forces extra trait bounds. Update
`map_mut_with` and the related `map`/`map_mut` path to store uninitialized or
optional result slots and write each output exactly once during processing, then
collect the finalized values after the parallel scope completes. Keep the fix
centered around the existing `map_mut_with` helper and any shared
result-allocation logic so callers like `parallel_rehash` and `parallel_insert`
no longer pay for discarded initial values, and remove the need for `R: Default`
where it is only used for prefill.
| /// Iterate over the contents of the buffer. | ||
| /// | ||
| /// This is a compatibility alias for [`TaggedRowBuffer::iter`]. | ||
| pub fn par_iter(&self) -> impl Iterator<Item = (RowId, &[Value])> { | ||
| self.iter() |
There was a problem hiding this comment.
🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify remaining call sites so they can be renamed or intentionally accepted.
rg -n -C3 '\.par_iter\s*\(' .Repository: egraphs-good/egglog
Length of output: 670
🏁 Script executed:
#!/bin/bash
set -euo pipefail
printf '== TaggedRowBuffer par_iter implementation ==\n'
sed -n '360,410p' core-relations/src/row_buffer/mod.rs
printf '\n== All par_iter definitions/usages in repository ==\n'
rg -n -C2 '\bpar_iter\b' core-relations concurrency . --glob '!target' --glob '!node_modules'
printf '\n== Benchmark context ==\n'
sed -n '1,120p' concurrency/benches/sum_vector.rsRepository: egraphs-good/egglog
Length of output: 6573
Deprecate TaggedRowBuffer::par_iter or rename it to iter. The alias is sequential now, but the name still suggests parallelism and can mislead callers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@core-relations/src/row_buffer/mod.rs` around lines 388 - 392, The
TaggedRowBuffer::par_iter alias is misleading because it now behaves
sequentially, not in parallel. Update the API in TaggedRowBuffer by either
deprecating par_iter and steering callers to iter, or renaming the method to
iter so the name matches the behavior. Keep the existing iter implementation as
the single source of truth and adjust any related docs/comments around
TaggedRowBuffer::par_iter accordingly.
| fn install_thread_pool<R>(thread_pool: Option<Arc<ThreadPool>>, f: impl FnOnce() -> R) -> R { | ||
| match thread_pool { | ||
| Some(thread_pool) => thread_pool.install(f), | ||
| None => f(), | ||
| } | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
Clear the ambient pool for serial EGraphs.
None => f() preserves any outer CURRENT_POOL, so an EGraph configured with 1 thread can still observe current_num_threads() > 1 when called under another installed pool. Add a serial/no-current-pool installation path for the None branch.
Possible direction
fn install_thread_pool<R>(thread_pool: Option<Arc<ThreadPool>>, f: impl FnOnce() -> R) -> R {
match thread_pool {
Some(thread_pool) => thread_pool.install(f),
- None => f(),
+ None => egglog_concurrency::without_current_pool(f),
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn install_thread_pool<R>(thread_pool: Option<Arc<ThreadPool>>, f: impl FnOnce() -> R) -> R { | |
| match thread_pool { | |
| Some(thread_pool) => thread_pool.install(f), | |
| None => f(), | |
| } | |
| } | |
| fn install_thread_pool<R>(thread_pool: Option<Arc<ThreadPool>>, f: impl FnOnce() -> R) -> R { | |
| match thread_pool { | |
| Some(thread_pool) => thread_pool.install(f), | |
| None => egglog_concurrency::without_current_pool(f), | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@egglog-bridge/src/lib.rs` around lines 163 - 168, The None branch in
install_thread_pool currently just calls f(), which leaves any outer
CURRENT_POOL in place and can make serial EGraphs observe the wrong thread
count. Update install_thread_pool so the None case runs f() under a
cleared/no-current-pool context rather than inheriting an ambient pool, while
keeping the Some(thread_pool) path using ThreadPool::install.
|
I asked my agent for a more holistic review of this feature, and was curious about this option of just using a per-egraph rayon threadpool (instead of the global one) to avoid the issue we had about the global setup... That would keep the work stealing aspect, which it wasn't clear was a problem and slowed things down in some cases? Or are there other reasons to avoid work stealing? You know generally in favor of keeping as little LoC as possible lol but I imagine you have thought about this more and have a good reason for moving away from Rayon. |




Remove the reliance on the rayon global thread pool, while maintaining single and multi-threaded performance in the
mathandcykjsonmicrobenchmarks.This should make us a lot more resilient to issues where parallelism is enabled by accident.