Skip to content

Commit 103bc60

Browse files
authored
Allocate single compressed buffer per source in parquet reader (rapidsai#21323)
## Summary This PR changes the parquet reader to allocate one device buffer per source file for compressed page data, instead of potentially many buffers per chunk. This reduces the number of allocations from O(num_chunks) to O(num_sources), which can reduce memory fragmentation and allocator overhead. ## Implementation The `read_column_chunks_async` function now has three steps: 1. Calculate total compressed size per source and the offset within the buffer for each chunk 2. Allocate one buffer per source with the total size needed 3. Issue reads (still coalescing adjacent contiguous chunks) into the appropriate offsets within each source's buffer Individual chunks still reference their data via `ColumnChunkDesc::compressed_data` pointers into these buffers, so downstream code is unchanged. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Bradley Dice (https://github.com/bdice) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: rapidsai#21323
1 parent 8c288de commit 103bc60

2 files changed

Lines changed: 56 additions & 39 deletions

File tree

cpp/src/io/parquet/reader_impl_preprocess.cu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ std::pair<bool, std::future<void>> reader_impl::read_column_chunks()
473473
std::vector<size_type> chunk_source_map(num_chunks);
474474

475475
// Tracker for eventually deallocating compressed and uncompressed data
476-
raw_page_data = std::vector<rmm::device_buffer>(num_chunks);
476+
raw_page_data = std::vector<rmm::device_buffer>(_num_sources);
477477

478478
// Keep track of column chunk file offsets
479479
std::vector<size_t> column_chunk_offsets(num_chunks);

cpp/src/io/parquet/reader_impl_preprocess_utils.cu

Lines changed: 55 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <thrust/transform.h>
2929
#include <thrust/transform_scan.h>
3030

31+
#include <algorithm>
3132
#include <bitset>
3233
#include <iostream>
3334
#include <numeric>
@@ -174,57 +175,73 @@ void generate_depth_remappings(
174175
std::vector<size_type> const& chunk_source_map,
175176
rmm::cuda_stream_view stream)
176177
{
177-
// Transfer chunk data, coalescing adjacent chunks
178+
// Calculate total size per source and offset for each chunk
179+
std::vector<size_t> source_total_size(sources.size(), 0);
180+
std::vector<size_t> chunk_buffer_offset(end_chunk - begin_chunk);
181+
182+
for (size_t chunk = begin_chunk; chunk < end_chunk; ++chunk) {
183+
auto const source_idx = chunk_source_map[chunk];
184+
chunk_buffer_offset[chunk - begin_chunk] = source_total_size[source_idx];
185+
source_total_size[source_idx] += chunks[chunk].compressed_size;
186+
}
187+
188+
// Allocate one buffer per source
189+
std::transform(
190+
source_total_size.begin(), source_total_size.end(), page_data.begin(), [&](size_t total_size) {
191+
return rmm::device_buffer(
192+
cudf::util::round_up_safe(total_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), stream);
193+
});
194+
// device_read_async is not guaranteed to follow stream-ordering (see datasource API docs).
195+
stream.synchronize();
196+
197+
// Issue reads, coalescing adjacent chunks
178198
std::vector<std::future<size_t>> read_tasks;
179199
for (size_t chunk = begin_chunk; chunk < end_chunk;) {
180-
size_t const io_offset = column_chunk_offsets[chunk];
181-
size_t io_size = chunks[chunk].compressed_size;
182-
size_t next_chunk = chunk + 1;
200+
auto const source_idx = chunk_source_map[chunk];
201+
auto const io_offset = column_chunk_offsets[chunk];
202+
size_t io_size = chunks[chunk].compressed_size;
203+
size_t const first_chunk = chunk;
204+
size_t next_chunk = chunk + 1;
205+
183206
while (next_chunk < end_chunk) {
184-
size_t const next_offset = column_chunk_offsets[next_chunk];
185-
if (next_offset != io_offset + io_size ||
186-
chunk_source_map[chunk] != chunk_source_map[next_chunk]) {
187-
break;
188-
}
207+
if (chunk_source_map[next_chunk] != source_idx) { break; }
208+
auto const next_offset = column_chunk_offsets[next_chunk];
209+
if (next_offset != io_offset + io_size) { break; }
189210
io_size += chunks[next_chunk].compressed_size;
190211
next_chunk++;
191212
}
213+
192214
if (io_size != 0) {
193-
auto& source = sources[chunk_source_map[chunk]];
194-
// Buffer needs to be padded.
195-
// Required by `gpuDecodePageData`.
196-
page_data[chunk] = rmm::device_buffer(
197-
cudf::util::round_up_safe(io_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), stream);
215+
auto& source = sources[source_idx];
216+
auto* dest = static_cast<uint8_t*>(page_data[source_idx].data()) +
217+
chunk_buffer_offset[first_chunk - begin_chunk];
198218

199219
if (source->is_device_read_preferred(io_size)) {
200-
auto fut_read_size = source->device_read_async(
201-
io_offset, io_size, static_cast<uint8_t*>(page_data[chunk].data()), stream);
202-
read_tasks.emplace_back(std::move(fut_read_size));
220+
auto fut = source->device_read_async(io_offset, io_size, dest, stream);
221+
read_tasks.emplace_back(std::move(fut));
203222
} else {
204-
read_tasks.emplace_back(
205-
std::async(std::launch::deferred,
206-
[source = std::ref(*source),
207-
io_offset,
208-
io_size,
209-
dest = page_data[chunk].data(),
210-
stream]() {
211-
auto const read_buffer = source.get().host_read(io_offset, io_size);
212-
cudf::detail::cuda_memcpy_async(
213-
cudf::device_span<uint8_t>{static_cast<uint8_t*>(dest), io_size},
214-
cudf::host_span<uint8_t const>{read_buffer->data(), io_size},
215-
stream);
216-
return io_size;
217-
}));
223+
read_tasks.emplace_back(std::async(
224+
std::launch::deferred, [source = std::ref(*source), io_offset, io_size, dest, stream]() {
225+
auto const read_buffer = source.get().host_read(io_offset, io_size);
226+
cudf::detail::cuda_memcpy_async(
227+
cudf::device_span<uint8_t>{static_cast<uint8_t*>(dest), io_size},
228+
cudf::host_span<uint8_t const>{read_buffer->data(), io_size},
229+
stream);
230+
return io_size;
231+
}));
232+
}
233+
234+
// Set compressed_data pointers for all coalesced chunks
235+
auto* ptr = static_cast<uint8_t const*>(dest);
236+
for (size_t c = first_chunk; c < next_chunk; ++c) {
237+
chunks[c].compressed_data = ptr;
238+
ptr += chunks[c].compressed_size;
218239
}
219-
auto d_compdata = static_cast<uint8_t const*>(page_data[chunk].data());
220-
do {
221-
chunks[chunk].compressed_data = d_compdata;
222-
d_compdata += chunks[chunk].compressed_size;
223-
} while (++chunk != next_chunk);
224-
} else {
225-
chunk = next_chunk;
226240
}
241+
242+
chunk = next_chunk;
227243
}
244+
228245
auto sync_fn = [](decltype(read_tasks) read_tasks) {
229246
for (auto& task : read_tasks) {
230247
task.get();

0 commit comments

Comments
 (0)