Skip to content

Commit 4f892e2

Browse files
authored
This change is the cause of the ongoing Parquet memcheck failures in nightlies. Authors: - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Yunsong Wang (https://github.com/PointKernel) URL: rapidsai#20955
1 parent 68683d5 commit 4f892e2

7 files changed

Lines changed: 150 additions & 114 deletions

File tree

cpp/src/io/parquet/page_string_decode.cu

Lines changed: 118 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,47 @@ __device__ size_t totalDictEntriesSize(uint8_t const* data,
423423
return sum_l;
424424
}
425425

426+
/**
427+
* @brief Compute string size information for plain encoded strings.
428+
*
429+
* @param data Pointer to the start of the page data stream
430+
* @param data_size Length of data
431+
* @param start_value Do not count values that occur before this index
432+
* @param end_value Do not count values that occur after this index
433+
*/
434+
__device__ size_t totalPlainEntriesSize(uint8_t const* data,
435+
int data_size,
436+
int start_value,
437+
int end_value)
438+
{
439+
int const t = threadIdx.x;
440+
int pos = 0;
441+
size_t total_len = 0;
442+
443+
// This step is purely serial
444+
if (!t) {
445+
uint8_t const* cur = data;
446+
int k = 0;
447+
448+
while (pos < end_value && k < data_size) {
449+
int len;
450+
if (k + 4 <= data_size) {
451+
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
452+
k += 4;
453+
if (k + len > data_size) { len = 0; }
454+
} else {
455+
len = 0;
456+
}
457+
458+
k += len;
459+
if (pos >= start_value) { total_len += len; }
460+
pos++;
461+
}
462+
}
463+
464+
return total_len;
465+
}
466+
426467
/**
427468
* @brief Compute string size information for DELTA_BYTE_ARRAY encoded strings.
428469
*
@@ -842,15 +883,13 @@ CUDF_KERNEL void __launch_bounds__(delta_length_block_size)
842883
* @param pages All pages to be decoded
843884
* @param chunks All chunks to be decoded
844885
* @param page_mask Page mask indicating if this column needs to be decoded
845-
* @param page_string_offset_indices Per-page indices into the string offset buffer
846886
* @param min_rows crop all rows below min_row
847887
* @param num_rows Maximum number of rows to read
848888
*/
849889
CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
850890
compute_page_string_sizes_kernel(PageInfo* pages,
851891
device_span<ColumnChunkDesc const> chunks,
852892
device_span<bool const> page_mask,
853-
device_span<size_t const> page_string_offset_indices,
854893
size_t min_row,
855894
size_t num_rows)
856895
{
@@ -929,23 +968,9 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
929968
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
930969
break;
931970
case Encoding::PLAIN:
932-
// Check if we have precomputed offsets available
933-
if (col.column_string_offset_base == nullptr || page_string_offset_indices.empty()) {
934-
CUDF_UNREACHABLE("string offsets should have been preprocessed already!");
935-
}
936-
937-
if (start_value >= end_value) {
938-
str_bytes = 0;
939-
} else {
940-
// The span from str_offsets[start] to str_offsets[end] includes the length prefixes
941-
// of strings [start+1, end), but we only want the string data bytes.
942-
// So we need to subtract 4 bytes per string for those embedded length prefixes.
943-
uint32_t const* str_offsets =
944-
col.column_string_offset_base + page_string_offset_indices[page_idx];
945-
int const num_values = end_value - start_value;
946-
size_t const span = str_offsets[end_value] - str_offsets[start_value];
947-
str_bytes = span - sizeof(int32_t) * num_values;
948-
}
971+
dict_size = static_cast<int32_t>(end - data);
972+
str_bytes = is_bounds_pg ? totalPlainEntriesSize(data, dict_size, start_value, end_value)
973+
: dict_size - sizeof(int) * pp->num_valids;
949974
break;
950975
}
951976
}
@@ -980,7 +1005,6 @@ struct page_tform_functor {
9801005
void compute_page_string_sizes_pass1(cudf::detail::hostdevice_span<PageInfo> pages,
9811006
cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
9821007
cudf::device_span<bool const> page_mask,
983-
cudf::device_span<size_t const> page_string_offset_indices,
9841008
size_t min_row,
9851009
size_t num_rows,
9861010
uint32_t kernel_mask,
@@ -1020,7 +1044,7 @@ void compute_page_string_sizes_pass1(cudf::detail::hostdevice_span<PageInfo> pag
10201044
}
10211045
if (BitAnd(kernel_mask, STRINGS_MASK_NON_DELTA) != 0) {
10221046
compute_page_string_sizes_kernel<<<dim_grid, dim_block, 0, streams[s_idx++].value()>>>(
1023-
pages.device_ptr(), chunks, page_mask, page_string_offset_indices, min_row, num_rows);
1047+
pages.device_ptr(), chunks, page_mask, min_row, num_rows);
10241048
}
10251049

10261050
// synchronize the streams
@@ -1134,11 +1158,13 @@ inline __device__ bool prefetch_string_data(int t,
11341158
* on prefetching data and filling the remaining entries.
11351159
*
11361160
* @param s Page state containing data_start, dict_size and other info
1161+
* @param num_values_to_skip Number of values to skip before processing
11371162
* @param num_values_to_process Number of values to process
11381163
* @param str_offsets Output buffer for string offsets
11391164
*/
11401165
template <int32_t block_size, size_t prefetch_size>
11411166
inline __device__ void read_string_offsets_buffered(page_state_s* s,
1167+
size_t num_values_to_skip,
11421168
size_t num_values_to_process,
11431169
uint32_t* str_offsets)
11441170
{
@@ -1152,48 +1178,54 @@ inline __device__ void read_string_offsets_buffered(page_state_s* s,
11521178
int32_t next_length_offset = 0;
11531179
__shared__ __align__(128) uint8_t prefetch_buffer[prefetch_size];
11541180

1181+
auto process_data = [&]<bool save_offset>(size_t loop_end) {
1182+
// Parquet data is: 4-byte length, string, 4-byte length, string, ...
1183+
for (size_t pos = 0; pos < loop_end; pos++) {
1184+
int32_t const string_offset = next_length_offset + sizeof(int32_t);
1185+
if (string_offset > dict_size) { return pos; } // Can't read any more valid data
1186+
1187+
// Check if we need to prefetch more data
1188+
if ((next_length_offset + sizeof(int32_t)) > buffer_end) {
1189+
block.sync(); // Make sure all of the threads have finished reading the previous data
1190+
if (!prefetch_string_data<prefetch_size, block_size>(
1191+
t, next_length_offset, dict_size, cur, prefetch_buffer, buffer_base, buffer_end)) {
1192+
return pos; // End of the data
1193+
}
1194+
block.sync(); // Sync all of the prefetched data for all of the threads
1195+
}
1196+
1197+
// Read the length of the string from the prefetched buffer
1198+
int32_t const prefetch_read_index = next_length_offset - buffer_base;
1199+
int32_t len;
1200+
cuda::std::memcpy(reinterpret_cast<void*>(&len),
1201+
reinterpret_cast<void const*>(&prefetch_buffer[prefetch_read_index]),
1202+
sizeof(int32_t));
1203+
1204+
if (string_offset + len > dict_size) { return pos; } // Data is corrupted or incomplete
1205+
next_length_offset = string_offset + len;
1206+
1207+
if constexpr (save_offset) {
1208+
if (t == 0) { str_offsets[pos] = string_offset; }
1209+
}
1210+
}
1211+
return loop_end;
1212+
};
1213+
11551214
// Initial prefetch
11561215
if (!prefetch_string_data<prefetch_size, block_size>(
11571216
t, next_length_offset, dict_size, cur, prefetch_buffer, buffer_base, buffer_end)) {
11581217
return; // No data to process
11591218
}
11601219
block.sync(); // Sync all of the prefetched data for all of the threads
11611220

1162-
// Parquet data is: 4-byte length, string, 4-byte length, string, ...
1163-
size_t num_values_written = num_values_to_process; // Will update below if run out of data
1164-
for (size_t pos = 0; pos < num_values_to_process; pos++) {
1165-
int32_t const string_offset = next_length_offset + sizeof(int32_t);
1166-
if (string_offset > dict_size) {
1167-
num_values_written = pos; // Can't read any more valid data
1168-
break;
1169-
}
1170-
1171-
// Check if we need to prefetch more data
1172-
if ((next_length_offset + sizeof(int32_t)) > buffer_end) {
1173-
block.sync(); // Make sure all of the threads have finished reading the previous data
1174-
if (!prefetch_string_data<prefetch_size, block_size>(
1175-
t, next_length_offset, dict_size, cur, prefetch_buffer, buffer_base, buffer_end)) {
1176-
num_values_written = pos; // End of the data
1177-
break;
1178-
}
1179-
block.sync(); // Sync all of the prefetched data for all of the threads
1180-
}
1181-
1182-
// Read the length of the string from the prefetched buffer
1183-
int32_t const prefetch_read_index = next_length_offset - buffer_base;
1184-
int32_t len;
1185-
cuda::std::memcpy(reinterpret_cast<void*>(&len),
1186-
reinterpret_cast<void const*>(&prefetch_buffer[prefetch_read_index]),
1187-
sizeof(int32_t));
1221+
// Skip to the first value we need to process
1222+
int32_t const skip_pos = process_data.template operator()<false>(num_values_to_skip);
11881223

1189-
if (string_offset + len > dict_size) {
1190-
num_values_written = pos; // Data is corrupted or incomplete
1191-
break;
1192-
}
1193-
next_length_offset = string_offset + len;
1194-
1195-
if (t == 0) { str_offsets[pos] = string_offset; }
1196-
}
1224+
// Process the values we need - only if we successfully skipped past all the values we needed to
1225+
size_t const num_values_written =
1226+
(skip_pos != num_values_to_skip)
1227+
? 0
1228+
: process_data.template operator()<true>(num_values_to_process);
11971229

11981230
// +4 for "stored" length of "next" string that we'll subtract off during decode
11991231
// Easier/faster than branching in the decode loop
@@ -1215,11 +1247,13 @@ inline __device__ void read_string_offsets_buffered(page_state_s* s,
12151247
* then all threads cooperatively fill the remaining entries.
12161248
*
12171249
* @param s Page state containing data_start, dict_size and other info
1250+
* @param num_values_to_skip Number of values to skip before processing
12181251
* @param num_values_to_process Number of values to process
12191252
* @param str_offsets Output buffer for string offsets
12201253
*/
12211254
template <int32_t block_size>
12221255
inline __device__ void read_string_offsets_sequential(page_state_s* s,
1256+
size_t num_values_to_skip,
12231257
size_t num_values_to_process,
12241258
uint32_t* str_offsets)
12251259
{
@@ -1234,28 +1268,31 @@ inline __device__ void read_string_offsets_sequential(page_state_s* s,
12341268
uint32_t length_offset = 0;
12351269
auto const dict_size = s->dict_size;
12361270

1237-
// Process the data
1238-
// Parquet data is: 4-byte length, string, 4-byte length, string, ...
1239-
num_values_written = num_values_to_process; // Will update below if run out of data
1240-
for (size_t pos = 0; pos < num_values_to_process; pos++) {
1241-
uint32_t const string_offset = length_offset + sizeof(int32_t);
1242-
if (string_offset > dict_size) {
1243-
num_values_written = pos; // Can't read any more valid data
1244-
break;
1271+
auto process_loop = [&]<bool save_offsets>(size_t loop_end) {
1272+
// Parquet data is: 4-byte length, string, 4-byte length, string, ...
1273+
for (size_t pos = 0; pos < loop_end; pos++) {
1274+
uint32_t const string_offset = length_offset + sizeof(int32_t);
1275+
if (string_offset > dict_size) { return pos; } // Can't read any more valid data
1276+
1277+
// Read the length of the string from the data stream
1278+
int32_t len;
1279+
cuda::std::memcpy(reinterpret_cast<void*>(&len),
1280+
reinterpret_cast<const void*>(&cur[length_offset]),
1281+
sizeof(int32_t));
1282+
1283+
if (string_offset + len > dict_size) { return pos; } // Data is corrupted or incomplete
1284+
if constexpr (save_offsets) { str_offsets[pos] = string_offset; }
1285+
length_offset = string_offset + len;
12451286
}
1287+
return loop_end;
1288+
};
12461289

1247-
// Read the length of the string from the data stream
1248-
int32_t len;
1249-
cuda::std::memcpy(reinterpret_cast<void*>(&len),
1250-
reinterpret_cast<const void*>(&cur[length_offset]),
1251-
sizeof(int32_t));
1252-
1253-
if (string_offset + len > dict_size) {
1254-
num_values_written = pos; // Data is corrupted or incomplete
1255-
break;
1256-
}
1257-
str_offsets[pos] = string_offset;
1258-
length_offset = string_offset + len;
1290+
// Skip to the first value we need to process, then process the values we need
1291+
size_t const skip_pos = process_loop.template operator()<false>(num_values_to_skip);
1292+
if (skip_pos == num_values_to_skip) {
1293+
num_values_written = process_loop.template operator()<true>(num_values_to_process);
1294+
} else {
1295+
num_values_written = 0; // Skipped past all the values in the page
12591296
}
12601297

12611298
// +4 for "stored" length of "next" string that we'll subtract off during decode
@@ -1328,10 +1365,10 @@ CUDF_KERNEL void preprocess_string_offsets_kernel(
13281365
}
13291366

13301367
// Determine if this is a list column and how many values to process
1331-
// We don't know how many values we'll need to read, because we don't know
1368+
// For non-lists, we don't know how many values we'll need to read, because we don't know
13321369
// how many nulls we'll skip. So we have to read through the skipped rows.
1333-
// This runs before we know skipped_leaf_values so can't skip for lists either.
1334-
bool const is_list = (chunk.max_level[level_type::REPETITION] != 0);
1370+
bool const is_list = (chunk.max_level[level_type::REPETITION] != 0);
1371+
size_t const num_values_to_skip = is_list ? pp->skipped_leaf_values : 0;
13351372
size_t const num_values_to_process =
13361373
is_list ? pp->nesting[chunk.max_nesting_depth - 1].batch_size : s->num_rows + s->first_row;
13371374

@@ -1349,11 +1386,12 @@ CUDF_KERNEL void preprocess_string_offsets_kernel(
13491386

13501387
if (avg_string_length > max_avg_string_length_for_buffer) {
13511388
// Use sequential processing for large average string lengths
1352-
read_string_offsets_sequential<decode_block_size>(s, num_values_to_process, str_offsets);
1389+
read_string_offsets_sequential<decode_block_size>(
1390+
s, num_values_to_skip, num_values_to_process, str_offsets);
13531391
} else {
13541392
// Use buffered processing for typical string lengths
13551393
read_string_offsets_buffered<decode_block_size, prefetch_size>(
1356-
s, num_values_to_process, str_offsets);
1394+
s, num_values_to_skip, num_values_to_process, str_offsets);
13571395
}
13581396
}
13591397

cpp/src/io/parquet/page_string_utils.cuh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,11 @@ __device__ size_t decode_strings(page_state_s* s,
293293
int input_thread_string_offset;
294294
int string_length;
295295
if (s->col.physical_type == Type::FIXED_LEN_BYTE_ARRAY) {
296-
input_thread_string_offset = src_pos * s->dtype_len_in;
296+
input_thread_string_offset = (thread_pos + skipped_leaf_values) * s->dtype_len_in;
297297
string_length = s->dtype_len_in;
298298
} else {
299-
input_thread_string_offset = str_offsets[src_pos];
300-
int const next_offset = str_offsets[src_pos + 1];
299+
input_thread_string_offset = str_offsets[thread_pos];
300+
int const next_offset = str_offsets[thread_pos + 1];
301301
// The memory is laid out as: 4-byte length, string, 4-byte length, string, ...
302302
// String length = subtract the offsets and the stored length of the next string
303303
// Except at the end of the dictionary, where the last string offset is repeated.

cpp/src/io/parquet/parquet_gpu.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,6 @@ void compute_page_sizes(cudf::detail::hostdevice_span<PageInfo> pages,
800800
void compute_page_string_sizes_pass1(cudf::detail::hostdevice_span<PageInfo> pages,
801801
cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
802802
cudf::device_span<bool const> page_mask,
803-
cudf::device_span<size_t const> page_string_offset_indices,
804803
size_t min_row,
805804
size_t num_rows,
806805
uint32_t kernel_mask,

cpp/src/io/parquet/reader_impl.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
191191
initial_str_offsets =
192192
cudf::detail::make_device_uvector_async(host_offsets_vector, _stream, _mr);
193193
chunk_nested_str_data.host_to_device_async(_stream);
194+
195+
// Allocate string offset buffers and get string offsets for non-dictionary, non-FLBA string
196+
// columns
197+
compute_page_string_offset_indices(skip_rows, num_rows);
194198
}
195199

196200
// create this before we fork streams
@@ -211,7 +215,7 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
211215
decoder_mask,
212216
_subpass_page_mask,
213217
initial_str_offsets,
214-
subpass.page_string_offset_indices,
218+
_page_string_offset_indices,
215219
error_code.data(),
216220
streams[s_idx++]);
217221
};
@@ -471,12 +475,18 @@ void reader_impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num_
471475
}
472476
}
473477

478+
// Clear string offset buffers to free device memory
479+
_page_string_offset_indices.resize(0, _stream);
480+
_string_offset_buffer.resize(0, _stream);
481+
474482
_stream.synchronize();
475483
}
476484

477485
reader_impl::reader_impl()
478486
: _options{},
479-
_subpass_page_mask{cudf::detail::hostdevice_vector<bool>(0, cudf::get_default_stream())}
487+
_subpass_page_mask{cudf::detail::hostdevice_vector<bool>(0, cudf::get_default_stream())},
488+
_string_offset_buffer{0, cudf::get_default_stream()},
489+
_page_string_offset_indices{0, cudf::get_default_stream()}
480490
{
481491
}
482492

@@ -513,6 +523,8 @@ reader_impl::reader_impl(std::size_t chunk_read_limit,
513523
options.is_enabled_use_jit_filter()},
514524
_sources{std::move(sources)},
515525
_subpass_page_mask{cudf::detail::hostdevice_vector<bool>(0, _stream)},
526+
_string_offset_buffer{0, _stream},
527+
_page_string_offset_indices{0, _stream},
516528
_output_chunk_read_limit{chunk_read_limit},
517529
_input_pass_read_limit{pass_read_limit}
518530
{
@@ -634,7 +646,6 @@ void reader_impl::preprocess_chunk_strings(read_mode mode, row_range const& read
634646
compute_page_string_sizes_pass1(subpass.pages,
635647
pass.chunks,
636648
_subpass_page_mask,
637-
subpass.page_string_offset_indices,
638649
read_info.skip_rows,
639650
read_info.num_rows,
640651
subpass.kernel_mask,

0 commit comments

Comments
 (0)