Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 128 additions & 41 deletions src/windows.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,34 +129,108 @@ WindowedProblem(problem; kw...) = WindowedProblem(; problem, kw...)
centersize(p::WindowedProblem) = p.centersize, p.centersize
isthreaded(p::WindowedProblem) = p.threaded

"""
    WindowWorkspace

The per-task workspace bundle used to solve a single window: a
`WorkspaceCollection` plus the `SparseBuilders` it shares. One
`WindowWorkspace` is held by a task for the duration of `init` + `solve!` on
that window, then returned to the pool.
"""
struct WindowWorkspace
    workspaces::WorkspaceCollection   # vector/matrix/sparse scratch space for one window solve
    sparse_builders::SparseBuilders   # builders shared with `workspaces` for sparse assembly
end
# Build a `WindowWorkspace` sized for the largest window the problem will see.
# Matrix workspaces are optionally mmap-backed (see `mmap_path`) to reduce GC
# pressure; vector and sparse workspaces are plain in-memory allocations.
function WindowWorkspace(inner_problem, max_size::Tuple{Int,Int})
    nrows = max_size[1]
    mat_ws = _create_mat_workspaces(max_size, num_mat_workspaces(inner_problem), mmap_path(inner_problem))
    vec_ws = Workspaces(nrows, num_vec_workspaces(inner_problem))
    sp_ws = Workspaces(spzeros(nrows, nrows), num_sp_workspaces(inner_problem))
    collection = WorkspaceCollection(vec_ws, mat_ws, sp_ws)
    return WindowWorkspace(collection, SparseBuilders())
end

"""
    SingleWorkspacePool

Holds one `WindowWorkspace`, used directly without synchronisation. This is
the structure used for non-threaded `WindowedProblem`s, where windows run
sequentially and reuse a single workspace.
"""
struct SingleWorkspacePool
    window_workspace::WindowWorkspace  # the sole workspace, reused for every window in turn
end

"""
    ChannelWorkspacePool

A pool of `n` `WindowWorkspace`s fronted by a `Channel`. Each window task
`acquire`s one (blocking if all are in use) and `release`s it when done. Used
for threaded `WindowedProblem`s — the channel size acts as a semaphore on
concurrent window solves.
"""
struct ChannelWorkspacePool
    window_workspaces::Vector{WindowWorkspace}  # every pooled workspace (for iteration/cleanup)
    channel::Channel{WindowWorkspace}           # holds the currently-available subset
end
# Construct a pool from a vector of workspaces, pre-filling the backing
# channel so the first `length(window_workspaces)` acquires succeed without
# blocking.
function ChannelWorkspacePool(window_workspaces::Vector{WindowWorkspace})
    ch = Channel{WindowWorkspace}(length(window_workspaces))
    foreach(ww -> put!(ch, ww), window_workspaces)
    return ChannelWorkspacePool(window_workspaces, ch)
end

# Pool interface: acquire one WindowWorkspace, release it back, iterate all.
# Single pool: no contention possible, so acquire just returns the one workspace.
Base.acquire(p::SingleWorkspacePool) = p.window_workspace
# Channel pool: `take!` blocks until a workspace is available, bounding concurrency.
Base.acquire(p::ChannelWorkspacePool) = take!(p.channel)
Base.release(::SingleWorkspacePool, _ww) = nothing
Base.release(p::ChannelWorkspacePool, ww::WindowWorkspace) = (put!(p.channel, ww); nothing)
# `window_workspaces` iterates every pooled workspace regardless of availability
# (used for cleanup/finalization, not for solving).
window_workspaces(p::SingleWorkspacePool) = (p.window_workspace,)
window_workspaces(p::ChannelWorkspacePool) = p.window_workspaces

# Direct workspace access — only well-defined for SingleWorkspacePool, where
# there is exactly one WindowWorkspace and it is reused across windows. With a
# ChannelWorkspacePool any task may currently hold any entry, so reaching in
# without acquire/release would race; we error instead.
workspaces(p::SingleWorkspacePool) = p.window_workspace.workspaces
sparse_builders(p::SingleWorkspacePool) = p.window_workspace.sparse_builders
# These methods are hit either directly or via the `WindowedInit` forwarders;
# the message names the operation, not a specific signature, since both paths
# land here.
workspaces(::ChannelWorkspacePool) =
    error("`workspaces` is undefined for a threaded WindowedProblem — acquire a WindowWorkspace from the pool instead")
sparse_builders(::ChannelWorkspacePool) =
    error("`sparse_builders` is undefined for a threaded WindowedProblem — acquire a WindowWorkspace from the pool instead")

