Skip to content

Commit 023fe52

Browse files
authored
Fix multi-device GPU external source. (#3710)
* Fix multi-device GPU external source. * Add cross-device test for PyTorch. Fix MXNet parallel external source test. Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
1 parent 17b508c commit 023fe52

9 files changed

Lines changed: 123 additions & 33 deletions

File tree

dali/core/access_order.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "dali/core/cuda_error.h"
1717
#include "dali/core/cuda_stream.h"
1818
#include "dali/core/cuda_event_pool.h"
19+
#include "dali/core/device_guard.h"
1920

2021
namespace dali {
2122

@@ -37,7 +38,18 @@ void AccessOrder::wait(const AccessOrder &other) const {
3738
int other_dev = other.device_id();
3839
auto event = pool.Get(other_dev);
3940
// Record an event in the preceding stream
40-
CUDA_CALL(cudaEventRecord(event, other.stream()));
41+
42+
// If the stream handle has a special value, we can't refer to it directly - it is
43+
// inherently associated with the concept of "current device" and it must be switched
44+
if (other_dev != device_id_ &&
45+
(other.stream_ == 0 ||
46+
other.stream_ == cudaStreamPerThread ||
47+
other.stream_ == cudaStreamLegacy)) {
48+
DeviceGuard dg(other.device_id_);
49+
CUDA_CALL(cudaEventRecord(event, other.stream()));
50+
} else {
51+
CUDA_CALL(cudaEventRecord(event, other.stream()));
52+
}
4153
// and wait for it in this stream
4254
CUDA_CALL(cudaStreamWaitEvent(stream(), event, 0));
4355
pool.Put(std::move(event), other_dev);

dali/python/backend_impl.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,14 +1619,23 @@ PYBIND11_MODULE(backend_impl, m) {
16191619

16201620
// not the most beautiful but at least it doesn't throw as plain cast<T>()
16211621
py::detail::make_caster<Tensor<CPUBackend>&> conv;
1622-
bool is_cpu_data = conv.load(static_cast<py::object>(list[0]), true);
1622+
bool is_cpu_data = list.empty() || conv.load(static_cast<py::object>(list[0]), true);
16231623
if (is_cpu_data) {
16241624
FeedPipeline<CPUBackend>(p, name, list, AccessOrder::host(), true);
16251625
} else {
1626-
cudaStream_t stream = cuda_stream.is_none()
1627-
? UserStream::Get()->GetStream(list[0].cast<Tensor<GPUBackend>&>())
1628-
: static_cast<cudaStream_t>(ctypes_void_ptr(cuda_stream));
1629-
FeedPipeline<GPUBackend>(p, name, list, stream, cuda_stream.is_none(), use_copy_kernel);
1626+
int device_id = p->device_id();
1627+
cudaStream_t stream = 0;
1628+
if (!cuda_stream.is_none())
1629+
stream = static_cast<cudaStream_t>(ctypes_void_ptr(cuda_stream));
1630+
1631+
if (!list.empty()) {
1632+
auto &sample0 = list[0].cast<Tensor<GPUBackend>&>();
1633+
if (cuda_stream.is_none())
1634+
stream = UserStream::Get()->GetStream(sample0);
1635+
device_id = sample0.device_id();
1636+
}
1637+
AccessOrder order(stream, device_id);
1638+
FeedPipeline<GPUBackend>(p, name, list, order, cuda_stream.is_none(), use_copy_kernel);
16301639
}
16311640
},
16321641
"name"_a,

dali/python/nvidia/dali/_debug_mode.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,8 @@ def __rxor__(self, other):
126126
return _PipelineDebug.current()._wrap_op_call(_arithm_op, ["bitxor", other, self], {})
127127

128128

129-
def _transform_data_to_tensorlist(data, batch_size, layout=None):
130-
data = _prep_data_for_feed_input(data, batch_size, layout)
129+
def _transform_data_to_tensorlist(data, batch_size, layout=None, device_id=None):
130+
data = _prep_data_for_feed_input(data, batch_size, layout, device_id)
131131

132132
if isinstance(data, list):
133133
if isinstance(data[0], _tensors.TensorGPU):
@@ -268,7 +268,7 @@ def __enter__(self):
268268

269269
def build(self):
270270
"""Build the pipeline.
271-
271+
272272
Refer to :meth:`Pipeline.build() <nvidia.dali.Pipeline.build>` for details."""
273273
self._built = True
274274

@@ -299,7 +299,7 @@ def run(self):
299299

300300
def feed_input(self, data_node, data, **kwargs):
301301
"""Pass data to an ExternalSource operator inside the pipeline.
302-
302+
303303
Refer to :meth:`Pipeline.feed_input() <nvidia.dali.Pipeline.feed_input>` for details."""
304304
if not self._built:
305305
raise RuntimeError("Pipeline must be built first.")

dali/python/nvidia/dali/_utils/external_source_impl.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
1+
# Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -76,7 +76,7 @@ def assert_cpu_sample_data_type(sample, error_str="Unsupported callback return t
7676
if isinstance(sample, np.ndarray):
7777
return True
7878
if types._is_mxnet_array(sample):
79-
if sample.ctx.device_type != 'cpu':
79+
if sample.context.device_type != 'cpu':
8080
raise TypeError("Unsupported callback return type. "
8181
"GPU tensors are not supported. Got an MXNet GPU tensor.")
8282
return True
@@ -111,7 +111,7 @@ def sample_to_numpy(sample, error_str="Unsupported callback return type. Got: `{
111111
if isinstance(sample, np.ndarray):
112112
return sample
113113
if types._is_mxnet_array(sample):
114-
if sample.ctx.device_type != 'cpu':
114+
if sample.context.device_type != 'cpu':
115115
raise TypeError("Unsupported callback return type. "
116116
"GPU tensors are not supported. Got an MXNet GPU tensor.")
117117
return sample.asnumpy()

dali/python/nvidia/dali/external_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ def _check_data_batch(data, batch_size, layout):
5454
if layout is not None and layout != "" and dim != len(layout):
5555
raise RuntimeError("The layout '{}' cannot describe {}-dimensional data".format(layout, dim))
5656

57-
58-
def _prep_data_for_feed_input(data, batch_size, layout):
57+
def _prep_data_for_feed_input(data, batch_size, layout, device_id = None):
5958
def to_numpy(x):
6059
if _types._is_mxnet_array(x):
6160
return x.asnumpy()
@@ -82,7 +81,10 @@ def to_numpy(x):
8281
if isinstance(datum, (_tensors.TensorCPU, _tensors.TensorGPU)):
8382
inp = type(datum)(datum, layout=layout) if layout is not None else datum
8483
elif hasattr(datum, "__cuda_array_interface__") or (info[0] and info[1]):
85-
inp = _tensors.TensorGPU(datum, layout or "")
84+
array_device_id = _types._get_device_id_for_array(datum)
85+
if array_device_id is None:
86+
array_device_id = device_id
87+
inp = _tensors.TensorGPU(datum, layout or "", array_device_id)
8688
else:
8789
datum = to_numpy(datum)
8890
inp = _tensors.TensorCPU(datum, layout or "")

dali/python/nvidia/dali/pipeline.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,6 @@ def _show_deprecation_warning(deprecated, in_favor_of):
4343
Warning, stacklevel=2)
4444

4545

46-
def _get_default_stream_for_array(array):
47-
if isinstance(array, list) and len(array):
48-
array = array[0]
49-
if types._is_torch_tensor(array):
50-
import torch
51-
return torch.cuda.current_stream().cuda_stream
52-
elif types._is_cupy_array(array):
53-
import cupy
54-
return cupy.cuda.get_current_stream().ptr
55-
else:
56-
return None
57-
58-
5946
class Pipeline(object):
6047
"""Pipeline class is the base of all DALI data pipelines. The pipeline
6148
encapsulates the data processing graph and the execution engine.
@@ -718,13 +705,13 @@ def build(self, define_graph = None):
718705
def _feed_input(self, name, data, layout=None, cuda_stream=None, use_copy_kernel=False):
719706
from nvidia.dali.external_source import _prep_data_for_feed_input
720707
if cuda_stream is None:
721-
cuda_stream = _get_default_stream_for_array(data)
708+
cuda_stream = types._get_default_stream_for_array(data)
722709
if cuda_stream == -1:
723710
cuda_stream = None
724711
else:
725712
cuda_stream = types._raw_cuda_stream(cuda_stream)
726713

727-
data = _prep_data_for_feed_input(data, self._max_batch_size, layout)
714+
data = _prep_data_for_feed_input(data, self._max_batch_size, layout, self._device_id)
728715

729716
if isinstance(data, list):
730717
self._pipe.SetExternalTensorInput(

dali/python/nvidia/dali/types.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,30 @@ def _raw_cuda_stream(stream_obj):
333333
else:
334334
return stream_obj
335335

336+
def _get_default_stream_for_array(array):
337+
if isinstance(array, list) and len(array):
338+
array = array[0]
339+
if _is_torch_tensor(array):
340+
import torch
341+
return _raw_cuda_stream(torch.cuda.current_stream())
342+
elif _is_cupy_array(array):
343+
import cupy
344+
return _raw_cuda_stream(cupy.cuda.get_current_stream())
345+
else:
346+
return None
347+
348+
def _get_device_id_for_array(array):
349+
if isinstance(array, list) and len(array):
350+
array = array[0]
351+
if _is_torch_tensor(array):
352+
return array.device.index
353+
elif _is_cupy_array(array):
354+
return array.device
355+
elif _is_mxnet_array(array):
356+
return array.context.device_id
357+
else:
358+
return None
359+
336360
_cupy_array_type_regex = re.compile('.*cupy\..*\.ndarray.*')
337361

338362
def _is_cupy_array(value):

dali/test/python/test_external_source_cupy.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
1+
# Copyright (c) 2020, 2022, NVIDIA CORPORATION. All rights reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -55,3 +55,32 @@ def generator(i):
5555
with check_output_pattern(pattern):
5656
for _ in range(iterations):
5757
pipe.run()
58+
59+
60+
def _test_cross_device(src, dst):
61+
import nvidia.dali.fn as fn
62+
import numpy as np
63+
64+
pipe = Pipeline(1, 3, dst)
65+
66+
iter = 0
67+
def get_data():
68+
nonlocal iter
69+
with cp.cuda.Device(src):
70+
data = cp.array([[1,2,3,4],[5,6,7,8]], dtype=cp.float32) + iter
71+
iter += 1
72+
return data
73+
74+
with pipe:
75+
pipe.set_outputs(fn.external_source(get_data, batch=False, device='gpu'))
76+
77+
pipe.build()
78+
for i in range(10):
79+
out, = pipe.run()
80+
assert np.array_equal(np.array(out[0].as_cpu()), np.array([[1,2,3,4],[5,6,7,8]]) + i)
81+
82+
def test_cross_device():
83+
if cp.cuda.runtime.getDeviceCount() > 1:
84+
for src in [0,1]:
85+
for dst in [0,1]:
86+
yield _test_cross_device, src, dst

dali/test/python/test_external_source_pytorch_gpu.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
1+
# Copyright (c) 2020, 2022, NVIDIA CORPORATION. All rights reserved.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -43,3 +43,30 @@ def gen_batch():
4343

4444
for i in range(10):
4545
check_output(pipe.run(), [np.array([attempt * 100 + (i + 1) * 10 + 1.5], dtype=np.float32)])
46+
47+
def _test_cross_device(src, dst):
48+
import nvidia.dali.fn as fn
49+
import numpy as np
50+
51+
pipe = Pipeline(1, 3, dst)
52+
53+
iter = 0
54+
def get_data():
55+
nonlocal iter
56+
data = torch.tensor([[1,2,3,4],[5,6,7,8]], dtype=torch.float32).cuda(device=dst) + iter
57+
iter += 1
58+
return data
59+
60+
with pipe:
61+
pipe.set_outputs(fn.external_source(get_data, batch=False, device='gpu'))
62+
63+
pipe.build()
64+
for i in range(10):
65+
out, = pipe.run()
66+
assert np.array_equal(np.array(out[0].as_cpu()), np.array([[1,2,3,4],[5,6,7,8]]) + i)
67+
68+
def test_cross_device():
69+
if torch.cuda.device_count() > 1:
70+
for src in [0,1]:
71+
for dst in [0,1]:
72+
yield _test_cross_device, src, dst

0 commit comments

Comments
 (0)