ext/CUDAExt.jl: 36 additions & 20 deletions (56 changes)
@@ -16,7 +16,7 @@ if isdefined(Base, :get_extension)
else
    import ..CUDA
end
-import CUDA: CuDevice, CuContext, CuStream, CuArray, CUDABackend
+import CUDA: CuDevice, CuContext, CuStream, CuArray, CUDABackend, CuEvent
import CUDA: devices, attribute, context, context!, stream, stream!
import CUDA: CUBLAS, CUSOLVER

@@ -103,8 +103,11 @@ function with_context(f, x)
end

function _sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace})
+    caller_stream = stream()
    with_context(x) do
-        CUDA.synchronize()
+        ev = CUDA.CuEvent()
+        CUDA.record(ev, stream())
+        CUDA.wait(ev, caller_stream)
    end
end
function sync_with_context(x::Union{Dagger.Processor,Dagger.MemorySpace})
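The record/wait pattern above replaces a blocking CUDA.synchronize() with a stream-ordered dependency: the producer stream records an event, and the caller's stream waits on it, so the host never blocks. A minimal sketch of the same pattern in plain CUDA.jl (the array and stream names are illustrative, not part of this diff):

```julia
using CUDA

a = CUDA.rand(1024)
producer = CuStream()             # stream that does the work
consumer = stream()               # the caller's task-local stream

ev = CuEvent()
stream!(producer) do
    a .= a .* 2f0                 # kernel enqueued on `producer`
    CUDA.record(ev, producer)     # completion point on `producer`
end
CUDA.wait(ev, consumer)           # `consumer` waits on the GPU; the host does not block
b = a .+ 1f0                      # enqueued on `consumer`, ordered after the producer's kernel
```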
@@ -164,7 +167,6 @@ function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::Chunk)
    cpu_data = remotecall_fetch(unwrap, from_w, x)
    with_context(to_proc) do
        arr = adapt(CuArray, cpu_data)
-        CUDA.synchronize()
        return arr
    end
end
@@ -175,7 +177,6 @@ function Dagger.move(from_proc::CPUProc, to_proc::CuArrayDeviceProc, x::CuArray)
    with_context(to_proc) do
        _x = similar(x)
        copyto!(_x, x)
-        CUDA.synchronize()
        return _x
    end
end
@@ -184,9 +185,7 @@ end
function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x)
    with_context(from_proc) do
        CUDA.synchronize()
-        _x = adapt(Array, x)
-        CUDA.synchronize()
-        return _x
+        return adapt(Array, x)
Review comment (Member): I don't think this is necessary - CUDA.synchronize() is a stream-local sync (it's the same as CUDA.synchronize(stream())).

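A small sketch of the semantics the reviewer is relying on: in current CUDA.jl, CUDA.synchronize() waits only on the task-local stream, while CUDA.device_synchronize() is the device-wide variant (the names `s` and `x` below are illustrative):

```julia
using CUDA

x = CUDA.rand(1024)
s = CuStream()
stream!(s) do
    x .= x .+ 1f0            # work enqueued on `s`
end

CUDA.synchronize()           # waits only on the task-local stream...
CUDA.synchronize(stream())   # ...which is exactly this call
CUDA.device_synchronize()    # device-wide: also waits for the work on `s`
```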
    end
end
function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::Chunk)
@@ -203,26 +202,29 @@ function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CPUProc, x::CuArray{
        CUDA.synchronize()
        _x = Array{T,N}(undef, size(x))
        copyto!(_x, x)
-        CUDA.synchronize()
Review comment (Member): Same with this function, this shouldn't be necessary.

        return _x
    end
end

# Out-of-place DtoD
function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::Dagger.Chunk{T}) where T<:CuArray
    if from_proc == to_proc
-        # Same process and GPU, no change
-        arr = unwrap(x)
-        with_context(CUDA.synchronize, from_proc)
-        return arr
+        # Same process and GPU, no change.
+        # Stream ordering guarantees safety; no sync needed.
+        return unwrap(x)
+
    elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc)
        # Same process but different GPUs, use DtoD copy
        from_arr = unwrap(x)
-        with_context(CUDA.synchronize, from_proc)
+        ev = CUDA.CuEvent()
+        with_context(from_proc) do
+            CUDA.record(ev, stream())
+        end
+
        return with_context(to_proc) do
+            CUDA.wait(ev, stream())
            to_arr = similar(from_arr)
            copyto!(to_arr, from_arr)
-            CUDA.synchronize()
            return to_arr
        end
    elseif Dagger.system_uuid(from_proc.owner) == Dagger.system_uuid(to_proc.owner) && from_proc.device_uuid == to_proc.device_uuid
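The "stream ordering guarantees safety" comment in the first branch rests on a basic CUDA property: operations enqueued on one stream execute in submission order. A toy illustration, assuming producer and consumer share the task-local stream, as they do for a same-processor move:

```julia
using CUDA

x = CUDA.zeros(Float32, 1024)
x .= 1f0            # op A, enqueued on the task-local stream
y = x .+ 1f0        # op B, same stream: ordered after op A, no explicit sync needed
```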
@@ -252,11 +254,12 @@ function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x
            end
        else
            return arr
        end
    end
    else
        # Different node, use DtoH, serialization, HtoD
        host_copy = remotecall_fetch(from_proc.owner, from_proc, x) do from_proc, x
            return with_context(from_proc) do
-                CUDA.synchronize()
                Array(unwrap(x))
            end
        end
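The different-node branch is the classic DtoH, serialize, HtoD round trip. A hedged sketch of the same flow with plain Distributed; the Future-based helper is illustrative and not Dagger's API:

```julia
using Distributed, CUDA

# `ref` is a Future for a CuArray owned by another worker, e.g.
#   ref = remotecall(() -> CUDA.rand(1024), 2)
function fetch_to_local_gpu(ref::Future)
    # DtoH on the owner; the resulting host Array is serialized back to us.
    host_copy = remotecall_fetch(ref.where, ref) do r
        Array(fetch(r))
    end
    # HtoD on the local GPU.
    return CuArray(host_copy)
end
```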
@@ -268,20 +271,26 @@ end

function Dagger.move(from_proc::CuArrayDeviceProc, to_proc::CuArrayDeviceProc, x::CuArray)
    if from_proc == to_proc
-        with_context(CUDA.synchronize, from_proc)
        return x
    elseif Dagger.root_worker_id(from_proc) == Dagger.root_worker_id(to_proc)
-        with_context(CUDA.synchronize, from_proc)
+        ev = CUDA.CuEvent()
+        with_context(from_proc) do
+            CUDA.record(ev, stream())
+        end
+
        return with_context(to_proc) do
+            CUDA.wait(ev, stream())
            to_arr = similar(x)
            copyto!(to_arr, x)
-            CUDA.synchronize()
            return to_arr
        end
+
    else
        host_copy = with_context(from_proc) do
-            CUDA.synchronize()
            return Array(x)
        end
+
        return with_context(to_proc) do
            return CuArray(host_copy)
        end
@@ -390,13 +399,20 @@ Dagger.gpu_kernel_backend(::CuArrayDeviceProc) = CUDABackend()
Dagger.gpu_with_device(f, proc::CuArrayDeviceProc) =
    CUDA.device!(f, proc.device)
function Dagger.gpu_synchronize(proc::CuArrayDeviceProc)
+    user_stream = stream()
+
    with_context(proc) do
-        CUDA.synchronize()
+        ev = CUDA.CuEvent()
+        CUDA.record(ev, stream())
+        CUDA.wait(ev, user_stream)
    end
end
function Dagger.gpu_synchronize(::Val{:CUDA})
+    user_stream = stream()
    for dev in CUDA.devices()
-        _sync_with_context(CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev)))
+        proc = CuArrayDeviceProc(myid(), dev.handle, CUDA.uuid(dev))
+        Dagger.gpu_synchronize(proc)
    end
end

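Taken together, the reworked gpu_synchronize makes the caller's stream wait on one event per device instead of blocking the host on each device in turn. A sketch of that composition outside Dagger; sync_all_devices_into is an illustrative name:

```julia
using CUDA

# Make `user_stream` wait for all work currently enqueued on each device's
# task-local stream, without blocking the host thread.
function sync_all_devices_into(user_stream::CuStream)
    for dev in CUDA.devices()
        device!(dev) do
            ev = CuEvent()
            CUDA.record(ev, stream())    # completion point on this device
            CUDA.wait(ev, user_stream)   # cross-device stream waits are allowed
        end
    end
end

sync_all_devices_into(stream())
```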