"""
    WindowedInit

Initialisation state for a `WindowedProblem`: the problem, the source raster,
per-window sparse sizes and ranges, the (sorted) indices of windows to solve,
and the workspace `pool` (a `SingleWorkspacePool` or `ChannelWorkspacePool`).
"""
mutable struct WindowedInit{P,R,Pool} <: Initialisation
    problem::P
    rast::R
    sparse_sizes::Vector{Tuple{Int,Int}}
    ranges::Vector{Tuple{UnitRange{Int},UnitRange{Int}}}
    indices::Vector{Int}
    sorted_indices::Vector{Int}
    pool::Pool
    function WindowedInit(problem::P, rast::R, sparse_sizes, ranges, indices, sorted_indices, pool::Pool) where {P,R,Pool}
        wi = new{P,R,Pool}(problem, rast, sparse_sizes, ranges, indices, sorted_indices, pool)
        # Register a finalizer only when at least one pooled workspace is
        # mmap-backed, so the backing files are deleted once `wi` is unreachable.
        any(ww -> !isempty(mat_workspaces(ww.workspaces).mmap_paths), window_workspaces(pool)) &&
            finalizer(_cleanup_windowed_init!, wi)
        return wi
    end
end

# Finalizer body: release mmap-backed matrix workspace files for every pooled
# WindowWorkspace. Safe to call on pools with no mmap files (cleanup! is then
# a no-op for them).
function _cleanup_windowed_init!(wi::WindowedInit)
    for ww in window_workspaces(wi.pool)
        cleanup!(mat_workspaces(ww.workspaces))
    end
    return nothing
end

# Accessors. `workspaces`/`sparse_builders` forward to the pool, which errors
# for a ChannelWorkspacePool (direct access would race with worker tasks).
problem(wi::WindowedInit) = wi.problem
pool(wi::WindowedInit) = wi.pool
workspaces(wi::WindowedInit) = workspaces(wi.pool)
sparse_builders(wi::WindowedInit) = sparse_builders(wi.pool)
vec_workspaces(wi::WindowedInit) = vec_workspaces(workspaces(wi))
mat_workspaces(wi::WindowedInit) = mat_workspaces(workspaces(wi))
sp_workspaces(wi::WindowedInit) = sp_workspaces(workspaces(wi))

"""
cleanup!(wi::WindowedInit)
Expand All @@ -182,36 +256,42 @@ function init(p::WindowedProblem, rast::RasterStack;
indices = isnothing(indices) ? _select_indices(p, rast; window_ranges, sparse_sizes) : indices
sorted_indices = last.(sort!(first.(sparse_sizes[indices]) .=> indices; rev=true))

# Pre-allocate workspaces sized for the largest window
# These will be resized as needed for each window, avoiding repeated allocations
# Use already-computed sparse_sizes to find max (avoids re-scanning raster)
# Pre-allocate workspaces sized for the largest window.
# These will be resized as needed for each window, avoiding repeated allocations.
# Use already-computed sparse_sizes to find max (avoids re-scanning raster).
_, max_idx = findmax(prod, sparse_sizes)
max_size = sparse_sizes[max_idx]
inner_problem = problem(p)

# Create mat_workspaces - optionally mmap'd to reduce GC pressure
mat_ws = _create_mat_workspaces(max_size, num_mat_workspaces(inner_problem), mmap_path(inner_problem))

workspaces = WorkspaceCollection(
Workspaces(max_size[1], num_vec_workspaces(inner_problem)),
mat_ws,
Workspaces(spzeros(max_size[1], max_size[1]), num_sp_workspaces(inner_problem)),
)
sparse_builders = SparseBuilders()
# Build the workspace pool. In single-threaded mode that's just one
# WindowWorkspace reused across windows. In threaded mode it's a
# Channel-backed pool of N = min(nthreads, nwindows) WindowWorkspaces —
# the channel size caps concurrent solves and keeps each task's workspaces
# disjoint from every other's, sidestepping ConcurrencyViolationError on
# the workspaces' internal `unused` BitVector.
pool = if p.threaded && length(sorted_indices) > 1
n = min(Threads.nthreads(), length(sorted_indices))
ChannelWorkspacePool([WindowWorkspace(inner_problem, max_size) for _ in 1:n])
else
SingleWorkspacePool(WindowWorkspace(inner_problem, max_size))
end

return WindowedInit(
p, rast, sparse_sizes, window_ranges, indices, sorted_indices, workspaces, sparse_builders
p, rast, sparse_sizes, window_ranges, indices, sorted_indices, pool
)
end
"""
    init(wi::WindowedInit, i::Int; verbose=false, ww=Base.acquire(wi.pool))

