
Replace FLoops with manual Threads.@spawn #215

Open

devmotion wants to merge 1 commit into JuliaML:main from devmotion:dmw/transducers_floops

Conversation


@devmotion devmotion commented May 6, 2026

Motivation

FLoops.jl and Transducers.jl are both registered out of the JuliaFolds2 organization, which forked the original JuliaFolds packages after upstream went silent (see JuliaFolds2/Transducers.jl#31, the issue that motivated the fork). The forks are alive but very low-velocity: JuliaFolds2/FLoops.jl has had 14 commits in 2025 (almost entirely automated CompatHelper / pkg-update PRs), and JuliaFolds2/Transducers.jl has had 1 commit in 2025 (housekeeping — drop Julia <1.10, remove Requires).

Removing this part of the stack from MLUtils has been on the radar for a while.

Other packages in the ecosystem have made the same call: AbstractMCMC.jl removed Transducers as a hard dep in TuringLang/AbstractMCMC.jl#201 (merged April 2026), citing 11 transitive dependencies eliminated.

In a fresh environment with this branch's MLUtils installed (61 packages resolved), adding FLoops brings the total to 85 packages — 24 added (about 20 non-stdlib): FLoops, FLoopsBase, Transducers, MLStyle, JuliaVariables, NameResolution, Setfield, ContextVariablesX, Accessors, BangBang, MicroCollections, SplittablesBase, InitialValues, DefineSingletons, Baselet, CompositionsBase, ConstructionBase, ArgCheck, PrettyPrint, InverseFunctions, plus a handful of stdlibs that Transducers pulls. Some of these may well already be loaded for other reasons in any given user's project, but they are not currently shared with anything else MLUtils depends on.

Approach

Replace FLoops.@floop ThreadedEx() for arg in argiter in Loader with a recursive divide-and-conquer using Threads.@spawn directly. The structure mirrors Transducers._reduce: at each level the right half is @spawn'd, the left half recurses on the current task, then wait on the right. basesize = length ÷ Threads.nthreads() matches the Transducers.ThreadedEx default; the base-case predicate uses max(basesize, 1) (also lifted from Transducers) so that numobs < nthreads cases terminate.
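
A minimal sketch of that recursion, assuming a flat indexable collection (parallel_foreach and _foreach are illustrative names, not the identifiers used in this PR):

```julia
# Illustrative sketch only; the PR's actual code lives in Loader.
function parallel_foreach(f, xs)
    basesize = max(length(xs) ÷ Threads.nthreads(), 1)  # same default as Transducers.ThreadedEx
    _foreach(f, xs, firstindex(xs), lastindex(xs), basesize)
    return nothing
end

function _foreach(f, xs, lo, hi, basesize)
    lo > hi && return nothing                 # empty input (numobs == 0)
    if hi - lo + 1 <= basesize
        for i in lo:hi                        # leaf: run sequentially on this task
            f(xs[i])
        end
    else
        mid = (lo + hi) ÷ 2
        t = Threads.@spawn _foreach(f, xs, mid + 1, hi, basesize)  # right half on a new task
        _foreach(f, xs, lo, mid, basesize)                         # left half on the current task
        wait(t)                               # wait rethrows if the spawned half errored
    end
    return nothing
end
```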

The outer dispatcher uses Threads.@spawn rather than @async to avoid making the calling task non-migratable for the lifetime of the iteration, per the warning in the @async docstring about library code.
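
In sketch form, continuing the hypothetical names from above (Loader's real channel plumbing is more involved):

```julia
function start_iteration(f, argiter)
    ch = Channel(Inf)
    # Threads.@spawn rather than @async: @async would mark the calling task
    # sticky (non-migratable) for the lifetime of the iteration.
    Threads.@spawn begin
        parallel_foreach(arg -> put!(ch, f(arg)), argiter)
        close(ch)                 # error handling elided in this sketch
    end
    return ch
end
```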

basesize is currently computed inline. If there's interest, it could be exposed (as a kwarg on eachobsparallel / DataLoader) in a follow-up so users can tune task granularity per call without a major API change.
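
Purely illustrative, no such keyword exists in this PR or on main:

```julia
# Hypothetical follow-up API, NOT implemented here:
eachobsparallel(data; basesize = 64)                # tune leaf size per call
DataLoader(data; parallel = true, basesize = 64)
```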

Also moves Transducers from a required dep to a weakdep + extension (ext/TransducersExt.jl), and bumps the minimum Julia version to 1.10 (needed for weakdeps; CI matrix updated to min-patch).
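
For reviewers less familiar with package extensions, the wiring has the usual shape (a sketch; the Transducers UUID is elided rather than guessed):

```julia
# Project.toml (sketch):
#
#   [weakdeps]
#   Transducers = "<uuid>"
#
#   [extensions]
#   TransducersExt = "Transducers"
#
# ext/TransducersExt.jl is then loaded automatically only when the user's
# environment contains both MLUtils and Transducers:
module TransducersExt

using MLUtils, Transducers

# executor-specific methods live here (contents elided)

end
```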

Alternatives considered

  • OhMyThreads.jl — the alternative explicitly raised in "Floops depedency should be removed" #175. More actively maintained and a drop-in for the parallel dispatch, but its default chunking strategy differs from FLoops/Transducers — see the OhMyThreads column in the table below: TimeDataset n=10000, t=1e-3 goes from 3.06 s (master) to 4.31 s. Recovering parity requires explicit tuning of the number of tasks (chunking=false brings it to 2.85 s, but spawns one task per observation, which is its own footgun for large numobs); a sketch of this variant follows after this list. Going with manual Threads.@spawn keeps exactly the algorithm that was running underneath FLoops, so performance characteristics don't change for users.
  • Keep FLoops, narrow the surface area — doesn't address the dependency-footprint or maintenance velocity concerns.
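
For concreteness, the OhMyThreads variant that produced the numbers below would look roughly like this (tforeach and its chunking keyword are existing OhMyThreads.jl API; the loop body is schematic):

```julia
using OhMyThreads: tforeach

# Default chunking (the "OhMyThreads default" column in the table below):
tforeach(argiter) do arg
    # produce one observation
end

# chunking = false recovers parity on the n=10000 TimeDataset row (2.85 s),
# but spawns one task per element, a footgun for large numobs:
tforeach(argiter; chunking = false) do arg
    # produce one observation
end
```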

Benchmarks

Julia 1.12.6, 6 threads. Single trials; ~10% run-to-run variance on the TimeDataset rows.

| Benchmark | master (FLoops) | This PR | OhMyThreads default (for ref.) |
| --- | --- | --- | --- |
| TimeDataset n=100, t=0.1 | 1.37 s | 1.33 s | 1.79 s |
| TimeDataset n=10000, t=1e-3 | 3.06 s | 2.84 s | 4.31 s |
| TimeDataset n=10000, t=1e-4 | 3.12 s | 2.85 s | 4.40 s |
| TimeDataset n=10000, t=1e-5 | 3.17 s | 2.85 s | 4.48 s |
| CPUDataset n=1000, w=64 | 0.081 s | 0.117 s | 0.078 s |
| CPUDataset n=1000, w=256 | 0.201 s | 0.182 s | 0.163 s |
| ArrayDataset, buffer=true | 0.241 s | 0.106 s | 0.141 s |
| TimeDataset bs=8 | 0.345 s | 0.355 s | 0.459 s |
| CPUDataset bs=8 | 0.033 s | 0.088 s | 0.027 s |

Net: TimeDataset (representative of I/O-bound getobs) is consistently slightly faster than master and substantially better than OhMyThreads at default settings. The CPU-bound CPUDataset bs=8 row is dominated by @spawn setup cost on a tiny total workload (63 batches of small matmul), so single-trial variance is high.

Benchmark script
using MLUtils
using MLUtils: eachobsparallel
using Random

struct TimeDataset
    n::Int
    t::Float64
end
MLUtils.numobs(d::TimeDataset) = d.n
MLUtils.getobs(d::TimeDataset, i::Integer) = (sleep(d.t); i)
MLUtils.getobs(d::TimeDataset, idxs::AbstractVector) = [getobs(d, i) for i in idxs]

struct CPUDataset
    n::Int
    work::Int
end
MLUtils.numobs(d::CPUDataset) = d.n
MLUtils.getobs(d::CPUDataset, i::Integer) = sum(rand(d.work, d.work) * rand(d.work))
MLUtils.getobs(d::CPUDataset, idxs::AbstractVector) = [getobs(d, i) for i in idxs]

struct ArrayDataset
    n::Int
    sz::Tuple{Int,Int}
end
MLUtils.numobs(d::ArrayDataset) = d.n
MLUtils.getobs(d::ArrayDataset, i::Integer) = rand!(Array{Float64}(undef, d.sz...))
MLUtils.getobs!(buf, d::ArrayDataset, i::Integer) = rand!(buf)

# warmup
for _ in eachobsparallel(TimeDataset(10, 0.001)) end

println("Threads: ", Threads.nthreads(), "  Julia: ", VERSION)

println("\n# eachobsparallel — TimeDataset (sleep per obs)")
for (n, t) in [(100, 0.1), (10_000, 1e-3), (10_000, 1e-4), (10_000, 1e-5)]
    data = TimeDataset(n, t)
    e = @elapsed for _ in eachobsparallel(data) end
    println("  n=$n, t=$t: ", e, " s")
end

println("\n# eachobsparallel — CPUDataset (matmul per obs)")
for (n, w) in [(1000, 64), (1000, 256)]
    data = CPUDataset(n, w)
    e = @elapsed for _ in eachobsparallel(data) end
    println("  n=$n, work=$w: ", e, " s")
end

println("\n# DataLoader(buffer=true, parallel=true) — ArrayDataset")
let data = ArrayDataset(1000, (64, 64))
    e = @elapsed for _ in DataLoader(data; buffer=true, parallel=true, batchsize=-1) end
    println("  n=1000, sz=(64,64): ", e, " s")
end

println("\n# DataLoader(batchsize=8, parallel=true)")
let data = TimeDataset(1000, 1e-3)
    e = @elapsed for _ in DataLoader(data; batchsize=8, parallel=true) end
    println("  TimeDataset(1000, 1e-3), bs=8: ", e, " s")
end
let data = CPUDataset(500, 128)
    e = @elapsed for _ in DataLoader(data; batchsize=8, parallel=true) end
    println("  CPUDataset(500, 128), bs=8: ", e, " s")
end
Raw outputs (master, this PR, OhMyThreads default)

master (FLoops):

Threads: 6  Julia: 1.12.6

# eachobsparallel — TimeDataset (sleep per obs)
  n=100, t=0.1: 1.368496292 s
  n=10000, t=0.001: 3.059648 s
  n=10000, t=0.0001: 3.116711292 s
  n=10000, t=1.0e-5: 3.171029166 s

# eachobsparallel — CPUDataset (matmul per obs)
  n=1000, work=64: 0.080803167 s
  n=1000, work=256: 0.201418208 s

# DataLoader(buffer=true, parallel=true) — ArrayDataset
  n=1000, sz=(64,64): 0.240511958 s

# DataLoader(batchsize=8, parallel=true)
  TimeDataset(1000, 1e-3), bs=8: 0.345019208 s
  CPUDataset(500, 128), bs=8: 0.032582959 s

This PR (manual Threads.@spawn):

Threads: 6  Julia: 1.12.6

# eachobsparallel — TimeDataset (sleep per obs)
  n=100, t=0.1: 1.331942625 s
  n=10000, t=0.001: 2.83705325 s
  n=10000, t=0.0001: 2.853143958 s
  n=10000, t=1.0e-5: 2.848424875 s

# eachobsparallel — CPUDataset (matmul per obs)
  n=1000, work=64: 0.116775375 s
  n=1000, work=256: 0.1815135 s

# DataLoader(buffer=true, parallel=true) — ArrayDataset
  n=1000, sz=(64,64): 0.105989542 s

# DataLoader(batchsize=8, parallel=true)
  TimeDataset(1000, 1e-3), bs=8: 0.355370292 s
  CPUDataset(500, 128), bs=8: 0.087745334 s

Earlier experiment with OhMyThreads.tforeach at default chunking, for reference:

Threads: 6  Julia: 1.12.6

# eachobsparallel — TimeDataset (sleep per obs)
  n=100, t=0.1: 1.788332125 s
  n=10000, t=0.001: 4.311297167 s
  n=10000, t=0.0001: 4.399013791 s
  n=10000, t=1.0e-5: 4.481137083 s

# eachobsparallel — CPUDataset (matmul per obs)
  n=1000, work=64: 0.078201542 s
  n=1000, work=256: 0.162779042 s

# DataLoader(buffer=true, parallel=true) — ArrayDataset
  n=1000, sz=(64,64): 0.141071958 s

# DataLoader(batchsize=8, parallel=true)
  TimeDataset(1000, 1e-3), bs=8: 0.458739833 s
  CPUDataset(500, 128), bs=8: 0.027184666 s

Test plan

  • julia --project -t auto -e 'using Pkg; Pkg.test()' — full suite passes
  • eachobsparallel testset green (9/9)
  • Smoke test for numobs < nthreads (n = 0, 1, 3, 5 with 6 threads) — all terminate and produce the expected items; a sketch of this check follows after this list
  • CI
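
The small-n check, roughly (a sketch using a plain Vector as the dataset; run with 6 threads so numobs < nthreads):

```julia
using MLUtils: eachobsparallel

for n in (0, 1, 3, 5)
    out = Int[]
    for x in eachobsparallel(collect(1:n))   # order of arrival is not guaranteed
        push!(out, x)
    end
    @assert sort!(out) == 1:n                # terminates, yields each observation once
end
```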

🤖 Generated with Claude Code

Drops the FLoops dependency. Parallel iteration in `Loader` is rewritten
as a recursive divide-and-conquer over `argiter[lo:hi]` using
`Threads.@spawn`, mirroring `Transducers._reduce`: at each level, the
right half is spawned and the left half recurses on the current task,
with leaves (size <= basesize) processed sequentially. Default
`basesize = length ÷ nthreads()` matches the FLoops `ThreadedEx`
default, and the base-case check uses `max(basesize, 1)` so small
inputs (`numobs < nthreads`) terminate.

The outer dispatcher uses `Threads.@spawn` instead of `@async` to avoid
making the caller's task non-migratable (per the `@async` docstring's
warning about library code).

Also moves Transducers from a required dep to a weakdep + extension
(`ext/TransducersExt.jl`) and bumps the minimum Julia version to 1.10
(with the matching CI.yml `min-patch` switch).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@devmotion devmotion force-pushed the dmw/transducers_floops branch from 8cd5fea to acbbb8f Compare May 6, 2026 15:36
@devmotion devmotion closed this May 6, 2026
@devmotion devmotion reopened this May 6, 2026

codecov-commenter commented May 6, 2026

Codecov Report

❌ Patch coverage is 70.83333% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.83%. Comparing base (4067267) to head (acbbb8f).
⚠️ Report is 10 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ext/TransducersExt.jl | 69.56% | 7 Missing ⚠️ |
| src/parallel.jl | 69.56% | 7 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #215      +/-   ##
==========================================
- Coverage   84.93%   84.83%   -0.11%     
==========================================
  Files          15       18       +3     
  Lines         697      712      +15     
==========================================
+ Hits          592      604      +12     
- Misses        105      108       +3     

☔ View full report in Codecov by Sentry.

@devmotion devmotion marked this pull request as ready for review May 6, 2026 16:14
