diff --git a/ext/CUDAExt.jl b/ext/CUDAExt.jl
index ea77889ff..b9964c611 100644
--- a/ext/CUDAExt.jl
+++ b/ext/CUDAExt.jl
@@ -16,7 +16,7 @@ if isdefined(Base, :get_extension)
 else
     import ..CUDA
 end
-import CUDA: CuDevice, CuContext, CuStream, CuArray, CUDABackend
+import CUDA: CuDevice, CuContext, CuStream, CuArray, CUDABackend, CuEvent
 import CUDA: devices, attribute, context, context!, stream, stream!
 import CUDA: CUBLAS, CUSOLVER
 
@@ -103,8 +103,11 @@ function with_context(f, x)
 end
 
 function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace})
+    caller_stream = stream()
     with_context(x) do
-        CUDA.synchronize()
+        ev = CUDA.CuEvent()
+        CUDA.record(ev, stream())
+        CUDA.wait(ev, caller_stream)
     end
 end
 function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace})
@@ -164,7 +167,6 @@ function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::Chunk)
     cpu_data = remotecall_fetch(unwrap, from_w, x)
     with_context(to_proc) do
         arr = adapt(CuArray, cpu_data)
-        CUDA.synchronize()
         return arr
     end
 end
@@ -175,7 +177,6 @@ function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::CuArray)
     with_context(to_proc) do
         _x = similar(x)
         copyto!(_x, x)
-        CUDA.synchronize()
         return _x
     end
 end
@@ -184,9 +185,7 @@ end
 function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x)
     with_context(from_proc) do
         CUDA.synchronize()
-        _x = adapt(Array, x)
-        CUDA.synchronize()
-        return _x
+        return adapt(Array, x)
     end
 end
 function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::Chunk)
@@ -203,7 +202,6 @@ function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::CuArray{
         CUDA.synchronize()
         _x = Array{T,N}(undef, size(x))
         copyto!(_x, x)
-        CUDA.synchronize()
         return _x
     end
 end
@@ -211,18 +209,22 @@ end
 # Out-of-place DtoD
 function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CuArray
     if from_proc == to_proc
-        # Same process and GPU, no change
-        arr = unwrap(x)
-        with_context(CUDA.synchronize, from_proc)
-        return arr
+        # Same process and GPU, no change.
+        # Stream ordering guarantees safety; no sync needed.
+        return unwrap(x)
+
     elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc)
         # Same process but different GPUs, use DtoD copy
         from_arr = unwrap(x)
-        with_context(CUDA.synchronize, from_proc)
+        ev = CUDA.CuEvent()
+        with_context(from_proc) do
+            CUDA.record(ev, stream())
+        end
+        return with_context(to_proc) do
+            CUDA.wait(ev, stream())
             to_arr = similar(from_arr)
             copyto!(to_arr, from_arr)
-            CUDA.synchronize()
             return to_arr
         end
     elseif Dagger.system_uuid(from_proc.owner) == Dagger.system_uuid(to_proc.owner) && from_proc.device_uuid == to_proc.device_uuid
@@ -252,11 +254,12 @@ function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CuArray
             end
         else
             return arr
-        end 
+        end
     else
         # Different node, use DtoH, serialization, HtoD
         host_copy = remotecall_fetch(from_proc.owner, from_proc, x) do from_proc, x
             return with_context(from_proc) do
+                CUDA.synchronize()
                 Array(unwrap(x))
             end
         end
@@ -268,20 +271,26 @@
 end
 
 function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::CuArray)
     if from_proc == to_proc
-        with_context(CUDA.synchronize, from_proc)
         return x
     elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc)
-        with_context(CUDA.synchronize, from_proc)
+        ev = CUDA.CuEvent()
+        with_context(from_proc) do
+            CUDA.record(ev, stream())
+        end
+        return with_context(to_proc) do
+            CUDA.wait(ev, stream())
             to_arr = similar(x)
             copyto!(to_arr, x)
-            CUDA.synchronize()
             return to_arr
         end
+
     else
         host_copy = with_context(from_proc) do
+            CUDA.synchronize()
             return Array(x)
         end
+
         return with_context(to_proc) do
             return CuArray(host_copy)
         end
@@ -390,13 +399,20 @@ Dagger.gpu_kernel_backend(::CuArrayDeviceProc) = CUDABackend()
 Dagger.gpu_with_device(f, proc::CuArrayDeviceProc) = CUDA.device!(f, proc.device)
 
 function Dagger.gpu_synchronize(proc::CuArrayDeviceProc)
+    user_stream = stream()
+
     with_context(proc) do
-        CUDA.synchronize()
+        ev = CUDA.CuEvent()
+        CUDA.record(ev, stream())
+        CUDA.wait(ev, user_stream)
     end
 end
 
 function Dagger.gpu_synchronize(::Val{:CUDA})
+    user_stream = stream()
+
     for dev in CUDA.devices()
-        _sync_with_context(CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev)))
+        proc = CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev))
+        Dagger.gpu_synchronize(proc)
     end
 end