Initialise window `i` of `wi`, solving setup against the workspaces in `ww`.

NOTE(review): when `ww` is left to its default the acquired workspace is never
released back to the pool by this function — threaded callers must acquire and
release explicitly around this call, as `solve!` does.
"""
function init(wi::WindowedInit, i::Int;
    verbose=false,
    ww::WindowWorkspace=Base.acquire(wi.pool),
)
    ranges = wi.ranges[i]
    verbose && println("Initialising window from ranges $ranges...")
    rast = _get_window_with_zeroed_buffer(wi, ranges)
    # Pass pre-allocated workspaces to avoid repeated allocations
    return init(problem(problem(wi)), rast;
        verbose,
        workspaces=ww.workspaces,
        sparse_builders=ww.sparse_builders,
    )
end

Expand Down Expand Up @@ -240,22 +320,29 @@ function solve!(window_init::WindowedInit;
end
end

# Set up channels for threading
# Define a runner for threaded/non-threaded operation
wp = pool(window_init)
# Define a runner for threaded/non-threaded operation.
# Each invocation acquires a WindowWorkspace from the pool, holds it for
# the duration of init+solve, and releases it in a finally block. For
# SingleWorkspacePool acquire/release are no-ops returning the only
# WindowWorkspace; for ChannelWorkspacePool the channel blocks acquire
# when all are in use, providing backpressure on Threads.@spawn.
# Solve one window: acquire a WindowWorkspace from the pool, init + solve the
# window with it, and always release it back in `finally` so a failure in one
# window cannot starve the pool.
function run(i, iw)
    verbose && println("Running window $i - $iw on thread $(Threads.threadid())")
    ww = Base.acquire(wp)
    try
        ggi = init(window_init, iw; ww, verbose)
        @assert ggi isa GridGraphInit
        elapsed = @elapsed begin
            output = solve!(ggi; verbose)
        end
        # Garbage collect for this window.
        # Inefficient for few/small windows but often needed for many/large windows.
        p.gc && GC.gc()
        return output, elapsed
    finally
        Base.release(wp, ww)
    end
end
# Run the window problems
out_elapsed = if p.threaded
Expand Down
37 changes: 37 additions & 0 deletions test/windowed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,43 @@ end
end


@testset "threaded WindowedProblem uses ChannelWorkspacePool" begin
distance_transformation = x -> exp(-x / 50)
movement = RandomisedShortestPath(ExpectedCost(); theta=θ, distance_transformation)
csp = ConScapeProblem(; measures, movement, solver=VectorSolver())
kw = (; buffer=10, centersize=5)

# Non-threaded: SinglePool, accessors return the only workspace.
wp_serial = WindowedProblem(csp; kw..., threaded=false)
wi_serial = init(wp_serial, rast)
@test wi_serial.pool isa ConScape.SingleWorkspacePool
@test ConScape.mat_workspaces(wi_serial) isa ConScape.Workspaces

# Threaded with multiple valid windows: ChannelPool sized to min(nthreads, nwindows).
wp_threaded = WindowedProblem(csp; kw..., threaded=true)
wi_threaded = init(wp_threaded, rast)
@test wi_threaded.pool isa ConScape.ChannelWorkspacePool
@test length(wi_threaded.pool.window_workspaces) ==
min(Threads.nthreads(), length(wi_threaded.sorted_indices))

# Direct workspace accessors must error on the threaded pool — they would
# otherwise return a workspace another task may currently hold.
@test_throws ErrorException ConScape.workspaces(wi_threaded)
@test_throws ErrorException ConScape.sparse_builders(wi_threaded)
@test_throws ErrorException ConScape.mat_workspaces(wi_threaded)

# Threaded path produces the same result as serial.
# NaN cells (masked/buffer pixels) must match as NaN in both outputs;
# finite cells are compared with a small absolute tolerance since thread
# scheduling can change floating-point summation order.
serial_result = solve(wp_serial, rast)
threaded_result = solve(wp_threaded, rast)
for k in keys(serial_result)
s = serial_result[k]
t = threaded_result[k]
@test all(zip(s, t)) do (a, b)
isnan(a) && isnan(b) || isapprox(a, b; atol=1e-8)
end
end
end

# BatchProblem writes files to disk and mosaics to RasterStack
@testset "batch problem matches windowed problem" begin
solver = VectorSolver()
Expand Down
Loading