Skip to content

Commit dd02e83

Browse files
authored
Enable KvikIO datasource in hybrid scan examples (rapidsai#21318)
Supersedes rapidsai#20805 This PR extends the hybrid scan examples to use datasource enabling them to use KvikIO features such as high-performance parallel I/O, support for remote files, and GDS access. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Tianyu Liu (https://github.com/kingcrimsontianyu) - Vukasin Milovanovic (https://github.com/vuule) URL: rapidsai#21318
1 parent 4f137db commit dd02e83

8 files changed

Lines changed: 398 additions & 239 deletions

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
#include "timer.hpp"
7+
8+
#include <iostream>
9+
10+
#pragma once
11+
12+
template <std::invocable F>
13+
void benchmark(F&& f, std::size_t iterations)
14+
{
15+
double total_time_millis{0.0};
16+
for (std::size_t i = 0; i < iterations; ++i) {
17+
timer timer;
18+
timer.reset();
19+
20+
f();
21+
22+
auto elapsed_time_ms =
23+
static_cast<double>(std::chrono::duration_cast<timer::micros>(timer.elapsed()).count()) /
24+
1000.0;
25+
std::cout << "Iteration: " << i << ", time: " << elapsed_time_ms << " ms\n";
26+
if (i != 0) { total_time_millis += elapsed_time_ms; }
27+
}
28+
29+
std::cout << "Average time (first iteration excluded): " << total_time_millis / (iterations - 1)
30+
<< " ms\n\n";
31+
}

cpp/examples/hybrid_scan_io/common_utils.cpp

Lines changed: 105 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <cudf/ast/expressions.hpp>
99
#include <cudf/concatenate.hpp>
1010
#include <cudf/detail/nvtx/ranges.hpp>
11+
#include <cudf/detail/utilities/integer_utils.hpp>
1112
#include <cudf/io/parquet.hpp>
1213
#include <cudf/io/text/byte_range_info.hpp>
1314
#include <cudf/join/filtered_join.hpp>
@@ -19,6 +20,7 @@
1920
#include <rmm/mr/owning_wrapper.hpp>
2021
#include <rmm/mr/pool_memory_resource.hpp>
2122

23+
#include <numeric>
2224
#include <string>
2325
#include <vector>
2426

@@ -27,6 +29,14 @@
2729
* @brief Definitions for utilities for `hybrid_scan_io` example
2830
*/
2931

32+
/**
 * @brief Interprets a keyword string as a boolean flag
 *
 * The comparison is case-insensitive: the input is upper-cased before being matched.
 *
 * @param input Keyword to interpret; taken by value because it is upper-cased in place
 * @return true if `input` is one of ON, TRUE, YES, Y or T (in any letter case), false otherwise
 */
bool get_boolean(std::string input)
{
  // Route every character through `unsigned char`: calling `toupper` with a negative `char`
  // value (e.g. a non-ASCII byte) is undefined behavior
  std::transform(input.begin(), input.end(), input.begin(), [](unsigned char c) {
    return static_cast<char>(::toupper(c));
  });

  // Check if the input string matches any of the accepted affirmative keywords
  return input == "ON" or input == "TRUE" or input == "YES" or input == "Y" or input == "T";
}
39+
3040
std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used)
3141
{
3242
if (is_pool_used) {
@@ -91,68 +101,130 @@ void check_tables_equal(cudf::table_view const& lhs_table,
91101
}
92102
}
93103

94-
cudf::host_span<uint8_t const> fetch_footer_bytes(cudf::host_span<uint8_t const> buffer)
104+
std::unique_ptr<cudf::io::datasource::buffer> fetch_footer_bytes(cudf::io::datasource& datasource)
95105
{
96106
CUDF_FUNC_RANGE();
97107

98108
using namespace cudf::io::parquet;
99109

100110
constexpr auto header_len = sizeof(file_header_s);
101111
constexpr auto ender_len = sizeof(file_ender_s);
102-
size_t const len = buffer.size();
112+
size_t const len = datasource.size();
103113

104-
auto const header_buffer = cudf::host_span<uint8_t const>(buffer.data(), header_len);
105-
auto const header = reinterpret_cast<file_header_s const*>(header_buffer.data());
106-
auto const ender_buffer =
107-
cudf::host_span<uint8_t const>(buffer.data() + len - ender_len, ender_len);
108-
auto const ender = reinterpret_cast<file_ender_s const*>(ender_buffer.data());
114+
auto header_buffer = datasource.host_read(0, header_len);
115+
auto const header = reinterpret_cast<file_header_s const*>(header_buffer->data());
116+
auto ender_buffer = datasource.host_read(len - ender_len, ender_len);
117+
auto const ender = reinterpret_cast<file_ender_s const*>(ender_buffer->data());
109118
CUDF_EXPECTS(len > header_len + ender_len, "Incorrect data source");
110119
constexpr uint32_t parquet_magic = (('P' << 0) | ('A' << 8) | ('R' << 16) | ('1' << 24));
111120
CUDF_EXPECTS(header->magic == parquet_magic && ender->magic == parquet_magic,
112121
"Corrupted header or footer");
113122
CUDF_EXPECTS(ender->footer_len != 0 && ender->footer_len <= (len - header_len - ender_len),
114123
"Incorrect footer length");
115124

116-
return cudf::host_span<uint8_t const>(buffer.data() + len - ender->footer_len - ender_len,
117-
ender->footer_len);
125+
return datasource.host_read(len - ender->footer_len - ender_len, ender->footer_len);
118126
}
119127

120-
cudf::host_span<uint8_t const> fetch_page_index_bytes(
121-
cudf::host_span<uint8_t const> buffer, cudf::io::text::byte_range_info const page_index_bytes)
128+
std::unique_ptr<cudf::io::datasource::buffer> fetch_page_index_bytes(
129+
cudf::io::datasource& datasource, cudf::io::text::byte_range_info const page_index_bytes)
122130
{
123-
return cudf::host_span<uint8_t const>(
124-
reinterpret_cast<uint8_t const*>(buffer.data()) + page_index_bytes.offset(),
125-
page_index_bytes.size());
131+
return datasource.host_read(page_index_bytes.offset(), page_index_bytes.size());
126132
}
127133

128-
std::vector<rmm::device_buffer> fetch_byte_ranges(
129-
cudf::host_span<uint8_t const> host_buffer,
130-
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
131-
rmm::cuda_stream_view stream,
132-
rmm::device_async_resource_ref mr)
134+
/**
 * @brief Creates a non-owning host span over the bytes of a datasource host buffer
 *
 * The returned span only refers to the buffer's memory, so the buffer must stay alive for as
 * long as the span is used.
 *
 * @param buffer Host buffer to view
 * @return Host span covering `buffer.size()` bytes starting at `buffer.data()`
 */
cudf::host_span<uint8_t const> make_host_span(
  std::reference_wrapper<cudf::io::datasource::buffer const> buffer)
{
  auto const& host_buffer = buffer.get();
  auto const* const bytes = static_cast<uint8_t const*>(host_buffer.data());
  return cudf::host_span<uint8_t const>{bytes, host_buffer.size()};
}
135140

141+
std::tuple<std::vector<rmm::device_buffer>,
142+
std::vector<cudf::device_span<uint8_t const>>,
143+
std::future<void>>
144+
fetch_byte_ranges(cudf::io::datasource& datasource,
145+
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
146+
rmm::cuda_stream_view stream,
147+
rmm::device_async_resource_ref mr)
148+
{
136149
static std::mutex mutex;
137150

138-
std::vector<rmm::device_buffer> buffers(byte_ranges.size());
151+
// Allocate device spans for each column chunk
152+
std::vector<cudf::device_span<uint8_t const>> column_chunk_data{};
153+
column_chunk_data.reserve(byte_ranges.size());
154+
155+
auto total_size = std::accumulate(
156+
byte_ranges.begin(), byte_ranges.end(), std::size_t{0}, [&](auto acc, auto const& range) {
157+
return acc + range.size();
158+
});
159+
160+
// Allocate single device buffer for all column chunks
161+
std::vector<rmm::device_buffer> column_chunk_buffers{};
162+
column_chunk_buffers.emplace_back(total_size, stream, mr);
163+
auto buffer_data = static_cast<uint8_t*>(column_chunk_buffers.back().data());
164+
std::ignore = std::accumulate(
165+
byte_ranges.begin(), byte_ranges.end(), std::size_t{0}, [&](auto acc, auto const& range) {
166+
column_chunk_data.emplace_back(buffer_data + acc, static_cast<size_t>(range.size()));
167+
return acc + range.size();
168+
});
169+
170+
std::vector<std::future<size_t>> device_read_tasks{};
171+
std::vector<std::future<size_t>> host_read_tasks{};
172+
device_read_tasks.reserve(byte_ranges.size());
173+
host_read_tasks.reserve(byte_ranges.size());
139174
{
140175
std::lock_guard<std::mutex> lock(mutex);
141176

142-
std::transform(
143-
byte_ranges.begin(), byte_ranges.end(), buffers.begin(), [&](auto const& byte_range) {
144-
auto const chunk_offset = host_buffer.data() + byte_range.offset();
145-
auto const chunk_size = static_cast<size_t>(byte_range.size());
146-
auto buffer = rmm::device_buffer(chunk_size, stream, mr);
147-
cudf::detail::cuda_memcpy_async(
148-
cudf::device_span<uint8_t>{static_cast<uint8_t*>(buffer.data()), chunk_size},
149-
cudf::host_span<uint8_t const>{chunk_offset, chunk_size},
150-
stream);
151-
return buffer;
152-
});
177+
for (size_t chunk = 0; chunk < byte_ranges.size();) {
178+
auto const io_offset = static_cast<size_t>(byte_ranges[chunk].offset());
179+
auto io_size = static_cast<size_t>(byte_ranges[chunk].size());
180+
size_t next_chunk = chunk + 1;
181+
while (next_chunk < byte_ranges.size()) {
182+
size_t const next_offset = byte_ranges[next_chunk].offset();
183+
if (next_offset != io_offset + io_size) { break; }
184+
io_size += byte_ranges[next_chunk].size();
185+
next_chunk++;
186+
}
187+
188+
if (io_size != 0) {
189+
auto dest = const_cast<uint8_t*>(column_chunk_data[chunk].data());
190+
// Directly read the column chunk data to the device
191+
// buffer if supported
192+
if (datasource.supports_device_read() and datasource.is_device_read_preferred(io_size)) {
193+
device_read_tasks.emplace_back(
194+
datasource.device_read_async(io_offset, io_size, dest, stream));
195+
} else {
196+
// Read the column chunk data to the host buffer and
197+
// copy it to the device buffer
198+
host_read_tasks.emplace_back(
199+
std::async(std::launch::deferred, [&datasource, io_offset, io_size, dest, stream]() {
200+
auto host_buffer = datasource.host_read(io_offset, io_size);
201+
cudf::detail::cuda_memcpy_async(
202+
cudf::device_span<uint8_t>{dest, io_size},
203+
cudf::host_span<uint8_t const>{host_buffer->data(), io_size},
204+
stream);
205+
return io_size;
206+
}));
207+
}
208+
}
209+
chunk = next_chunk;
210+
}
153211
}
154212

155-
return buffers;
213+
auto sync_function = [](decltype(host_read_tasks) host_read_tasks,
214+
decltype(device_read_tasks) device_read_tasks) {
215+
for (auto& task : host_read_tasks) {
216+
task.get();
217+
}
218+
for (auto& task : device_read_tasks) {
219+
task.get();
220+
}
221+
};
222+
return {std::move(column_chunk_buffers),
223+
std::move(column_chunk_data),
224+
std::async(std::launch::deferred,
225+
sync_function,
226+
std::move(host_read_tasks),
227+
std::move(device_read_tasks))};
156228
}
157229

158230
std::unique_ptr<cudf::table> concatenate_tables(std::vector<std::unique_ptr<cudf::table>> tables,

cpp/examples/hybrid_scan_io/common_utils.hpp

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#pragma once
77

88
#include <cudf/ast/expressions.hpp>
9+
#include <cudf/io/datasource.hpp>
910
#include <cudf/io/text/byte_range_info.hpp>
1011
#include <cudf/io/types.hpp>
1112
#include <cudf/table/table_view.hpp>
@@ -19,6 +20,14 @@
1920
* @brief Utilities for `hybrid_scan_io` example
2021
*/
2122

23+
/**
24+
* @brief Get boolean from the keyword
25+
*
26+
* @param input keyword affirmation string such as: Y, T, YES, TRUE, ON
27+
* @return true or false
28+
*/
29+
[[nodiscard]] bool get_boolean(std::string input);
30+
2231
/**
2332
* @brief Create memory resource for libcudf functions
2433
*
@@ -60,56 +69,50 @@ void check_tables_equal(cudf::table_view const& lhs_table,
6069
rmm::cuda_stream_view stream = cudf::get_default_stream());
6170

6271
/**
63-
* @brief Fetches a host span of Parquet footer bytes from the input buffer span
72+
* @brief Fetches a host buffer of Parquet footer bytes from the input data source
6473
*
65-
* @param buffer Input buffer span
66-
* @return A host span of the footer bytes
74+
* @param datasource Input data source
75+
* @return Host buffer containing footer bytes
6776
*/
68-
cudf::host_span<uint8_t const> fetch_footer_bytes(cudf::host_span<uint8_t const> buffer);
77+
std::unique_ptr<cudf::io::datasource::buffer> fetch_footer_bytes(cudf::io::datasource& datasource);
78+
6979
/**
70-
* @brief Fetches a host span of Parquet PageIndexbytes from the input buffer span
80+
* @brief Fetches a host buffer of Parquet page index from the input data source
7181
*
72-
* @param buffer Input buffer span
73-
* @param page_index_bytes Byte range of `PageIndex` to fetch
74-
* @return A host span of the PageIndex bytes
82+
* @param datasource Input datasource
83+
* @param page_index_bytes Byte range of page index
84+
* @return Host buffer containing page index bytes
7585
*/
76-
cudf::host_span<uint8_t const> fetch_page_index_bytes(
77-
cudf::host_span<uint8_t const> buffer, cudf::io::text::byte_range_info const page_index_bytes);
86+
std::unique_ptr<cudf::io::datasource::buffer> fetch_page_index_bytes(
87+
cudf::io::datasource& datasource, cudf::io::text::byte_range_info const page_index_bytes);
7888

7989
/**
80-
* @brief Converts a span of device buffers into a vector of corresponding device spans
90+
* @brief Converts a host buffer into a host span
8191
*
82-
* @tparam T Type of output device spans
83-
* @param buffers Host span of device buffers
84-
* @return Device spans corresponding to the input device buffers
92+
* @param buffer Host buffer
93+
* @return Host span of input host buffer
8594
*/
86-
template <typename T>
87-
std::vector<cudf::device_span<T const>> make_device_spans(
88-
cudf::host_span<rmm::device_buffer const> buffers)
89-
requires(sizeof(T) == 1)
90-
{
91-
std::vector<cudf::device_span<T const>> device_spans(buffers.size());
92-
std::transform(buffers.begin(), buffers.end(), device_spans.begin(), [](auto const& buffer) {
93-
return cudf::device_span<T const>{static_cast<T const*>(buffer.data()), buffer.size()};
94-
});
95-
return device_spans;
96-
}
95+
cudf::host_span<uint8_t const> make_host_span(
96+
std::reference_wrapper<cudf::io::datasource::buffer const> buffer);
9797

9898
/**
9999
* @brief Fetches a list of byte ranges from the input datasource into device buffers
100100
*
101-
* @param host_buffer Host buffer span
101+
* @param datasource Input datasource
102102
* @param byte_ranges Byte ranges to fetch
103103
* @param stream CUDA stream
104104
* @param mr Device memory resource
105105
*
106-
* @return Device buffers
106+
* @return A tuple containing the device buffers, the device spans of the fetched data, and a future
107+
* to wait on the read tasks
107108
*/
108-
std::vector<rmm::device_buffer> fetch_byte_ranges(
109-
cudf::host_span<uint8_t const> host_buffer,
110-
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
111-
rmm::cuda_stream_view stream,
112-
rmm::device_async_resource_ref mr);
109+
std::tuple<std::vector<rmm::device_buffer>,
110+
std::vector<cudf::device_span<uint8_t const>>,
111+
std::future<void>>
112+
fetch_byte_ranges(cudf::io::datasource& datasource,
113+
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
114+
rmm::cuda_stream_view stream,
115+
rmm::device_async_resource_ref mr);
113116

114117
/**
115118
* @brief Concatenate a vector of tables and return the resultant table

0 commit comments

Comments
 (0)