diff --git a/src/windows.jl b/src/windows.jl index 0d1a056..b0f47a6 100644 --- a/src/windows.jl +++ b/src/windows.jl @@ -129,34 +129,108 @@ WindowedProblem(problem; kw...) = WindowedProblem(; problem, kw...) centersize(p::WindowedProblem) = p.centersize, p.centersize isthreaded(p::WindowedProblem) = p.threaded -mutable struct WindowedInit{P,R} <: Initialisation +""" + WindowWorkspace + +The per-task workspace bundle used to solve a single window: a +`WorkspaceCollection` plus the `SparseBuilders` it shares. One +`WindowWorkspace` is held by a task for the duration of `init` + `solve!` on +that window, then returned to the pool. +""" +struct WindowWorkspace + workspaces::WorkspaceCollection + sparse_builders::SparseBuilders +end +function WindowWorkspace(inner_problem, max_size::Tuple{Int,Int}) + mat_ws = _create_mat_workspaces(max_size, num_mat_workspaces(inner_problem), mmap_path(inner_problem)) + workspaces = WorkspaceCollection( + Workspaces(max_size[1], num_vec_workspaces(inner_problem)), + mat_ws, + Workspaces(spzeros(max_size[1], max_size[1]), num_sp_workspaces(inner_problem)), + ) + return WindowWorkspace(workspaces, SparseBuilders()) +end + +""" + SingleWorkspacePool + +Holds one `WindowWorkspace`, used directly without synchronisation. This is +the structure used for non-threaded `WindowedProblem`s, where windows run +sequentially and reuse a single workspace. +""" +struct SingleWorkspacePool + window_workspace::WindowWorkspace +end + +""" + ChannelWorkspacePool + +A pool of `n` `WindowWorkspace`s fronted by a `Channel`. Each window task +`acquire`s one (blocking if all are in use) and `release`s it when done. Used +for threaded `WindowedProblem`s — the channel size acts as a semaphore on +concurrent window solves. +""" +struct ChannelWorkspacePool + window_workspaces::Vector{WindowWorkspace} + channel::Channel{WindowWorkspace} +end +function ChannelWorkspacePool(window_workspaces::Vector{WindowWorkspace}) + channel = Channel{WindowWorkspace}(length(window_workspaces)) + for ww in window_workspaces + put!(channel, ww) + end + return ChannelWorkspacePool(window_workspaces, channel) +end + +# Pool interface: acquire one WindowWorkspace, release it back, iterate all. +Base.acquire(p::SingleWorkspacePool) = p.window_workspace +Base.acquire(p::ChannelWorkspacePool) = take!(p.channel) +Base.release(::SingleWorkspacePool, _ww) = nothing +Base.release(p::ChannelWorkspacePool, ww::WindowWorkspace) = (put!(p.channel, ww); nothing) +window_workspaces(p::SingleWorkspacePool) = (p.window_workspace,) +window_workspaces(p::ChannelWorkspacePool) = p.window_workspaces + +# Direct workspace access — only well-defined for SingleWorkspacePool, where +# there is exactly one WindowWorkspace and it is reused across windows. With a +# ChannelWorkspacePool any task may currently hold any entry, so reaching in +# without acquire/release would race; we error instead. +workspaces(p::SingleWorkspacePool) = p.window_workspace.workspaces +sparse_builders(p::SingleWorkspacePool) = p.window_workspace.sparse_builders +workspaces(::ChannelWorkspacePool) = + error("workspaces(::WindowedInit) is undefined for a threaded WindowedProblem — acquire a WindowWorkspace from the pool instead") +sparse_builders(::ChannelWorkspacePool) = + error("sparse_builders(::WindowedInit) is undefined for a threaded WindowedProblem — acquire a WindowWorkspace from the pool instead") + +mutable struct WindowedInit{P,R,Pool} <: Initialisation problem::P rast::R sparse_sizes::Vector{Tuple{Int,Int}} ranges::Vector{Tuple{UnitRange{Int},UnitRange{Int}}} indices::Vector{Int} sorted_indices::Vector{Int} - workspaces::WorkspaceCollection - sparse_builders::SparseBuilders - function WindowedInit(problem::P, rast::R, sparse_sizes, ranges, indices, sorted_indices, workspaces, sparse_builders) where {P,R} - wi = new{P,R}(problem, rast, sparse_sizes, ranges, indices, sorted_indices, workspaces, sparse_builders) - # Register finalizer to clean up mmap files if any - isempty(mat_workspaces(workspaces).mmap_paths) || finalizer(_cleanup_windowed_init!, wi) + pool::Pool + function WindowedInit(problem::P, rast::R, sparse_sizes, ranges, indices, sorted_indices, pool::Pool) where {P,R,Pool} + wi = new{P,R,Pool}(problem, rast, sparse_sizes, ranges, indices, sorted_indices, pool) + any(ww -> !isempty(mat_workspaces(ww.workspaces).mmap_paths), window_workspaces(pool)) && + finalizer(_cleanup_windowed_init!, wi) return wi end end function _cleanup_windowed_init!(wi::WindowedInit) - cleanup!(mat_workspaces(wi)) + for ww in window_workspaces(wi.pool) + cleanup!(mat_workspaces(ww.workspaces)) + end return nothing end problem(wi::WindowedInit) = wi.problem -workspaces(wi::WindowedInit) = wi.workspaces +pool(wi::WindowedInit) = wi.pool +workspaces(wi::WindowedInit) = workspaces(wi.pool) +sparse_builders(wi::WindowedInit) = sparse_builders(wi.pool) vec_workspaces(wi::WindowedInit) = vec_workspaces(workspaces(wi)) mat_workspaces(wi::WindowedInit) = mat_workspaces(workspaces(wi)) sp_workspaces(wi::WindowedInit) = sp_workspaces(workspaces(wi)) -sparse_builders(wi::WindowedInit) = wi.sparse_builders """ cleanup!(wi::WindowedInit) @@ -182,36 +256,42 @@ function init(p::WindowedProblem, rast::RasterStack; indices = isnothing(indices) ? _select_indices(p, rast; window_ranges, sparse_sizes) : indices sorted_indices = last.(sort!(first.(sparse_sizes[indices]) .=> indices; rev=true)) - # Pre-allocate workspaces sized for the largest window - # These will be resized as needed for each window, avoiding repeated allocations - # Use already-computed sparse_sizes to find max (avoids re-scanning raster) + # Pre-allocate workspaces sized for the largest window. + # These will be resized as needed for each window, avoiding repeated allocations. + # Use already-computed sparse_sizes to find max (avoids re-scanning raster). _, max_idx = findmax(prod, sparse_sizes) max_size = sparse_sizes[max_idx] inner_problem = problem(p) - # Create mat_workspaces - optionally mmap'd to reduce GC pressure - mat_ws = _create_mat_workspaces(max_size, num_mat_workspaces(inner_problem), mmap_path(inner_problem)) - - workspaces = WorkspaceCollection( - Workspaces(max_size[1], num_vec_workspaces(inner_problem)), - mat_ws, - Workspaces(spzeros(max_size[1], max_size[1]), num_sp_workspaces(inner_problem)), - ) - sparse_builders = SparseBuilders() + # Build the workspace pool. In single-threaded mode that's just one + # WindowWorkspace reused across windows. In threaded mode it's a + # Channel-backed pool of N = min(nthreads, nwindows) WindowWorkspaces — + # the channel size caps concurrent solves and keeps each task's workspaces + # disjoint from every other's, sidestepping ConcurrencyViolationError on + # the workspaces' internal `unused` BitVector. + pool = if p.threaded && length(sorted_indices) > 1 + n = min(Threads.nthreads(), length(sorted_indices)) + ChannelWorkspacePool([WindowWorkspace(inner_problem, max_size) for _ in 1:n]) + else + SingleWorkspacePool(WindowWorkspace(inner_problem, max_size)) + end return WindowedInit( - p, rast, sparse_sizes, window_ranges, indices, sorted_indices, workspaces, sparse_builders + p, rast, sparse_sizes, window_ranges, indices, sorted_indices, pool ) end -function init(wi::WindowedInit, i::Int; verbose=false) + +function init(wi::WindowedInit, i::Int; + verbose=false, + ww::WindowWorkspace=Base.acquire(wi.pool), +) ranges = wi.ranges[i] verbose && println("Initialising window from ranges $ranges...") rast = _get_window_with_zeroed_buffer(wi, ranges) - # Pass pre-allocated workspaces to avoid repeated allocations - init(problem(problem(wi)), rast; + return init(problem(problem(wi)), rast; verbose, - workspaces=workspaces(wi), - sparse_builders=sparse_builders(wi), + workspaces=ww.workspaces, + sparse_builders=ww.sparse_builders, ) end @@ -240,22 +320,29 @@ function solve!(window_init::WindowedInit; end end - # Set up channels for threading - # Define a runner for threaded/non-threaded operation + wp = pool(window_init) + # Define a runner for threaded/non-threaded operation. + # Each invocation acquires a WindowWorkspace from the pool, holds it for + # the duration of init+solve, and releases it in a finally block. For + # SingleWorkspacePool acquire/release are no-ops returning the only + # WindowWorkspace; for ChannelWorkspacePool the channel blocks acquire + # when all are in use, providing backpressure on Threads.@spawn. function run(i, iw) verbose && println("Running window $i - $iw on thread $(Threads.threadid())") - # Initialise the window using stored memory - ggi = init(window_init, iw; verbose) - @assert ggi isa GridGraphInit - # Solve for the window - elapsed = @elapsed begin - output = solve!(ggi; verbose) + ww = Base.acquire(wp) + try + ggi = init(window_init, iw; ww, verbose) + @assert ggi isa GridGraphInit + elapsed = @elapsed begin + output = solve!(ggi; verbose) + end + # Garbage collect for this window. + # Inefficient for few/small windows but often needed for many/large windows. + p.gc && GC.gc() + return output, elapsed + finally + Base.release(wp, ww) end - # Garbage collect for this window - # Inneficient for few/small windows but - # often needed for many/large windows - p.gc && GC.gc() - return output, elapsed end # Run the window problems out_elapsed = if p.threaded diff --git a/test/windowed.jl b/test/windowed.jl index 80fdb01..b221c40 100644 --- a/test/windowed.jl +++ b/test/windowed.jl @@ -88,6 +88,43 @@ end end +@testset "threaded WindowedProblem uses ChannelWorkspacePool" begin + distance_transformation = x -> exp(-x / 50) + movement = RandomisedShortestPath(ExpectedCost(); theta=θ, distance_transformation) + csp = ConScapeProblem(; measures, movement, solver=VectorSolver()) + kw = (; buffer=10, centersize=5) + + # Non-threaded: SinglePool, accessors return the only workspace. + wp_serial = WindowedProblem(csp; kw..., threaded=false) + wi_serial = init(wp_serial, rast) + @test wi_serial.pool isa ConScape.SingleWorkspacePool + @test ConScape.mat_workspaces(wi_serial) isa ConScape.Workspaces + + # Threaded with multiple valid windows: ChannelPool sized to min(nthreads, nwindows). + wp_threaded = WindowedProblem(csp; kw..., threaded=true) + wi_threaded = init(wp_threaded, rast) + @test wi_threaded.pool isa ConScape.ChannelWorkspacePool + @test length(wi_threaded.pool.window_workspaces) == + min(Threads.nthreads(), length(wi_threaded.sorted_indices)) + + # Direct workspace accessors must error on the threaded pool — they would + # otherwise return a workspace another task may currently hold. + @test_throws ErrorException ConScape.workspaces(wi_threaded) + @test_throws ErrorException ConScape.sparse_builders(wi_threaded) + @test_throws ErrorException ConScape.mat_workspaces(wi_threaded) + + # Threaded path produces the same result as serial. + serial_result = solve(wp_serial, rast) + threaded_result = solve(wp_threaded, rast) + for k in keys(serial_result) + s = serial_result[k] + t = threaded_result[k] + @test all(zip(s, t)) do (a, b) + isnan(a) && isnan(b) || isapprox(a, b; atol=1e-8) + end + end +end + # BatchProblem writes files to disk and mosaics to RasterStack @testset "batch problem matches windowed problem" begin solver = VectorSolver()