From 6b871a95e7374a603412ef949c2aaf2e0a63220c Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:10:03 +0100 Subject: [PATCH 01/10] Added `IReduce!` and `IAllreduce!` --- src/collective.jl | 90 ++++++++++++++++++++++++++++++++++++++++++ test/test_allreduce.jl | 13 ++++++ test/test_reduce.jl | 23 +++++++++++ 3 files changed, 126 insertions(+) diff --git a/src/collective.jl b/src/collective.jl index 9366fbf26..af951ea08 100644 --- a/src/collective.jl +++ b/src/collective.jl @@ -716,6 +716,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T} end end +## IReduce + +""" + IReduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) + IReduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) + +Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf` and +stores the result in `recvbuf` on the process of rank `root`. + +On non-root processes `recvbuf` is ignored, and can be `nothing`. + +To perform the reduction in place, provide a single buffer `sendrecvbuf`. + +Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. + +# See also +- [`Reduce!`](@ref) the equivalent blocking operation. +- [`IAllreduce!`](@ref) to send reduction to all ranks. +- [`Op`](@ref) for details on reduction operators. 
+ +# External links +$(_doc_external("MPI_Ireduce")) +""" +IReduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = + IReduce!(sendrecvbuf, op, root, comm, req) +IReduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = + IReduce!(sendbuf, recvbuf, op, root, comm, req) + +function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request()) + # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count, + # MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm, + # MPI_Request* req) + API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm, req) + setbuffer!(req, rbuf) + return req +end + +IReduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = + IReduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req) +IReduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = + IReduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req) + +# inplace +function IReduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) + if Comm_rank(comm) == root + IReduce!(IN_PLACE, buf, op, root, comm, req) + else + IReduce!(buf, nothing, op, root, comm, req) + end +end + ## Allreduce # mutating @@ -775,6 +826,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) = Allreduce(obj::T, op, comm::Comm) where {T} = Allreduce!(Ref(obj), Ref{T}(), op, comm)[] +## IAllreduce + +""" + IAllreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) + IAllreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) + +Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf`, storing +the result in the `recvbuf` of all processes in the group. + +If only one `sendrecvbuf` buffer is provided, then the operation is performed in-place. 
+ +Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. + +# See also +- [`Allreduce!`](@ref) the equivalent blocking operation. +- [`IReduce!`](@ref) to send reduction to a single rank. +- [`Op`](@ref) for details on reduction operators. + +# External links +$(_doc_external("MPI_Iallreduce")) +""" +function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request()) + @assert isnull(req) + # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count, + # MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, + # MPI_Request* req) + API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req) + setbuffer!(req, rbuf) + return req +end +IAllreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) = + IAllreduce!(rbuf, Op(op, eltype(rbuf)), comm, req) +IAllreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) = + IAllreduce!(RBuffer(sendbuf, recvbuf), op, comm, req) + +# inplace +IAllreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) = + IAllreduce!(IN_PLACE, rbuf, op, comm, req) + ## Scan # mutating diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl index 6f23a2e45..00112c6db 100644 --- a/test/test_allreduce.jl +++ b/test/test_allreduce.jl @@ -43,6 +43,19 @@ for T = [Int] vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD) @test vals isa ArrayType{T} @test vals == comm_size .* send_arr + + # Nonblocking + recv_arr = ArrayType{T}(undef, size(send_arr)) + req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) + MPI.Wait(req) + @test recv_arr == comm_size .* send_arr + + # Nonblocking (IN_PLACE) + recv_arr = copy(send_arr) + synchronize() + req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD) + MPI.Wait(req) + @test recv_arr == comm_size .* send_arr end end end diff --git a/test/test_reduce.jl b/test/test_reduce.jl index 6ad5d24cf..cac4723fb 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -116,6 +116,22 @@ for T 
= [Int] @test recv_arr isa ArrayType{T} @test recv_arr == sz .* view(send_arr, 2:3) end + + # Nonblocking + recv_arr = ArrayType{T}(undef, size(send_arr)) + req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) + MPI.Wait(req) + if isroot + @test recv_arr == sz .* send_arr + end + + # Nonblocking (IN_PLACE) + recv_arr = copy(send_arr) + req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root) + MPI.Wait(req) + if isroot + @test recv_arr == sz .* send_arr + end end end end @@ -131,6 +147,13 @@ else @test result === nothing end +recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing +req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) +MPI.Wait(req) +if rank == root + @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) +end + MPI.Barrier( MPI.COMM_WORLD ) GC.gc() From 8b5f79b49ea8a5225140381153658b32761ed697 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Wed, 13 Mar 2024 10:38:54 +0100 Subject: [PATCH 02/10] Docs for `IReduce!` and `IAllreduce!` --- docs/src/reference/collective.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md index c89e6f880..dfd082720 100644 --- a/docs/src/reference/collective.md +++ b/docs/src/reference/collective.md @@ -55,8 +55,10 @@ MPI.Neighbor_alltoallv! ```@docs MPI.Reduce! MPI.Reduce +MPI.IReduce! MPI.Allreduce! MPI.Allreduce +MPI.IAllreduce! MPI.Scan! MPI.Scan MPI.Exscan! 
From c5c5ac6b1c72db112669e4c917dcfa10658761f8 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:34:00 +0100 Subject: [PATCH 03/10] Feature support testing for IAllreduce and IReduce for GPU backends --- test/mpi_support_test.jl | 28 ++++++++++++++++++++++++++++ test/runtests.jl | 13 +++++++++++++ test/test_allreduce.jl | 19 +++++++++++++------ test/test_reduce.jl | 26 +++++++++++++++++--------- 4 files changed, 71 insertions(+), 15 deletions(-) create mode 100644 test/mpi_support_test.jl diff --git a/test/mpi_support_test.jl b/test/mpi_support_test.jl new file mode 100644 index 000000000..0c6547cc5 --- /dev/null +++ b/test/mpi_support_test.jl @@ -0,0 +1,28 @@ +include("common.jl") + +MPI.Init() + +# Those MPI calls may be unsupported features (e.g. for GPU backends), and will raise SIGSEGV +# (or a similar signal) when called, which cannot be handled in Julia in a portable way. + +op = ARGS[1] +if op == "IAllreduce" + # IAllreduce is unsupported for CUDA with OpenMPI + UCX + # See https://docs.open-mpi.org/en/main/tuning-apps/networking/cuda.html#which-mpi-apis-do-not-work-with-cuda-aware-ucx + send_arr = ArrayType(zeros(Int, 1)) + recv_arr = ArrayType{Int}(undef, 1) + synchronize() + req = MPI.IAllreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD) + MPI.Wait(req) + +elseif op == "IReduce" + # IAllreduce is unsupported for CUDA with OpenMPI + UCX + send_arr = ArrayType(zeros(Int, 1)) + recv_arr = ArrayType{Int}(undef, 1) + synchronize() + req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0) + MPI.Wait(req) + +else + error("unknown test: $op") +end diff --git a/test/runtests.jl b/test/runtests.jl index 30d27b674..84e2727b3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -71,6 +71,19 @@ if Sys.isunix() include("mpiexecjl.jl") end +function is_mpi_operation_supported(mpi_op, n=nprocs) + test_file = joinpath(@__DIR__, "mpi_support_test.jl") + cmd = `$(mpiexec()) -n $n $(Base.julia_cmd()) 
--startup-file=no $test_file $mpi_op` + supported = success(run(ignorestatus(cmd))) + !supported && @warn "$mpi_op is unsupported with $backend_name" + return supported +end + +if ArrayType != Array # we expect that only GPU backends can have unsupported features + ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("IAllreduce") + ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("IReduce") +end + excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',') testdir = @__DIR__ diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl index 00112c6db..3fb9c3b33 100644 --- a/test/test_allreduce.jl +++ b/test/test_allreduce.jl @@ -13,6 +13,9 @@ else operators = [MPI.SUM, +, (x,y) -> 2x+y-x] end +iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true" + + for T = [Int] for dims = [1, 2, 3] send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims))) @@ -46,16 +49,20 @@ for T = [Int] # Nonblocking recv_arr = ArrayType{T}(undef, size(send_arr)) - req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) - MPI.Wait(req) - @test recv_arr == comm_size .* send_arr + if iallreduce_supported + req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) + MPI.Wait(req) + end + @test recv_arr == comm_size .* send_arr skip=!iallreduce_supported # Nonblocking (IN_PLACE) recv_arr = copy(send_arr) synchronize() - req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD) - MPI.Wait(req) - @test recv_arr == comm_size .* send_arr + if iallreduce_supported + req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD) + MPI.Wait(req) + end + @test recv_arr == comm_size .* send_arr skip=!iallreduce_supported end end end diff --git a/test/test_reduce.jl b/test/test_reduce.jl index cac4723fb..af73dd424 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -9,6 +9,8 @@ const can_do_closures = Sys.ARCH !== :aarch64 && !startswith(string(Sys.ARCH), "arm") +ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true" + using DoubleFloats 
MPI.Init() @@ -119,18 +121,22 @@ for T = [Int] # Nonblocking recv_arr = ArrayType{T}(undef, size(send_arr)) - req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) - MPI.Wait(req) + if ireduce_supported + req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) + MPI.Wait(req) + end if isroot - @test recv_arr == sz .* send_arr + @test recv_arr == sz .* send_arr skip=!ireduce_supported end # Nonblocking (IN_PLACE) recv_arr = copy(send_arr) - req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root) - MPI.Wait(req) + if ireduce_supported + req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root) + MPI.Wait(req) + end if isroot - @test recv_arr == sz .* send_arr + @test recv_arr == sz .* send_arr skip=!ireduce_supported end end end @@ -148,10 +154,12 @@ else end recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing -req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) -MPI.Wait(req) +if ireduce_supported + req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) + MPI.Wait(req) +end if rank == root - @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) + @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) skip=!ireduce_supported end MPI.Barrier( MPI.COMM_WORLD ) From 7d6d5374b2749e48910c9201176f27799ce66db0 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Mon, 18 Nov 2024 14:54:00 +0100 Subject: [PATCH 04/10] Do not use `skip` in tests for compat with Julia 1.6 --- test/test_allreduce.jl | 4 ++-- test/test_reduce.jl | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl index 3fb9c3b33..c4eac32b1 100644 --- a/test/test_allreduce.jl +++ b/test/test_allreduce.jl @@ -52,8 +52,8 @@ for T = [Int] if iallreduce_supported req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) MPI.Wait(req) + @test recv_arr == comm_size .* send_arr end - 
@test recv_arr == comm_size .* send_arr skip=!iallreduce_supported # Nonblocking (IN_PLACE) recv_arr = copy(send_arr) @@ -61,8 +61,8 @@ for T = [Int] if iallreduce_supported req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD) MPI.Wait(req) + @test recv_arr == comm_size .* send_arr end - @test recv_arr == comm_size .* send_arr skip=!iallreduce_supported end end end diff --git a/test/test_reduce.jl b/test/test_reduce.jl index af73dd424..9dfd76ec7 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -124,9 +124,9 @@ for T = [Int] if ireduce_supported req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) MPI.Wait(req) - end - if isroot - @test recv_arr == sz .* send_arr skip=!ireduce_supported + if isroot + @test recv_arr == sz .* send_arr + end end # Nonblocking (IN_PLACE) @@ -134,9 +134,9 @@ for T = [Int] if ireduce_supported req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root) MPI.Wait(req) - end - if isroot - @test recv_arr == sz .* send_arr skip=!ireduce_supported + if isroot + @test recv_arr == sz .* send_arr + end end end end @@ -157,9 +157,9 @@ recv_arr = isroot ? 
zeros(eltype(send_arr), size(send_arr)) : nothing if ireduce_supported req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) MPI.Wait(req) -end -if rank == root - @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) skip=!ireduce_supported + if rank == root + @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) + end end MPI.Barrier( MPI.COMM_WORLD ) From 1be23c418775a06dd4c284d8bf67e736b90698f8 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Wed, 11 Feb 2026 09:52:12 +0100 Subject: [PATCH 05/10] Proper capitalization for `Ireduce` and `Iallreduce` --- docs/src/reference/collective.md | 4 +-- src/collective.jl | 54 ++++++++++++++++---------------- test/mpi_support_test.jl | 12 +++---- test/runtests.jl | 4 +-- test/test_allreduce.jl | 4 +-- test/test_reduce.jl | 6 ++-- 6 files changed, 42 insertions(+), 42 deletions(-) diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md index dfd082720..8dafe4307 100644 --- a/docs/src/reference/collective.md +++ b/docs/src/reference/collective.md @@ -55,10 +55,10 @@ MPI.Neighbor_alltoallv! ```@docs MPI.Reduce! MPI.Reduce -MPI.IReduce! +MPI.Ireduce! MPI.Allreduce! MPI.Allreduce -MPI.IAllreduce! +MPI.Iallreduce! MPI.Scan! MPI.Scan MPI.Exscan! 
diff --git a/src/collective.jl b/src/collective.jl index af951ea08..836037eb6 100644 --- a/src/collective.jl +++ b/src/collective.jl @@ -716,11 +716,11 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T} end end -## IReduce +## Ireduce """ - IReduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) - IReduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) + Ireduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) + Ireduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf` and stores the result in `recvbuf` on the process of rank `root`. @@ -733,18 +733,18 @@ Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. # See also - [`Reduce!`](@ref) the equivalent blocking operation. -- [`IAllreduce!`](@ref) to send reduction to all ranks. +- [`Iallreduce!`](@ref) to send reduction to all ranks. - [`Op`](@ref) for details on reduction operators. 
# External links $(_doc_external("MPI_Ireduce")) """ -IReduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = - IReduce!(sendrecvbuf, op, root, comm, req) -IReduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = - IReduce!(sendbuf, recvbuf, op, root, comm, req) +Ireduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = + Ireduce!(sendrecvbuf, op, root, comm, req) +Ireduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = + Ireduce!(sendbuf, recvbuf, op, root, comm, req) -function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request()) +function Ireduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request()) # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count, # MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm, # MPI_Request* req) @@ -753,17 +753,17 @@ function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm return req end -IReduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = - IReduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req) -IReduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = - IReduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req) +Ireduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = + Ireduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req) +Ireduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) = + Ireduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req) # inplace -function IReduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) +function Ireduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request()) if Comm_rank(comm) == root - IReduce!(IN_PLACE, buf, op, root, comm, req) + 
Ireduce!(IN_PLACE, buf, op, root, comm, req) else - IReduce!(buf, nothing, op, root, comm, req) + Ireduce!(buf, nothing, op, root, comm, req) end end @@ -826,11 +826,11 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) = Allreduce(obj::T, op, comm::Comm) where {T} = Allreduce!(Ref(obj), Ref{T}(), op, comm)[] -## IAllreduce +## Iallreduce """ - IAllreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) - IAllreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) + Iallreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) + Iallreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]) Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf`, storing the result in the `recvbuf` of all processes in the group. @@ -841,13 +841,13 @@ Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction. # See also - [`Allreduce!`](@ref) the equivalent blocking operation. -- [`IReduce!`](@ref) to send reduction to a single rank. +- [`Ireduce!`](@ref) to send reduction to a single rank. - [`Op`](@ref) for details on reduction operators. 
# External links $(_doc_external("MPI_Iallreduce")) """ -function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request()) +function Iallreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request()) @assert isnull(req) # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count, # MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, @@ -856,14 +856,14 @@ function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::Abst setbuffer!(req, rbuf) return req end -IAllreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) = - IAllreduce!(rbuf, Op(op, eltype(rbuf)), comm, req) -IAllreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) = - IAllreduce!(RBuffer(sendbuf, recvbuf), op, comm, req) +Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) = + Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req) +Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) = + Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req) # inplace -IAllreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) = - IAllreduce!(IN_PLACE, rbuf, op, comm, req) +Iallreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request()) = + Iallreduce!(IN_PLACE, rbuf, op, comm, req) ## Scan diff --git a/test/mpi_support_test.jl b/test/mpi_support_test.jl index 0c6547cc5..bb1e0c701 100644 --- a/test/mpi_support_test.jl +++ b/test/mpi_support_test.jl @@ -6,21 +6,21 @@ MPI.Init() # (or a similar signal) when called, which cannot be handled in Julia in a portable way. 
op = ARGS[1] -if op == "IAllreduce" - # IAllreduce is unsupported for CUDA with OpenMPI + UCX +if op == "Iallreduce" + # Iallreduce is unsupported for CUDA with OpenMPI + UCX # See https://docs.open-mpi.org/en/main/tuning-apps/networking/cuda.html#which-mpi-apis-do-not-work-with-cuda-aware-ucx send_arr = ArrayType(zeros(Int, 1)) recv_arr = ArrayType{Int}(undef, 1) synchronize() - req = MPI.IAllreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD) + req = MPI.Iallreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD) MPI.Wait(req) -elseif op == "IReduce" - # IAllreduce is unsupported for CUDA with OpenMPI + UCX +elseif op == "Ireduce" + # Iallreduce is unsupported for CUDA with OpenMPI + UCX send_arr = ArrayType(zeros(Int, 1)) recv_arr = ArrayType{Int}(undef, 1) synchronize() - req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0) + req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0) MPI.Wait(req) else diff --git a/test/runtests.jl b/test/runtests.jl index 84e2727b3..2363982b7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -80,8 +80,8 @@ function is_mpi_operation_supported(mpi_op, n=nprocs) end if ArrayType != Array # we expect that only GPU backends can have unsupported features - ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("IAllreduce") - ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("IReduce") + ENV["JULIA_MPI_TEST_IALLREDUCE"] = is_mpi_operation_supported("Iallreduce") + ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("Ireduce") end excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',') diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl index c4eac32b1..5b81b3ab5 100644 --- a/test/test_allreduce.jl +++ b/test/test_allreduce.jl @@ -50,7 +50,7 @@ for T = [Int] # Nonblocking recv_arr = ArrayType{T}(undef, size(send_arr)) if iallreduce_supported - req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) + req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD) MPI.Wait(req) 
@test recv_arr == comm_size .* send_arr end @@ -59,7 +59,7 @@ for T = [Int] recv_arr = copy(send_arr) synchronize() if iallreduce_supported - req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD) + req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD) MPI.Wait(req) @test recv_arr == comm_size .* send_arr end diff --git a/test/test_reduce.jl b/test/test_reduce.jl index 9dfd76ec7..de0ee6b61 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -122,7 +122,7 @@ for T = [Int] # Nonblocking recv_arr = ArrayType{T}(undef, size(send_arr)) if ireduce_supported - req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) + req = MPI.Ireduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root) MPI.Wait(req) if isroot @test recv_arr == sz .* send_arr @@ -132,7 +132,7 @@ for T = [Int] # Nonblocking (IN_PLACE) recv_arr = copy(send_arr) if ireduce_supported - req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root) + req = MPI.Ireduce!(recv_arr, op, MPI.COMM_WORLD; root=root) MPI.Wait(req) if isroot @test recv_arr == sz .* send_arr @@ -155,7 +155,7 @@ end recv_arr = isroot ? 
zeros(eltype(send_arr), size(send_arr)) : nothing if ireduce_supported - req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) + req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root) MPI.Wait(req) if rank == root @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64) From 275d21d215028fd7e2738ec73dd29d3028114ef7 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Wed, 11 Feb 2026 10:00:19 +0100 Subject: [PATCH 06/10] Test reductions on AMDGPU --- .buildkite/pipeline.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 73a763fa7..b1d7fdceb 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -203,7 +203,7 @@ ' echo "+++ Run tests" - export JULIA_MPI_TEST_EXCLUDE="test_allreduce.jl,test_reduce.jl,test_scan.jl" + export JULIA_MPI_TEST_EXCLUDE="test_scan.jl" julia --color=yes --project=. -e ' import Pkg Pkg.test("MPI"; test_args=["--backend=AMDGPU"]) From c594d259b7ddff4103db449325a13a5a8988b877 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Wed, 11 Feb 2026 11:41:39 +0100 Subject: [PATCH 07/10] Test support for `MPI_Reduce` and `MPI_Allreduce` for AMDGPU --- test/mpi_support_test.jl | 21 +++++++++++++++++---- test/runtests.jl | 6 +++++- test/test_allreduce.jl | 9 +++++++-- test/test_reduce.jl | 5 +++++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/test/mpi_support_test.jl b/test/mpi_support_test.jl index bb1e0c701..667a0050c 100644 --- a/test/mpi_support_test.jl +++ b/test/mpi_support_test.jl @@ -6,17 +6,30 @@ MPI.Init() # (or a similar signal) when called, which cannot be handled in Julia in a portable way. 
op = ARGS[1] -if op == "Iallreduce" - # Iallreduce is unsupported for CUDA with OpenMPI + UCX - # See https://docs.open-mpi.org/en/main/tuning-apps/networking/cuda.html#which-mpi-apis-do-not-work-with-cuda-aware-ucx +if op == "Allreduce" + # Allreduce is unsupported for AMDGPU with UCX + send_arr = ArrayType(zeros(Int, 1)) + recv_arr = ArrayType{Int}(undef, 1) + synchronize() + MPI.Allreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD) + +elseif op == "Iallreduce" + # Iallreduce is unsupported for CUDA with OpenMPI 5 + UCX send_arr = ArrayType(zeros(Int, 1)) recv_arr = ArrayType{Int}(undef, 1) synchronize() req = MPI.Iallreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD) MPI.Wait(req) +elseif op == "Reduce" + # Reduce is unsupported for AMDGPU with UCX + send_arr = ArrayType(zeros(Int, 1)) + recv_arr = ArrayType{Int}(undef, 1) + synchronize() + MPI.Reduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0) + elseif op == "Ireduce" - # Iallreduce is unsupported for CUDA with OpenMPI + UCX + # Ireduce is unsupported for CUDA with OpenMPI 5 + UCX send_arr = ArrayType(zeros(Int, 1)) recv_arr = ArrayType{Int}(undef, 1) synchronize() diff --git a/test/runtests.jl b/test/runtests.jl index 2363982b7..e2903d8ea 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -74,12 +74,16 @@ end function is_mpi_operation_supported(mpi_op, n=nprocs) test_file = joinpath(@__DIR__, "mpi_support_test.jl") cmd = `$(mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $test_file $mpi_op` - supported = success(run(ignorestatus(cmd))) + cmd = pipeline(ignorestatus(cmd); stderr=devnull) + process = run(cmd) + supported = success(process) !supported && @warn "$mpi_op is unsupported with $backend_name" return supported end if ArrayType != Array # we expect that only GPU backends can have unsupported features + ENV["JULIA_MPI_TEST_ALLREDUCE"] = is_mpi_operation_supported("Allreduce") + ENV["JULIA_MPI_TEST_REDUCE"] = is_mpi_operation_supported("Reduce") ENV["JULIA_MPI_TEST_IALLREDUCE"] = 
is_mpi_operation_supported("Iallreduce") ENV["JULIA_MPI_TEST_IREDUCE"] = is_mpi_operation_supported("Ireduce") end diff --git a/test/test_allreduce.jl b/test/test_allreduce.jl index 5b81b3ab5..c2ae3e506 100644 --- a/test/test_allreduce.jl +++ b/test/test_allreduce.jl @@ -1,5 +1,12 @@ include("common.jl") +allreduce_supported = get(ENV, "JULIA_MPI_TEST_ALLREDUCE", "true") == "true" +iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true" +if !allreduce_supported + @warn "Skipping all tests in 'test_allreduce.jl' as reductions are unsupported" + exit(0) +end + MPI.Init() comm_size = MPI.Comm_size(MPI.COMM_WORLD) @@ -13,8 +20,6 @@ else operators = [MPI.SUM, +, (x,y) -> 2x+y-x] end -iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true" - for T = [Int] for dims = [1, 2, 3] diff --git a/test/test_reduce.jl b/test/test_reduce.jl index de0ee6b61..1c53e382d 100644 --- a/test/test_reduce.jl +++ b/test/test_reduce.jl @@ -9,7 +9,12 @@ const can_do_closures = Sys.ARCH !== :aarch64 && !startswith(string(Sys.ARCH), "arm") +reduce_supported = get(ENV, "JULIA_MPI_TEST_REDUCE", "true") == "true" ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true" +if !reduce_supported + @warn "Skipping all tests in 'test_reduce.jl' as reductions are unsupported" + exit(0) +end using DoubleFloats From c8f20d99169ad97cd1155def8770c8159a29ad24 Mon Sep 17 00:00:00 2001 From: Petr Krysl Date: Sun, 29 Sep 2024 08:13:54 -0700 Subject: [PATCH 08/10] add Ibcast! --- src/collective.jl | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/collective.jl b/src/collective.jl index 836037eb6..c79c418db 100644 --- a/src/collective.jl +++ b/src/collective.jl @@ -103,6 +103,36 @@ function bcast(obj, root::Integer, comm::Comm) return obj end + +""" + Ibcast!(buf, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0) + +Starts a nonblocking broadcast of the buffer `buf` from `root` to all processes in `comm`. 
+ +Returns the [`AbstractRequest`](@ref) object for the nonblocking broadcast. + +# See also +- [`Bcast!`](@ref) the equivalent blocking operation. + +# External links +$(_doc_external("MPI_Ibcast")) +""" +Ibcast!(buf, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0)) = + Ibcast!(buf, root, comm, req) + +function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request()) + # int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root, + # MPI_Comm comm, MPI_Request *request) + API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req) + # Attach the buffer to the request, as Ireduce!/Iallreduce! do (presumably so it stays referenced until the operation completes) + setbuffer!(req, buf) + return req +end +function Ibcast!(data, root::Integer, comm::Comm, req::AbstractRequest = Request()) + Ibcast!(Buffer(data), root, comm, req) +end + + """ Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm; root::Integer=0) From ad9c9cd3771df5c67b5d8e94c7a37131d377bdc1 Mon Sep 17 00:00:00 2001 From: Petr Krysl Date: Sun, 17 Nov 2024 10:19:57 -0800 Subject: [PATCH 09/10] add ibcast! tests --- test/test_ibcast.jl | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 test/test_ibcast.jl diff --git a/test/test_ibcast.jl b/test/test_ibcast.jl new file mode 100644 index 000000000..f5bb378d4 --- /dev/null +++ b/test/test_ibcast.jl @@ -0,0 +1,35 @@ +include("common.jl") +using Random + +MPI.Init() + +comm = MPI.COMM_WORLD +root = 0 +matsize = (17,17) + +for T in MPITestTypes + # This test depends on the stability of the rng and we have observed with + # CUDA.jl that it is not guaranteed that the same number of rand calls will + # occur on each rank. (This is a hypothesis). To be sure we shall seed the rng + # just before we call rand. + Random.seed!(17) + A = ArrayType(rand(T, matsize)) + B = MPI.Comm_rank(comm) == root ? 
A : similar(A) + req = MPI.Ibcast!(B, comm; root=root) + sleep(rand()) + MPI.Wait(req) + @test B == A +end + +# Char +A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's'] +B = MPI.Comm_rank(comm) == root ? 
A : similar(A) +req = MPI.Ibcast!(B, comm; root=root) +sleep(rand()) +MPI.Wait(req) +@test B == A + + + +MPI.Finalize() +@test MPI.Finalized() From af90e5c2ef8bd11cc94f4f9e707a8be695df9791 Mon Sep 17 00:00:00 2001 From: Keluaa <34173752+Keluaa@users.noreply.github.com> Date: Fri, 6 Mar 2026 09:27:42 +0100 Subject: [PATCH 10/10] Add docs for `Ibcast!` --- docs/src/reference/collective.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md index 8dafe4307..785ccc5cd 100644 --- a/docs/src/reference/collective.md +++ b/docs/src/reference/collective.md @@ -13,6 +13,7 @@ MPI.Ibarrier MPI.Bcast! MPI.Bcast MPI.bcast +MPI.Ibcast! ``` ## Gather/Scatter