Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ steps:
- JuliaCI/julia-coverage#v1:
codecov: true

- label: Julia 1.11 (MPI)
timeout_in_minutes: 20
<<: *test
plugins:
- JuliaCI/julia#v1:
version: "1.11"
- JuliaCI/julia-test#v1:
julia_args: "--threads=1"
- JuliaCI/julia-coverage#v1:
codecov: true
env:
CI_USE_MPI: "1"

- label: Julia 1.11 (CUDA)
timeout_in_minutes: 60
<<: *gputest
Expand Down
6 changes: 0 additions & 6 deletions src/datadeps/remainders.jl
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,8 @@ function compute_remainder_for_arg!(state::DataDepsState,
nspans = length(first(target_ainfos))

# FIXME: This is a hack to ensure that we don't miss any history generated by aliasing(...)
<<<<<<< HEAD
for (_, space, _) in state.arg_history[arg_w]
if !in(space, spaces)
=======
for entry in state.arg_history[arg_w]
if !in(entry.space, spaces)
@opcounter :compute_remainder_for_arg_restart
>>>>>>> 85e0b801 (MPI: Optimizations and fix some uniformity issues)
@goto restart
end
end
Expand Down
18 changes: 9 additions & 9 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ struct ComputeState
worker_chans::Dict{Int,Tuple{RemoteChannel,RemoteChannel}}
signature_time_cost::Dict{Signature,UInt64}
signature_alloc_cost::Dict{Signature,UInt64}
worker_transfer_rate::Dict{Int,Dict{Processor,UInt64}}
worker_transfer_rate::Dict{Processor,Dict{Processor,UInt64}}
halt::Base.Event
lock::ReentrantLock
futures::Dict{Thunk, Vector{ThunkFuture}}
Expand All @@ -115,14 +115,14 @@ function start_state(deps::Dict, node_order, chan)
Dict{Int, WeakThunk}(),
node_order,
WeakKeyDict{Any,Chunk}(),
Dict{Int,Dict{Processor,UInt64}}(),
Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(),
Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(),
Dict{Int,NTuple{3,Float64}}(),
Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(),
Dict{Processor,Dict{Processor,UInt64}}(),
Dict{Processor,Dict{Union{StorageResource,Nothing},UInt64}}(),
Dict{Processor,Dict{Union{StorageResource,Nothing},UInt64}}(),
Dict{Processor,NTuple{3,Float64}}(),
Dict{Processor,Tuple{RemoteChannel,RemoteChannel}}(),
Dict{Signature,UInt64}(),
Dict{Signature,UInt64}(),
Dict{Int,Dict{Processor,UInt64}}(),
Dict{Processor,Dict{Processor,UInt64}}(),
Base.Event(),
ReentrantLock(),
Dict{Thunk, Vector{ThunkFuture}}(),
Expand Down Expand Up @@ -1189,7 +1189,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
end
task, occupancy = peek(queue)
scope = task.scope
accel = something(task.options.acceleration, Dagger.DistributedAcceleration())
accel = something(task.options.acceleration, Dagger.DistributedAcceleration())
if Dagger.proc_in_scope(to_proc, scope) && Dagger.accel_matches_proc(accel, to_proc)
typemax(UInt32) - proc_occupancy_cached >= occupancy
# Compatible, steal this task
Expand Down Expand Up @@ -1400,7 +1400,7 @@ function do_tasks(to_proc, return_queue, tasks)
end
@dagdebug nothing :processor "Kicked processors"
end

const SCHED_MOVE = ScopedValue{Bool}(false)

"""
Expand Down
3 changes: 2 additions & 1 deletion src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,9 @@ const DEFAULT_TRANSFER_RATE = UInt64(1_000_000)
# Add fixed cost for cross-worker task transfer (estimated at 1ms)
# TODO: Actually estimate/benchmark this
task_xfer_cost = root_worker_id(gproc) != myid() ? 1_000_000 : 0 # 1ms
pid = Dagger.root_worker_id(gproc)

tx_rate = get(get(state.worker_transfer_rate, gproc.pid, Dict{Processor,UInt64}()), proc, DEFAULT_TRANSFER_RATE)
tx_rate = get(get(state.worker_transfer_rate, pid, Dict{Processor,UInt64}()), proc, DEFAULT_TRANSFER_RATE)
costs[proc] = est_time_util + (tx_cost/tx_rate) + task_xfer_cost
end
chunks_cleanup()
Expand Down
104 changes: 46 additions & 58 deletions test/mpi.jl
Original file line number Diff line number Diff line change
@@ -1,70 +1,58 @@
using Dagger
using MPI
using LinearAlgebra
using SparseArrays

using Dagger, MPI, LinearAlgebra
Dagger.accelerate!(:mpi)

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

# Use a large array (adjust size as needed for your RAM)
N = 100
tag = 123
sz = MPI.Comm_size(comm)

mpidagger_all_results = []

# Define constants
# You need to define the MPI workers before running the benchmark
# Example: mpirun -n 4 julia --project benchmarks/DaggerMPI_Weak_scale.jl
datatype = [Float32, Float64]
datasize = 40
try
for T in datatype
A = rand(T, datasize, datasize)
A = A * A'
A[diagind(A)] .+= size(A, 1)
B = copy(A)
@assert ishermitian(B)
DA = distribute(A, Blocks(20,20))
DB = distribute(B, Blocks(20,20))

LinearAlgebra._chol!(DA, UpperTriangular)
elapsed_time = @elapsed chol_DB = LinearAlgebra._chol!(DB, UpperTriangular)

# Store results
result = (
procs = sz,
dtype = T,
size = datasize,
time = elapsed_time,
gflops = (datasize^3 / 3) / (elapsed_time * 1e9)
)
push!(mpidagger_all_results, result)

if rank == 0
arr = sprand(N, N, 0.6)
else
arr = spzeros(N, N)
end

# --- Out-of-place broadcast ---
function bcast_outofplace()
MPI.Barrier(comm)
if rank == 0
Dagger.bcast_send_yield(arr, comm, 0, tag+1)
else
Dagger.bcast_recv_yield(comm, 0, tag+1)
end
MPI.Barrier(comm)
end
# --- In-place broadcast ---

function bcast_inplace()
MPI.Barrier(comm)
catch e
if rank == 0
Dagger.bcast_send_yield!(arr, comm, 0, tag)
else
Dagger.bcast_recv_yield!(arr, comm, 0, tag)
showerror(stdout, e)
end
MPI.Barrier(comm)
end
if rank == 0
#= Write results to CSV
mkpath("benchmarks/results")
if !isempty(mpidagger_all_results)
df = DataFrame(mpidagger_all_results)
CSV.write("benchmarks/results/DaggerMPI_Weak_scale_results.csv", df)

function bcast_inplace_metadata()
MPI.Barrier(comm)
if rank == 0
Dagger.bcast_send_yield_metadata(arr, comm, 0)
end
MPI.Barrier(comm)
=#
# Summary statistics
for result in mpidagger_all_results
println(result.procs, ",", result.dtype, ",", result.size, ",", result.time, ",", result.gflops)
end
#println("\nAll Cholesky tests completed!")
end


inplace = @time bcast_inplace()


MPI.Barrier(comm)
MPI.Finalize()




#=
A = rand(Blocks(2,2), 4, 4)
Ac = collect(A)
println(Ac)


move!(identity, Ac[1].space , Ac[2].space, Ac[1], Ac[2])
=#

26 changes: 26 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ USE_ROCM = parse(Bool, get(ENV, "CI_USE_ROCM", "0"))
USE_ONEAPI = parse(Bool, get(ENV, "CI_USE_ONEAPI", "0"))
USE_METAL = parse(Bool, get(ENV, "CI_USE_METAL", "0"))
USE_OPENCL = parse(Bool, get(ENV, "CI_USE_OPENCL", "0"))
USE_MPI = parse(Bool, get(ENV, "CI_USE_MPI", "0"))
USE_GPU = USE_CUDA || USE_ROCM || USE_ONEAPI || USE_METAL || USE_OPENCL

tests = [
Expand Down Expand Up @@ -46,6 +47,8 @@ tests = [
("Reusable Data Structures", "reuse.jl"),
("External Languages - Python", "extlang/python.jl"),
("Preferences", "preferences.jl"),
("MPI_test", "mpi_test.jl"),
#("MPI", "mpi.jl")
#("Fault Tolerance", "fault-tolerance.jl"),
]
if USE_GPU
Expand All @@ -55,6 +58,15 @@ if USE_GPU
("Array - Stencils", "array/stencil.jl"),
]
end

if USE_MPI
#Only run MPI tests
tests = [
#("MPI", "mpi.jl"),
("MPI_test", "mpi_test.jl"),
]
end

all_test_names = map(test -> replace(last(test), ".jl"=>""), tests)

additional_workers::Int = 3
Expand All @@ -65,6 +77,9 @@ if PROGRAM_FILE != "" && realpath(PROGRAM_FILE) == @__FILE__
using Pkg
Pkg.activate(@__DIR__)
try
# Without Pkg.develop, loading fails with: "Package Dagger not found in current path.
# Run `import Pkg; Pkg.add("Dagger")` to install the Dagger package."
Pkg.develop(path=joinpath(@__DIR__, ".."))
Pkg.instantiate()
catch
end
Expand Down Expand Up @@ -185,6 +200,17 @@ if USE_GPU
include("setup_gpu.jl")
end

if USE_MPI
include("setup_mpi.jl")
@info "Running MPI tests via mpiexecjl"
# Construct path to mpiexecjl in the depot's bin directory
# Launching via mpiexecjl is the only approach found so far that reliably starts the MPI ranks
cmd = `mpiexecjl -n 2 $(Base.julia_cmd()) --project=$(Base.active_project()) $(joinpath(@__DIR__, "mpi_test.jl"))`
@info "Executing: $cmd"
run(cmd)
exit(0)
end

try
for test in to_test
test_title = tests[findfirst(x->x[2]==test * ".jl", tests)][1]
Expand Down
20 changes: 20 additions & 0 deletions test/setup_mpi.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# test/setup_mpi.jl — one-time environment setup for the MPI test path.
# Expects `USE_MPI::Bool` to already be defined by the including script
# (runtests.jl parses it from the CI_USE_MPI env var).

# On the driver process only: install the MPI packages into the active
# test environment so they can be loaded everywhere below.
if USE_MPI
    using Pkg
    Pkg.add("MPI")
    Pkg.add("MPIPreferences")
end

# Broadcast setup to all Distributed workers. Note `$USE_MPI` interpolates
# the driver's value into the quoted expression, so each worker branches on
# the driver's setting rather than re-reading the environment.
@everywhere begin
    if $USE_MPI
        using MPI
        using MPIPreferences

        # Configure each rank to use JLL binary
        # NOTE(review): writes LocalPreferences.toml (export_prefs=true) from
        # every worker; presumably all workers share one project so the writes
        # are identical — confirm this is race-free on shared filesystems.
        MPIPreferences.use_jll_binary("MPICH_jll"; export_prefs=true)
    end
end

if USE_MPI
    # Install mpiexecjl wrapper to standard bin location
    # (DEPOT_PATH[1]/bin) so runtests.jl can invoke it by name.
    MPI.install_mpiexecjl(; destdir=joinpath(DEPOT_PATH[1], "bin"), force=true)
end
Loading