diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 8e3b41511..8804a7938 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -90,6 +90,19 @@ steps: - JuliaCI/julia-coverage#v1: codecov: true + - label: Julia 1.11 (MPI) + timeout_in_minutes: 20 + <<: *test + plugins: + - JuliaCI/julia#v1: + version: "1.11" + - JuliaCI/julia-test#v1: + julia_args: "--threads=1" + - JuliaCI/julia-coverage#v1: + codecov: true + env: + CI_USE_MPI: "1" + - label: Julia 1.11 (CUDA) timeout_in_minutes: 60 <<: *gputest diff --git a/src/datadeps/remainders.jl b/src/datadeps/remainders.jl index af4b8a13c..88201c621 100644 --- a/src/datadeps/remainders.jl +++ b/src/datadeps/remainders.jl @@ -145,14 +145,8 @@ function compute_remainder_for_arg!(state::DataDepsState, nspans = length(first(target_ainfos)) # FIXME: This is a hack to ensure that we don't miss any history generated by aliasing(...) -<<<<<<< HEAD - for (_, space, _) in state.arg_history[arg_w] - if !in(space, spaces) -======= for entry in state.arg_history[arg_w] if !in(entry.space, spaces) - @opcounter :compute_remainder_for_arg_restart ->>>>>>> 85e0b801 (MPI: Optimizations and fix some uniformity issues) @goto restart end end diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 3fcd87070..3b8688a16 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -93,7 +93,7 @@ struct ComputeState worker_chans::Dict{Int,Tuple{RemoteChannel,RemoteChannel}} signature_time_cost::Dict{Signature,UInt64} signature_alloc_cost::Dict{Signature,UInt64} - worker_transfer_rate::Dict{Int,Dict{Processor,UInt64}} + worker_transfer_rate::Dict{Processor,Dict{Processor,UInt64}} halt::Base.Event lock::ReentrantLock futures::Dict{Thunk, Vector{ThunkFuture}} @@ -115,14 +115,14 @@ function start_state(deps::Dict, node_order, chan) Dict{Int, WeakThunk}(), node_order, WeakKeyDict{Any,Chunk}(), - Dict{Int,Dict{Processor,UInt64}}(), - Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(), - 
Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}(), - Dict{Int,NTuple{3,Float64}}(), - Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(), + Dict{Processor,Dict{Processor,UInt64}}(), + Dict{Processor,Dict{Union{StorageResource,Nothing},UInt64}}(), + Dict{Processor,Dict{Union{StorageResource,Nothing},UInt64}}(), + Dict{Processor,NTuple{3,Float64}}(), + Dict{Int,Tuple{RemoteChannel,RemoteChannel}}(), Dict{Signature,UInt64}(), Dict{Signature,UInt64}(), - Dict{Int,Dict{Processor,UInt64}}(), + Dict{Processor,Dict{Processor,UInt64}}(), Base.Event(), ReentrantLock(), Dict{Thunk, Vector{ThunkFuture}}(), @@ -1189,7 +1189,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re end task, occupancy = peek(queue) scope = task.scope - accel = something(task.options.acceleration, Dagger.DistributedAcceleration()) + accel = something(task.options.acceleration, Dagger.DistributedAcceleration()) if Dagger.proc_in_scope(to_proc, scope) && Dagger.accel_matches_proc(accel, to_proc) typemax(UInt32) - proc_occupancy_cached >= occupancy # Compatible, steal this task @@ -1400,7 +1400,7 @@ function do_tasks(to_proc, return_queue, tasks) end @dagdebug nothing :processor "Kicked processors" end - + const SCHED_MOVE = ScopedValue{Bool}(false) """ diff --git a/src/sch/util.jl b/src/sch/util.jl index 8e36c3576..38b767588 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -594,8 +594,9 @@ const DEFAULT_TRANSFER_RATE = UInt64(1_000_000) # Add fixed cost for cross-worker task transfer (esimated at 1ms) # TODO: Actually estimate/benchmark this task_xfer_cost = root_worker_id(gproc) != myid() ? 
1_000_000 : 0 # 1ms + pid = Dagger.root_worker_id(gproc) - tx_rate = get(get(state.worker_transfer_rate, gproc.pid, Dict{Processor,UInt64}()), proc, DEFAULT_TRANSFER_RATE) + tx_rate = get(get(state.worker_transfer_rate, pid, Dict{Processor,UInt64}()), proc, DEFAULT_TRANSFER_RATE) costs[proc] = est_time_util + (tx_cost/tx_rate) + task_xfer_cost end chunks_cleanup() diff --git a/test/mpi.jl b/test/mpi.jl index a84ffdce1..c6d2cbae3 100644 --- a/test/mpi.jl +++ b/test/mpi.jl @@ -1,70 +1,58 @@ -using Dagger -using MPI -using LinearAlgebra -using SparseArrays - +using Dagger, MPI, LinearAlgebra Dagger.accelerate!(:mpi) - comm = MPI.COMM_WORLD rank = MPI.Comm_rank(comm) -size = MPI.Comm_size(comm) - -# Use a large array (adjust size as needed for your RAM) -N = 100 -tag = 123 +sz = MPI.Comm_size(comm) + +mpidagger_all_results = [] + +# Benchmark parameters +# Launch this script under MPI so the ranks exist before the benchmark starts +# Example: mpirun -n 4 julia --project test/mpi.jl +datatype = [Float32, Float64] +datasize = 40 +try + for T in datatype + A = rand(T, datasize, datasize) + A = A * A' + A[diagind(A)] .+= size(A, 1) + B = copy(A) + @assert ishermitian(B) + DA = distribute(A, Blocks(20,20)) + DB = distribute(B, Blocks(20,20)) + + LinearAlgebra._chol!(DA, UpperTriangular) + elapsed_time = @elapsed chol_DB = LinearAlgebra._chol!(DB, UpperTriangular) + + # Record the timing and derived throughput for this element type + result = ( + procs = sz, + dtype = T, + size = datasize, + time = elapsed_time, + gflops = (datasize^3 / 3) / (elapsed_time * 1e9) + ) + push!(mpidagger_all_results, result) -if rank == 0 - arr = sprand(N, N, 0.6) -else - arr = spzeros(N, N) -end -# --- Out-of-place broadcast --- -function bcast_outofplace() - MPI.Barrier(comm) - if rank == 0 - Dagger.bcast_send_yield(arr, comm, 0, tag+1) - else - Dagger.bcast_recv_yield(comm, 0, tag+1) end - MPI.Barrier(comm) -end -# --- In-place broadcast --- - -function bcast_inplace() - MPI.Barrier(comm) +catch e if rank == 0 - 
Dagger.bcast_send_yield!(arr, comm, 0, tag) - else - Dagger.bcast_recv_yield!(arr, comm, 0, tag) + showerror(stdout, e) end - MPI.Barrier(comm) end +if rank == 0 + #= Write results to CSV + mkpath("benchmarks/results") + if !isempty(mpidagger_all_results) + df = DataFrame(mpidagger_all_results) + CSV.write("benchmarks/results/DaggerMPI_Weak_scale_results.csv", df) -function bcast_inplace_metadata() - MPI.Barrier(comm) - if rank == 0 - Dagger.bcast_send_yield_metadata(arr, comm, 0) end - MPI.Barrier(comm) + =# + # Print one comma-separated row per collected result + for result in mpidagger_all_results + println(result.procs, ",", result.dtype, ",", result.size, ",", result.time, ",", result.gflops) + end + #println("\nAll Cholesky tests completed!") end - - -inplace = @time bcast_inplace() - - -MPI.Barrier(comm) -MPI.Finalize() - - - - -#= -A = rand(Blocks(2,2), 4, 4) -Ac = collect(A) -println(Ac) - - -move!(identity, Ac[1].space , Ac[2].space, Ac[1], Ac[2]) -=# - diff --git a/test/runtests.jl b/test/runtests.jl index bc4505fb3..e575cd597 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -5,6 +5,7 @@ USE_ROCM = parse(Bool, get(ENV, "CI_USE_ROCM", "0")) USE_ONEAPI = parse(Bool, get(ENV, "CI_USE_ONEAPI", "0")) USE_METAL = parse(Bool, get(ENV, "CI_USE_METAL", "0")) USE_OPENCL = parse(Bool, get(ENV, "CI_USE_OPENCL", "0")) +USE_MPI = parse(Bool, get(ENV, "CI_USE_MPI", "0")) USE_GPU = USE_CUDA || USE_ROCM || USE_ONEAPI || USE_METAL || USE_OPENCL tests = [ @@ -46,6 +47,8 @@ tests = [ ("Reusable Data Structures", "reuse.jl"), ("External Languages - Python", "extlang/python.jl"), ("Preferences", "preferences.jl"), + ("MPI_test", "mpi_test.jl"), + #("MPI", "mpi.jl") #("Fault Tolerance", "fault-tolerance.jl"), ] if USE_GPU @@ -55,6 +58,15 @@ if USE_GPU ("Array - Stencils", "array/stencil.jl"), ] end + +if USE_MPI + # When CI_USE_MPI is set, replace the test list so only the MPI tests run + tests = [ + #("MPI", "mpi.jl"), + ("MPI_test", "mpi_test.jl"), + ] +end + all_test_names = map(test -> replace(last(test), ".jl"=>""), tests) additional_workers::Int = 
3 @@ -65,6 +77,9 @@ if PROGRAM_FILE != "" && realpath(PROGRAM_FILE) == @__FILE__ using Pkg Pkg.activate(@__DIR__) try + # Without Pkg.develop, loading fails with: "Package Dagger not found in current path. + # Run `import Pkg; Pkg.add("Dagger")` to install the Dagger package." + Pkg.develop(path=joinpath(@__DIR__, "..")) Pkg.instantiate() catch end @@ -185,6 +200,17 @@ if USE_GPU include("setup_gpu.jl") end +if USE_MPI + include("setup_mpi.jl") + @info "Running MPI tests via mpiexecjl" + # Invoke the mpiexecjl wrapper by its full path in the depot's bin directory (where setup_mpi.jl installs it), since that directory may not be on PATH + mpiexecjl = joinpath(DEPOT_PATH[1], "bin", "mpiexecjl") + cmd = `$mpiexecjl -n 2 $(Base.julia_cmd()) --project=$(Base.active_project()) $(joinpath(@__DIR__, "mpi_test.jl"))` + @info "Executing: $cmd" + run(cmd) + exit(0) +end + try for test in to_test test_title = tests[findfirst(x->x[2]==test * ".jl", tests)][1] diff --git a/test/setup_mpi.jl b/test/setup_mpi.jl new file mode 100644 index 000000000..8fb680ab2 --- /dev/null +++ b/test/setup_mpi.jl @@ -0,0 +1,20 @@ +if USE_MPI + using Pkg + Pkg.add("MPI") + Pkg.add("MPIPreferences") +end + +@everywhere begin + if $USE_MPI + using MPI + using MPIPreferences + + # Configure every process to use the MPICH_jll binary + MPIPreferences.use_jll_binary("MPICH_jll"; export_prefs=true) + end +end + +if USE_MPI + # Install the mpiexecjl wrapper into the first depot's bin directory + MPI.install_mpiexecjl(; destdir=joinpath(DEPOT_PATH[1], "bin"), force=true) +end