Skip to content

Commit e4cf319

Browse files
authored
[EM] Initial support for using snappy with sparse data. (#11520)
1 parent c13af05 commit e4cf319

16 files changed

Lines changed: 359 additions & 94 deletions

include/xgboost/data.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,6 @@ struct BatchParam {
291291
* @brief The number of batches to pre-fetch for external memory.
292292
*/
293293
std::int32_t n_prefetch_batches{3};
294-
295294
/**
296295
* @brief Exact or others that don't need histogram.
297296
*/
@@ -542,6 +541,10 @@ struct ExtMemConfig {
542541
float missing;
543542
// The number of CPU threads.
544543
std::int32_t n_threads{0};
544+
// The ratio of the cache that can be compressed. Used for testing.
545+
float hw_decomp_ratio{std::numeric_limits<float>::quiet_NaN()};
546+
// Whether to allow falling back to nvcomp. Used for testing.
547+
bool allow_decomp_fallback{false};
545548

546549
ExtMemConfig() = delete;
547550
ExtMemConfig(std::string cache, bool on_host, float h_ratio, std::int64_t min_cache,
@@ -552,6 +555,12 @@ struct ExtMemConfig {
552555
min_cache_page_bytes{min_cache},
553556
missing{missing},
554557
n_threads{n_threads} {}
558+
559+
/**
 * @brief Override the decompression-related settings of this config. Used for testing.
 *
 * @param _hw_decomp_ratio       Value stored into `hw_decomp_ratio`.
 * @param _allow_decomp_fallback Value stored into `allow_decomp_fallback`.
 *
 * @return Reference to this object, so the call can be chained.
 */
ExtMemConfig& SetParamsForTest(float _hw_decomp_ratio, bool _allow_decomp_fallback) {
  hw_decomp_ratio = _hw_decomp_ratio;
  allow_decomp_fallback = _allow_decomp_fallback;
  return *this;
}
555564
};
556565

557566
/**

src/c_api/c_api.cu

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
/**
2-
* Copyright 2019-2024, XGBoost Contributors
2+
* Copyright 2019-2025, XGBoost Contributors
33
*/
44
#include <thrust/transform.h> // for transform
55

66
#include "../common/api_entry.h" // for XGBAPIThreadLocalEntry
77
#include "../common/cuda_context.cuh" // for CUDAContext
8-
#include "../common/threading_utils.h"
98
#include "../data/array_interface.h" // for DispatchDType, ArrayInterface
109
#include "../data/device_adapter.cuh"
1110
#include "../data/proxy_dmatrix.h"
@@ -18,6 +17,9 @@
1817
#if defined(XGBOOST_USE_NCCL)
1918
#include <nccl.h>
2019
#endif
20+
#if defined(XGBOOST_USE_NVCOMP)
21+
#include <nvcomp/version.h>
22+
#endif // defined(XGBOOST_USE_NVCOMP)
2123

2224
namespace xgboost {
2325
void XGBBuildInfoDevice(Json *p_info) {
@@ -56,6 +58,15 @@ void XGBBuildInfoDevice(Json *p_info) {
5658
#else
5759
info["USE_RMM"] = Boolean{false};
5860
#endif
61+
62+
#if defined(XGBOOST_USE_NVCOMP)
63+
info["USE_NVCOMP"] = Boolean{true};
64+
v = {Json{Integer{NVCOMP_VER_MAJOR}}, Json{Integer{NVCOMP_VER_MINOR}},
65+
Json{Integer{NVCOMP_VER_PATCH}}};
66+
info["NVCOMP_VERSION"] = v;
67+
#else
68+
info["USE_NVCOMP"] = Boolean{false};
69+
#endif
5970
}
6071

6172
void XGBoostAPIGuard::SetGPUAttribute() {

src/common/cuda_dr_utils.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
#include "xgboost/string_view.h" // for StringView
1818

19-
#if CUDART_VERSION >= 12080
19+
#if CUDART_VERSION >= 12080 && defined(__linux__)
2020
#define CUDA_HW_DECOM_AVAILABLE 1
2121
#endif
2222

src/common/cuda_pinned_allocator.cu

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ namespace xgboost::common::cuda_impl {
6363
[](cudaMemPool_t* mem_pool) {
6464
if (mem_pool) {
6565
dh::safe_cuda(cudaMemPoolDestroy(*mem_pool));
66+
delete mem_pool;
6667
}
6768
}};
6869
return mem_pool;

src/common/device_compression.cu

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
#include <mutex> // for once_flag, call_once
2626
#include <vector> // for vector
2727

28+
#include "common.h" // for HumanMemUnit
2829
#include "compressed_iterator.h" // for CompressedByteT
2930
#include "cuda_context.cuh" // for CUDAContext
3031
#include "cuda_dr_utils.h" // for GetGlobalCuDriverApi
32+
#include "cuda_rt_utils.h" // for CurrentDevice
3133
#include "device_compression.h"
3234
#include "device_vector.cuh" // for DeviceUVector
3335
#include "nvtx_utils.h" // for xgboost_NVTX_FN_RANGE
@@ -290,7 +292,6 @@ void DecompressSnappy(dh::CUDAStreamView stream, SnappyDecomprMgr const& mgr,
290292
dh::device_vector<void*> d_out_ptrs(n_chunks);
291293
dh::safe_cuda(cudaMemcpyAsync(d_out_ptrs.data().get(), h_out_ptrs.data(),
292294
dh::ToSpan(d_out_ptrs).size_bytes(), cudaMemcpyDefault, stream));
293-
CHECK(curt::SupportsPageableMem() || curt::SupportsAts());
294295
// Run nvcomp
295296
SafeNvComp(nvcompBatchedSnappyDecompressAsync(
296297
mgr_impl->d_in_chunk_ptrs.data().get(), mgr_impl->d_in_chunk_sizes.data().get(),
@@ -383,8 +384,11 @@ void DecompressSnappy(dh::CUDAStreamView stream, SnappyDecomprMgr const& mgr,
383384
auto n_bytes = thrust::reduce(cuctx->CTP(), out_sizes.cbegin(), out_sizes.cend());
384385
auto n_total_bytes = p_out->size();
385386
auto ratio = static_cast<double>(n_total_bytes) / in.size_bytes();
386-
LOG(DEBUG) << "[snappy] Input: " << in.size_bytes() << ", need:" << n_bytes
387-
<< " allocated:" << n_total_bytes << " ratio:" << ratio;
387+
auto ratio_act = static_cast<double>(n_bytes) / in.size_bytes();
388+
LOG(DEBUG) << "[snappy] Input: " << common::HumanMemUnit(in.size_bytes())
389+
<< ", need:" << common::HumanMemUnit(n_bytes)
390+
<< ", allocated:" << common::HumanMemUnit(n_total_bytes) << ", ratio:" << ratio
391+
<< ", actual ratio:" << ratio_act;
388392

389393
/**
390394
* Meta
@@ -470,10 +474,7 @@ SnappyDecomprMgr::~SnappyDecomprMgr() = default;
470474
SnappyDecomprMgrImpl* SnappyDecomprMgr::Impl() const { return nullptr; }
471475

472476
[[nodiscard]] bool SnappyDecomprMgr::Empty() const { return true; }
473-
[[nodiscard]] std::size_t SnappyDecomprMgr::DecompressedBytes() const {
474-
common::AssertNvCompSupport();
475-
return 0;
476-
}
477+
[[nodiscard]] std::size_t SnappyDecomprMgr::DecompressedBytes() const { return 0; }
477478

478479
// Round-trip compression
479480
void DecompressSnappy(dh::CUDAStreamView, SnappyDecomprMgr const&,
@@ -482,15 +483,22 @@ void DecompressSnappy(dh::CUDAStreamView, SnappyDecomprMgr const&,
482483
}
483484

484485
[[nodiscard]] CuMemParams CompressSnappy(Context const*,
485-
common::Span<common::CompressedByteT const>,
486+
common::Span<common::CompressedByteT const> in,
486487
dh::DeviceUVector<std::uint8_t>*, std::size_t) {
488+
if (in.empty()) {
489+
return {};
490+
}
487491
common::AssertNvCompSupport();
488492
return {};
489493
}
490494

491495
[[nodiscard]] common::RefResourceView<std::uint8_t> CoalesceCompressedBuffersToHost(
492-
dh::CUDAStreamView, std::shared_ptr<HostPinnedMemPool>, CuMemParams const&,
496+
dh::CUDAStreamView, std::shared_ptr<HostPinnedMemPool>, CuMemParams const& in_params,
493497
dh::DeviceUVector<std::uint8_t> const&, CuMemParams*) {
498+
std::size_t n_total_bytes = in_params.TotalSrcBytes();
499+
if (n_total_bytes == 0) {
500+
return {};
501+
}
494502
common::AssertNvCompSupport();
495503
return {};
496504
}

src/common/device_compression.cuh

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,20 @@ struct SnappyDecomprMgrImpl {
104104
[[nodiscard]] bool Empty() const;
105105
};
106106

107-
inline auto MakeSnappyDecomprMgr(dh::CUDAStreamView s, std::shared_ptr<HostPinnedMemPool> pool,
108-
CuMemParams params,
109-
common::Span<std::uint8_t const> in_compressed_data) {
107+
#if defined(XGBOOST_USE_NVCOMP)
108+
[[nodiscard]] inline auto MakeSnappyDecomprMgr(
109+
dh::CUDAStreamView s, std::shared_ptr<HostPinnedMemPool> pool, CuMemParams params,
110+
common::Span<std::uint8_t const> in_compressed_data) {
110111
SnappyDecomprMgr mgr;
111112
*mgr.Impl() = SnappyDecomprMgrImpl{s, std::move(pool), std::move(params), in_compressed_data};
112113
return mgr;
113114
}
115+
#else
116+
[[nodiscard]] inline auto MakeSnappyDecomprMgr(dh::CUDAStreamView,
117+
std::shared_ptr<HostPinnedMemPool>, CuMemParams,
118+
common::Span<std::uint8_t const>) {
119+
SnappyDecomprMgr mgr;
120+
return mgr;
121+
}
122+
#endif // defined(XGBOOST_USE_NVCOMP)
114123
} // namespace xgboost::dc

src/common/io.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
#include <limits> // for numeric_limits
4343
#endif
4444

45+
#if defined(__linux__)
46+
#include <sys/sysinfo.h>
47+
#endif
48+
4549
namespace xgboost::common {
4650
size_t PeekableInStream::Read(void* dptr, size_t size) {
4751
size_t nbuffer = buffer_.length() - buffer_ptr_;
@@ -350,4 +354,19 @@ AlignedMemWriteStream::~AlignedMemWriteStream() = default;
350354
}
351355
return result;
352356
}
357+
358+
[[nodiscard]] std::size_t TotalMemory() {
359+
#if defined(__linux__)
360+
struct sysinfo info;
361+
CHECK_EQ(sysinfo(&info), 0) << SystemErrorMsg();
362+
return info.totalram * info.mem_unit;
363+
#elif defined(xgboost_IS_WIN)
364+
MEMORYSTATUSEX status;
365+
status.dwLength = sizeof(status);
366+
CHECK(GlobalMemoryStatusEx(&status)) << SystemErrorMsg();
367+
return static_cast<std::size_t>(status.ullTotalPhys);
368+
#else
369+
LOG(FATAL) << "Not implemented";
370+
#endif // defined(__linux__)
371+
}
353372
} // namespace xgboost::common

src/common/io.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,5 +633,7 @@ class AlignedMemWriteStream : public AlignedFileWriteStream {
633633

634634
// Run a system command, get its stdout.
635635
[[nodiscard]] std::string CmdOutput(StringView cmd);
636+
637+
[[nodiscard]] std::size_t TotalMemory();
636638
} // namespace xgboost::common
637639
#endif // XGBOOST_COMMON_IO_H_

src/data/ellpack_page_raw_format.cu

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ template <typename T>
102102
auto* impl = page->Impl();
103103
CHECK(this->cuts_->cut_values_.DeviceCanRead());
104104

105+
auto ctx = Context{}.MakeCUDA(curt::CurrentDevice());
106+
105107
auto dispatch = [&] {
106-
fi->Read(page, this->param_.prefetch_copy || !this->has_hmm_ats_);
108+
fi->Read(&ctx, page, this->param_.prefetch_copy || !this->has_hmm_ats_);
107109
impl->SetCuts(this->cuts_);
108110
};
109111

0 commit comments

Comments
 (0)