diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index 73a763fa7..b1d7fdceb 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -203,7 +203,7 @@
         '
         echo "+++ Run tests"
-        export JULIA_MPI_TEST_EXCLUDE="test_allreduce.jl,test_reduce.jl,test_scan.jl"
+        export JULIA_MPI_TEST_EXCLUDE="test_scan.jl"
         julia --color=yes --project=. -e '
           import Pkg
           Pkg.test("MPI"; test_args=["--backend=AMDGPU"])
diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md
index c89e6f880..785ccc5cd 100644
--- a/docs/src/reference/collective.md
+++ b/docs/src/reference/collective.md
@@ -13,6 +13,7 @@ MPI.Ibarrier
 MPI.Bcast!
 MPI.Bcast
 MPI.bcast
+MPI.Ibcast!
 ```
 
 ## Gather/Scatter
@@ -55,8 +56,10 @@ MPI.Neighbor_alltoallv!
 ```@docs
 MPI.Reduce!
 MPI.Reduce
+MPI.Ireduce!
 MPI.Allreduce!
 MPI.Allreduce
+MPI.Iallreduce!
 MPI.Scan!
 MPI.Scan
 MPI.Exscan!
diff --git a/src/collective.jl b/src/collective.jl
index 9366fbf26..c79c418db 100644
--- a/src/collective.jl
+++ b/src/collective.jl
@@ -103,6 +103,29 @@ function bcast(obj, root::Integer, comm::Comm)
     return obj
 end
 
+"""
+    Ibcast!(buf, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+
+Starts a nonblocking broadcast of the buffer `buf` from rank `root` to all
+processes in `comm`. Returns the [`AbstractRequest`](@ref) object for the
+nonblocking operation.
+
+# External links
+$(_doc_external("MPI_Ibcast"))
+"""
+Ibcast!(buf, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    Ibcast!(buf, root, comm, req)
+
+function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request())
+    # int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root,
+    #                MPI_Comm comm, MPI_Request *request)
+    API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req)
+    return req
+end
+Ibcast!(data, root::Integer, comm::Comm, req::AbstractRequest = Request()) =
+    Ibcast!(Buffer(data), root, comm, req)
+
 """
     Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm;
         root::Integer=0)
@@ -716,6 +739,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T}
     end
 end
 
+## Ireduce
+
+"""
+    Ireduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+    Ireduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+
+Starts a nonblocking elementwise reduction using the operator `op` on the buffer
+`sendbuf`, storing the result in `recvbuf` on the process of rank `root`.
+
+On non-root processes `recvbuf` is ignored, and can be `nothing`.
+
+To perform the reduction in place, provide a single buffer `sendrecvbuf`.
+
+Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.
+
+# See also
+- [`Reduce!`](@ref) the equivalent blocking operation.
+- [`Iallreduce!`](@ref) for a nonblocking reduction whose result is available on all ranks.
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Ireduce"))
+"""
+Ireduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    Ireduce!(sendrecvbuf, op, root, comm, req)
+Ireduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) =
+    Ireduce!(sendbuf, recvbuf, op, root, comm, req)
+
+function Ireduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request())
+    # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count,
+    #                 MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm,
+    #                 MPI_Request* req)
+    API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm, req)
+    setbuffer!(req, rbuf)
+    return req
+end
+
+Ireduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
+    Ireduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req)
+Ireduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) =
+    Ireduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req)
+
+# inplace
+function Ireduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
+    if Comm_rank(comm) == root
+        Ireduce!(IN_PLACE, buf, op, root, comm, req)
+    else
+        Ireduce!(buf, nothing, op, root, comm, req)
+    end
+end
+
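For orientation, here is a minimal usage sketch of the nonblocking reduction added above. It is not part of the patch; the buffer size, values, and root rank are illustrative:

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

send = fill(rank, 4)                        # each rank contributes its own rank id
recv = rank == 0 ? similar(send) : nothing  # recvbuf is only needed on the root
req = MPI.Ireduce!(send, recv, +, comm; root=0)
# ... unrelated work can overlap with the reduction here ...
MPI.Wait(req)
rank == 0 && @show recv                     # sum of 0:nranks-1 in every element

MPI.Finalize()
```

As with the blocking `Reduce!`, non-root ranks may pass `nothing` for the receive buffer.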
 ## Allreduce
 
 # mutating
@@ -775,6 +849,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
 Allreduce(obj::T, op, comm::Comm) where {T} =
     Allreduce!(Ref(obj), Ref{T}(), op, comm)[]
 
+## Iallreduce
+
+"""
+    Iallreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
+    Iallreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
+
+Starts a nonblocking elementwise reduction using the operator `op` on the buffer
+`sendbuf`, storing the result in the `recvbuf` of all processes in the group.
+
+If only a single buffer `sendrecvbuf` is provided, the operation is performed in place.
+
+Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.
+
+# See also
+- [`Allreduce!`](@ref) the equivalent blocking operation.
+- [`Ireduce!`](@ref) for a nonblocking reduction whose result is available only on the root rank.
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Iallreduce"))
+"""
+function Iallreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request())
+    @assert isnull(req)
+    # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count,
+    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
+    #                    MPI_Request* req)
+    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
+    setbuffer!(req, rbuf)
+    return req
+end
+Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
+Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
+
+# inplace
+Iallreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(IN_PLACE, rbuf, op, comm, req)
+
 ## Scan
 
 # mutating
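Likewise, a minimal sketch (not part of the patch; values illustrative) of the in-place variant of the new nonblocking allreduce:

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD

buf = ones(Int, 4)                  # a single sendrecvbuf selects the IN_PLACE path
req = MPI.Iallreduce!(buf, +, comm)
MPI.Wait(req)
@assert all(==(MPI.Comm_size(comm)), buf)

MPI.Finalize()
```

Passing a single buffer routes through the `IN_PLACE` method, so no separate receive buffer is allocated.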
diff --git a/test/mpi_support_test.jl b/test/mpi_support_test.jl
new file mode 100644
index 000000000..667a0050c
--- /dev/null
+++ b/test/mpi_support_test.jl
@@ -0,0 +1,41 @@
+include("common.jl")
+
+MPI.Init()
+
+# These MPI calls may exercise unsupported features (e.g. on GPU backends) and can
+# raise SIGSEGV (or a similar signal), which cannot be handled from Julia in a portable way.
+
+op = ARGS[1]
+if op == "Allreduce"
+    # Allreduce is unsupported for AMDGPU with UCX
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    MPI.Allreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)
+
+elseif op == "Iallreduce"
+    # Iallreduce is unsupported for CUDA with OpenMPI 5 + UCX
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    req = MPI.Iallreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)
+    MPI.Wait(req)
+
+elseif op == "Reduce"
+    # Reduce is unsupported for AMDGPU with UCX
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    MPI.Reduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)
+
+elseif op == "Ireduce"
+    # Ireduce is unsupported for CUDA with OpenMPI 5 + UCX
+    send_arr = ArrayType(zeros(Int, 1))
+    recv_arr = ArrayType{Int}(undef, 1)
+    synchronize()
+    req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)
+    MPI.Wait(req)
+
+else
+    error("unknown test: $op")
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 30d27b674..e2903d8ea 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -71,6 +71,23 @@ if Sys.isunix()
     include("mpiexecjl.jl")
 end
 
+function is_mpi_operation_supported(mpi_op, n=nprocs)
+    test_file = joinpath(@__DIR__, "mpi_support_test.jl")
+    cmd = `$(mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $test_file $mpi_op`
+    cmd = pipeline(ignorestatus(cmd); stderr=devnull)
+    process = run(cmd)
+    supported = success(process)
+    !supported && @warn "$mpi_op is unsupported with $backend_name"
+    return supported
+end
+
+if ArrayType != Array # we expect that only GPU backends can have unsupported features
+    ENV["JULIA_MPI_TEST_ALLREDUCE"] = is_mpi_operation_supported("Allreduce")
+    ENV["JULIA_MPI_TEST_REDUCE"] = is_mpi_operation_supported("Reduce")
+    ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("Iallreduce")
+    ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("Ireduce")
+end
+
 excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',')
 
 testdir = @__DIR__
diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl
index 6f23a2e45..c2ae3e506 100644
--- a/test/test_allreduce.jl
+++ b/test/test_allreduce.jl
@@ -1,5 +1,12 @@
 include("common.jl")
 
+allreduce_supported = get(ENV, "JULIA_MPI_TEST_ALLREDUCE", "true") == "true"
+iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true"
+if !allreduce_supported
+    @warn "Skipping all tests in 'test_allreduce.jl' as reductions are unsupported"
+    exit(0)
+end
+
 MPI.Init()
 
 comm_size = MPI.Comm_size(MPI.COMM_WORLD)
@@ -13,6 +20,7 @@ else
     operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
 end
 
+
 for T = [Int]
     for dims = [1, 2, 3]
         send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
@@ -43,6 +51,23 @@ for T = [Int]
             vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD)
             @test vals isa ArrayType{T}
             @test vals == comm_size .* send_arr
+
+            # Nonblocking
+            recv_arr = ArrayType{T}(undef, size(send_arr))
+            if iallreduce_supported
+                req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
+                MPI.Wait(req)
+                @test recv_arr == comm_size .* send_arr
+            end
+
+            # Nonblocking (IN_PLACE)
+            recv_arr = copy(send_arr)
+            synchronize()
+            if iallreduce_supported
+                req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD)
+                MPI.Wait(req)
+                @test recv_arr == comm_size .* send_arr
+            end
         end
     end
 end
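The new file below tests the nonblocking broadcast. As a standalone sketch (not part of the patch; values illustrative), the call pattern it exercises is:

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
root = 0

buf = MPI.Comm_rank(comm) == root ? collect(1:4) : zeros(Int, 4)
req = MPI.Ibcast!(buf, comm; root=root)
MPI.Wait(req)
@assert buf == collect(1:4)   # every rank now holds the root's data

MPI.Finalize()
```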
diff --git a/test/test_ibcast.jl b/test/test_ibcast.jl
new file mode 100644
index 000000000..f5bb378d4
--- /dev/null
+++ b/test/test_ibcast.jl
@@ -0,0 +1,33 @@
+include("common.jl")
+using Random
+
+MPI.Init()
+
+comm = MPI.COMM_WORLD
+root = 0
+matsize = (17, 17)
+
+for T in MPITestTypes
+    # This test depends on the stability of the RNG, and we have observed with
+    # CUDA.jl that the same number of rand calls is not guaranteed to occur on
+    # each rank (this is a hypothesis). To be safe, we seed the RNG immediately
+    # before calling rand.
+    Random.seed!(17)
+    A = ArrayType(rand(T, matsize))
+    B = MPI.Comm_rank(comm) == root ? A : similar(A)
+    req = MPI.Ibcast!(B, comm; root=root)
+    sleep(rand())
+    MPI.Wait(req)
+    @test B == A
+end
+
+# Char
+A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's']
+B = MPI.Comm_rank(comm) == root ? A : similar(A)
+req = MPI.Ibcast!(B, comm; root=root)
+sleep(rand())
+MPI.Wait(req)
+@test B == A
+
+MPI.Finalize()
+@test MPI.Finalized()
diff --git a/test/test_reduce.jl b/test/test_reduce.jl
index 6ad5d24cf..1c53e382d 100644
--- a/test/test_reduce.jl
+++ b/test/test_reduce.jl
@@ -9,6 +9,13 @@ const can_do_closures =
     Sys.ARCH !== :aarch64 &&
     !startswith(string(Sys.ARCH), "arm")
 
+reduce_supported = get(ENV, "JULIA_MPI_TEST_REDUCE", "true") == "true"
+ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true"
+if !reduce_supported
+    @warn "Skipping all tests in 'test_reduce.jl' as reductions are unsupported"
+    exit(0)
+end
+
 using DoubleFloats
 
 MPI.Init()
@@ -116,6 +123,26 @@ for T = [Int]
             @test recv_arr isa ArrayType{T}
             @test recv_arr == sz .* view(send_arr, 2:3)
         end
+
+        # Nonblocking
+        recv_arr = ArrayType{T}(undef, size(send_arr))
+        if ireduce_supported
+            req = MPI.Ireduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root)
+            MPI.Wait(req)
+            if isroot
+                @test recv_arr == sz .* send_arr
+            end
+        end
+
+        # Nonblocking (IN_PLACE)
+        recv_arr = copy(send_arr)
+        if ireduce_supported
+            req = MPI.Ireduce!(recv_arr, op, MPI.COMM_WORLD; root=root)
+            MPI.Wait(req)
+            if isroot
+                @test recv_arr == sz .* send_arr
+            end
+        end
     end
 end
@@ -131,6 +158,15 @@ else
     @test result === nothing
 end
 
+recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing
+if ireduce_supported
+    req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root)
+    MPI.Wait(req)
+    if rank == root
+        @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64)
+    end
+end
+
 MPI.Barrier( MPI.COMM_WORLD )
 
 GC.gc()
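Finally, the support probe added in `test/runtests.jl` can also be exercised by hand. A hypothetical standalone sketch (the helper name `probe_mpi_op` and the process count are illustrative; it assumes `MPI.mpiexec()` returns the launcher command, as the test suite's own use of `mpiexec()` suggests):

```julia
using MPI

# Probe an MPI operation in a throwaway subprocess: if the underlying MPI/GPU
# stack segfaults, only the probe process dies and `success` returns false.
function probe_mpi_op(op::String; n::Int = 2)
    test_file = joinpath(@__DIR__, "mpi_support_test.jl")
    cmd = `$(MPI.mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $test_file $op`
    return success(run(pipeline(ignorestatus(cmd); stderr = devnull)))
end

@show probe_mpi_op("Iallreduce")  # false on stacks where nonblocking device reductions crash
```

Running the probe in a separate process is the key design choice: a SIGSEGV raised inside the MPI library cannot be caught portably from Julia, so it must not occur in the driver process itself.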