Skip to content

Commit 2ebecba

Browse files
author
Dan Toškan
committed
Added CUDA stream management, async device->host memcopy handling, synchronization per-stream instead of for the entire device
1 parent acd2f69 commit 2ebecba

38 files changed

Lines changed: 318 additions & 214 deletions

cpp/benchmarks/core/HashMap.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ void HashReserveInt(benchmark::State& state,
253253

254254
class Int3 {
255255
public:
256-
Int3() : x_(0), y_(0), z_(0) {};
257-
Int3(int k) : x_(k), y_(k * 2), z_(k * 4) {};
256+
Int3() : x_(0), y_(0), z_(0){};
257+
Int3(int k) : x_(k), y_(k * 2), z_(k * 4){};
258258
bool operator==(const Int3& other) const {
259259
return x_ == other.x_ && y_ == other.y_ && z_ == other.z_;
260260
}

cpp/open3d/core/CUDAUtils.cpp

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -141,41 +141,55 @@ static void SetDevice(int device_id) {
141141
OPEN3D_CUDA_CHECK(cudaSetDevice(device_id));
142142
}
143143

144-
class CUDAStream {
145-
public:
146-
static CUDAStream& GetInstance() {
147-
// The global stream state is given per thread like CUDA's internal
148-
// device state.
149-
static thread_local CUDAStream instance;
150-
return instance;
151-
}
144+
void Synchronize(const CUDAStream& stream) {
145+
OPEN3D_CUDA_CHECK(cudaStreamSynchronize(stream.Get()));
146+
}
152147

153-
cudaStream_t Get() { return stream_; }
154-
void Set(cudaStream_t stream) { stream_ = stream; }
148+
#endif
155149

156-
static cudaStream_t Default() { return static_cast<cudaStream_t>(0); }
150+
} // namespace cuda
157151

158-
private:
159-
CUDAStream() = default;
160-
CUDAStream(const CUDAStream&) = delete;
161-
CUDAStream& operator=(const CUDAStream&) = delete;
152+
#ifdef BUILD_CUDA_MODULE
162153

163-
cudaStream_t stream_ = Default();
164-
};
154+
CUDAStream& CUDAStream::GetInstance() {
155+
// The global stream state is given per thread like CUDA's internal
156+
// device state.
157+
thread_local CUDAStream instance = cuda::GetDefaultStream();
158+
return instance;
159+
}
160+
161+
CUDAStream CUDAStream::CreateNew() {
162+
CUDAStream stream;
163+
OPEN3D_CUDA_CHECK(cudaStreamCreate(&stream.stream_));
164+
// Having async memcpy device->host is very dangerous if you don't know what
165+
// you are doing.
166+
stream.SetShouldSyncMemcpyFromDeviceToHost(true);
167+
return stream;
168+
}
165169

166-
cudaStream_t GetStream() { return CUDAStream::GetInstance().Get(); }
170+
void CUDAStream::SetShouldSyncMemcpyFromDeviceToHost(
171+
bool sync_memcpy_device_to_host) {
172+
OPEN3D_ASSERT(!IsDefaultStream());
173+
sync_memcpy_from_device_to_host_ = sync_memcpy_device_to_host;
174+
}
167175

168-
static void SetStream(cudaStream_t stream) {
169-
CUDAStream::GetInstance().Set(stream);
176+
bool CUDAStream::ShouldSyncMemcpyFromDeviceToHost() const {
177+
return sync_memcpy_from_device_to_host_;
170178
}
171179

172-
cudaStream_t GetDefaultStream() { return CUDAStream::Default(); }
180+
bool CUDAStream::IsDefaultStream() const {
181+
return stream_ == static_cast<cudaStream_t>(nullptr);
182+
}
173183

174-
#endif
184+
cudaStream_t CUDAStream::Get() const { return stream_; }
175185

176-
} // namespace cuda
186+
void CUDAStream::Set(cudaStream_t stream) { stream_ = stream; }
177187

178-
#ifdef BUILD_CUDA_MODULE
188+
void CUDAStream::Destroy() {
189+
OPEN3D_ASSERT(!IsDefaultStream());
190+
OPEN3D_CUDA_CHECK(cudaStreamDestroy(stream_));
191+
*this = cuda::GetDefaultStream();
192+
}
179193

180194
CUDAScopedDevice::CUDAScopedDevice(int device_id)
181195
: prev_device_id_(cuda::GetDevice()) {
@@ -189,27 +203,22 @@ CUDAScopedDevice::CUDAScopedDevice(const Device& device)
189203

190204
CUDAScopedDevice::~CUDAScopedDevice() { cuda::SetDevice(prev_device_id_); }
191205

192-
constexpr CUDAScopedStream::CreateNewStreamTag
193-
CUDAScopedStream::CreateNewStream;
194-
195-
CUDAScopedStream::CUDAScopedStream(const CreateNewStreamTag&)
196-
: prev_stream_(cuda::GetStream()), owns_new_stream_(true) {
197-
OPEN3D_CUDA_CHECK(cudaStreamCreate(&new_stream_));
198-
cuda::SetStream(new_stream_);
199-
}
200-
201-
CUDAScopedStream::CUDAScopedStream(cudaStream_t stream)
202-
: prev_stream_(cuda::GetStream()),
206+
CUDAScopedStream::CUDAScopedStream(CUDAStream stream, bool destroy_on_exit)
207+
: prev_stream_(CUDAStream::GetInstance()),
203208
new_stream_(stream),
204-
owns_new_stream_(false) {
205-
cuda::SetStream(stream);
209+
owns_new_stream_(destroy_on_exit) {
210+
CUDAStream::GetInstance() = new_stream_;
206211
}
207212

208213
CUDAScopedStream::~CUDAScopedStream() {
209214
if (owns_new_stream_) {
210-
OPEN3D_CUDA_CHECK(cudaStreamDestroy(new_stream_));
215+
OPEN3D_ASSERT((prev_stream_.Get() != new_stream_.Get()) &&
216+
"CUDAScopedStream destroy_on_exit would destroy the same "
217+
"stream which was in place before the scoped stream was "
218+
"created.");
219+
new_stream_.Destroy();
211220
}
212-
cuda::SetStream(prev_stream_);
221+
CUDAStream::GetInstance() = prev_stream_;
213222
}
214223

215224
CUDAState& CUDAState::GetInstance() {

cpp/open3d/core/CUDAUtils.h

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,64 @@ namespace core {
5757

5858
#ifdef BUILD_CUDA_MODULE
5959

60+
/// \class CUDAStream
61+
///
62+
/// An Open3D representation of a CUDA stream.
63+
///
64+
class CUDAStream {
65+
public:
66+
static CUDAStream& GetInstance();
67+
68+
/// Creates a new CUDA stream.
69+
/// The caller is responsible for eventually destroying the stream by
70+
/// calling Destroy().
71+
static CUDAStream CreateNew();
72+
73+
/// Sets the flag indicating if all memory copy operations done within
74+
/// this stream from device -> host should be synchronized. True by
75+
/// default. The default CUDA stream is implicitly synchronized with every
76+
/// other stream. As such, it is invalid to call this function on the
77+
/// default stream. stream. \p sync_memcpy_device_to_host true or false to
78+
/// enable or disable the synchronization. Having non-synchronous memory
79+
/// copy from device to host can result in memory corruption and various
80+
/// other problems if you do not know what you are doing. Example:
81+
/// ```cpp
82+
/// void pokingTheBear() {
83+
/// CUDAScopedStream scoped_stream(CUDAStream::CreateNew(), true);
84+
/// CUDAStream::GetInstance().SetShouldSyncMemcpyFromDeviceToHost(false);
85+
/// Tensor foo = Tensor::Init<float>({0.f}, "CUDA:0");
86+
/// Tensor foo_cpu = foo.To("CPU:0"); // launches an async copy from
87+
/// device to cpu memory owned by foo_cpu. Until the async copy
88+
/// completes, the memory will be uninitialized (random garbage).
89+
/// // Any operations on foo_cpu will be undefined here, as you cannot
90+
/// be sure the async memcpy has finished or not
91+
/// cuda::Synchronize(CUDAStream::GetInstance()); // force a manual sync
92+
/// // It is now safe to perform operations on foo_cpu
93+
/// }
94+
/// ```
95+
void SetShouldSyncMemcpyFromDeviceToHost(bool sync_memcpy_device_to_host);
96+
97+
/// Returns the current value of the memory synchronization flag.
98+
/// The default stream will always return false, because it is implicitly
99+
/// synchronized.
100+
bool ShouldSyncMemcpyFromDeviceToHost() const;
101+
102+
/// Returns true if this refers to the default CUDA stream.
103+
bool IsDefaultStream() const;
104+
105+
cudaStream_t Get() const;
106+
void Set(cudaStream_t stream);
107+
108+
/// Destroys the underlying CUDA stream. It is invalid to call this on the
109+
/// default stream. After this call, this object refers to the default
110+
/// stream.
111+
void Destroy();
112+
113+
private:
114+
cudaStream_t stream_ = static_cast<cudaStream_t>(nullptr);
115+
bool sync_memcpy_from_device_to_host_ = false;
116+
};
117+
60118
/// \class CUDAScopedDevice
61119
///
62120
/// Switch CUDA device id in the current scope. The device id will be reset
@@ -135,29 +193,17 @@ class CUDAScopedDevice {
135193
/// }
136194
/// ```
137195
class CUDAScopedStream {
138-
private:
139-
struct CreateNewStreamTag {
140-
CreateNewStreamTag(const CreateNewStreamTag&) = delete;
141-
CreateNewStreamTag& operator=(const CreateNewStreamTag&) = delete;
142-
CreateNewStreamTag(CreateNewStreamTag&&) = delete;
143-
CreateNewStreamTag& operator=(CreateNewStreamTag&&) = delete;
144-
};
145-
146196
public:
147-
constexpr static CreateNewStreamTag CreateNewStream = {};
148-
149-
explicit CUDAScopedStream(const CreateNewStreamTag&);
150-
151-
explicit CUDAScopedStream(cudaStream_t stream);
197+
explicit CUDAScopedStream(CUDAStream stream, bool destroy_on_exit = false);
152198

153199
~CUDAScopedStream();
154200

155201
CUDAScopedStream(const CUDAScopedStream&) = delete;
156202
CUDAScopedStream& operator=(const CUDAScopedStream&) = delete;
157203

158204
private:
159-
cudaStream_t prev_stream_;
160-
cudaStream_t new_stream_;
205+
CUDAStream prev_stream_;
206+
CUDAStream new_stream_;
161207
bool owns_new_stream_ = false;
162208
};
163209

@@ -265,8 +311,11 @@ bool SupportsMemoryPools(const Device& device);
265311
#ifdef BUILD_CUDA_MODULE
266312

267313
int GetDevice();
268-
cudaStream_t GetStream();
269-
cudaStream_t GetDefaultStream();
314+
CUDAStream GetDefaultStream();
315+
316+
/// Calls cudaStreamSynchronize() for the specified CUDA stream.
317+
/// \param stream The stream to be synchronized.
318+
void Synchronize(const CUDAStream& stream);
270319

271320
#endif
272321

cpp/open3d/core/Indexer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ class Indexer {
638638
class IndexerIterator {
639639
public:
640640
struct Iterator {
641-
Iterator() {};
641+
Iterator(){};
642642
Iterator(const Indexer& indexer);
643643
Iterator(Iterator&& other) = default;
644644

cpp/open3d/core/MemoryManagerCUDA.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ void* MemoryManagerCUDA::Malloc(size_t byte_size, const Device& device) {
2222
#if CUDART_VERSION >= 11020
2323
if (cuda::SupportsMemoryPools(device)) {
2424
OPEN3D_CUDA_CHECK(cudaMallocAsync(static_cast<void**>(&ptr),
25-
byte_size, cuda::GetStream()));
25+
byte_size,
26+
CUDAStream::GetInstance().Get()));
2627
} else {
2728
OPEN3D_CUDA_CHECK(cudaMalloc(static_cast<void**>(&ptr), byte_size));
2829
}
@@ -43,7 +44,8 @@ void MemoryManagerCUDA::Free(void* ptr, const Device& device) {
4344
if (ptr && IsCUDAPointer(ptr, device)) {
4445
#if CUDART_VERSION >= 11020
4546
if (cuda::SupportsMemoryPools(device)) {
46-
OPEN3D_CUDA_CHECK(cudaFreeAsync(ptr, cuda::GetStream()));
47+
OPEN3D_CUDA_CHECK(
48+
cudaFreeAsync(ptr, CUDAStream::GetInstance().Get()));
4749
} else {
4850
OPEN3D_CUDA_CHECK(cudaFree(ptr));
4951
}
@@ -62,22 +64,26 @@ void MemoryManagerCUDA::Memcpy(void* dst_ptr,
6264
const void* src_ptr,
6365
const Device& src_device,
6466
size_t num_bytes) {
67+
const CUDAStream& current_stream = CUDAStream::GetInstance();
6568
if (dst_device.IsCUDA() && src_device.IsCPU()) {
6669
if (!IsCUDAPointer(dst_ptr, dst_device)) {
6770
utility::LogError("dst_ptr is not a CUDA pointer.");
6871
}
6972
CUDAScopedDevice scoped_device(dst_device);
7073
OPEN3D_CUDA_CHECK(cudaMemcpyAsync(dst_ptr, src_ptr, num_bytes,
7174
cudaMemcpyHostToDevice,
72-
cuda::GetStream()));
75+
current_stream.Get()));
7376
} else if (dst_device.IsCPU() && src_device.IsCUDA()) {
7477
if (!IsCUDAPointer(src_ptr, src_device)) {
7578
utility::LogError("src_ptr is not a CUDA pointer.");
7679
}
7780
CUDAScopedDevice scoped_device(src_device);
7881
OPEN3D_CUDA_CHECK(cudaMemcpyAsync(dst_ptr, src_ptr, num_bytes,
7982
cudaMemcpyDeviceToHost,
80-
cuda::GetStream()));
83+
current_stream.Get()));
84+
if (current_stream.ShouldSyncMemcpyFromDeviceToHost()) {
85+
OPEN3D_CUDA_CHECK(cudaStreamSynchronize(current_stream.Get()));
86+
}
8187
} else if (dst_device.IsCUDA() && src_device.IsCUDA()) {
8288
if (!IsCUDAPointer(dst_ptr, dst_device)) {
8389
utility::LogError("dst_ptr is not a CUDA pointer.");
@@ -90,25 +96,25 @@ void MemoryManagerCUDA::Memcpy(void* dst_ptr,
9096
CUDAScopedDevice scoped_device(src_device);
9197
OPEN3D_CUDA_CHECK(cudaMemcpyAsync(dst_ptr, src_ptr, num_bytes,
9298
cudaMemcpyDeviceToDevice,
93-
cuda::GetStream()));
99+
current_stream.Get()));
94100
} else if (CUDAState::GetInstance().IsP2PEnabled(src_device.GetID(),
95101
dst_device.GetID())) {
96102
OPEN3D_CUDA_CHECK(cudaMemcpyPeerAsync(
97103
dst_ptr, dst_device.GetID(), src_ptr, src_device.GetID(),
98-
num_bytes, cuda::GetStream()));
104+
num_bytes, current_stream.Get()));
99105
} else {
100106
void* cpu_buf = MemoryManager::Malloc(num_bytes, Device("CPU:0"));
101107
{
102108
CUDAScopedDevice scoped_device(src_device);
103109
OPEN3D_CUDA_CHECK(cudaMemcpyAsync(cpu_buf, src_ptr, num_bytes,
104110
cudaMemcpyDeviceToHost,
105-
cuda::GetStream()));
111+
current_stream.Get()));
106112
}
107113
{
108114
CUDAScopedDevice scoped_device(dst_device);
109115
OPEN3D_CUDA_CHECK(cudaMemcpyAsync(dst_ptr, cpu_buf, num_bytes,
110116
cudaMemcpyHostToDevice,
111-
cuda::GetStream()));
117+
current_stream.Get()));
112118
}
113119
MemoryManager::Free(cpu_buf, Device("CPU:0"));
114120
}

cpp/open3d/core/ParallelFor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ void ParallelForCUDA_(const Device& device, int64_t n, const func_t& func) {
6161
int64_t grid_size = (n + items_per_block - 1) / items_per_block;
6262

6363
ElementWiseKernel_<OPEN3D_PARFOR_BLOCK, OPEN3D_PARFOR_THREAD>
64-
<<<grid_size, OPEN3D_PARFOR_BLOCK, 0, core::cuda::GetStream()>>>(
65-
n, func);
64+
<<<grid_size, OPEN3D_PARFOR_BLOCK, 0,
65+
CUDAStream::GetInstance().Get()>>>(n, func);
6666
OPEN3D_GET_LAST_CUDA_ERROR("ParallelFor failed.");
6767
}
6868

cpp/open3d/core/hashmap/CUDA/CUDAHashBackendBufferAccessor.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,17 @@ class CUDAHashBackendBufferAccessor {
5858
std::vector<uint8_t *> value_ptrs(n_values_);
5959
for (size_t i = 0; i < n_values_; ++i) {
6060
value_ptrs[i] = value_buffers[i].GetDataPtr<uint8_t>();
61-
cudaMemset(value_ptrs[i], 0, capacity_ * value_dsizes_host[i]);
61+
OPEN3D_CUDA_CHECK(cudaMemsetAsync(
62+
value_ptrs[i], 0, capacity_ * value_dsizes_host[i],
63+
core::CUDAStream::GetInstance().Get()));
6264
}
6365
values_ = static_cast<uint8_t **>(
6466
MemoryManager::Malloc(n_values_ * sizeof(uint8_t *), device));
6567
MemoryManager::MemcpyFromHost(values_, device, value_ptrs.data(),
6668
n_values_ * sizeof(uint8_t *));
6769

6870
heap_top_ = hashmap_buffer.GetHeapTop().cuda.GetDataPtr<int>();
69-
cuda::Synchronize();
71+
cuda::Synchronize(CUDAStream::GetInstance());
7072
OPEN3D_CUDA_CHECK(cudaGetLastError());
7173
}
7274

0 commit comments

Comments
 (0)