Skip to content

Commit 4d40eb8

Browse files
authored
Avoid repeated encoding and compression operations for writes that include the sort column (opensearch-project#21464)
Signed-off-by: Chaitanya KSR <ksrchai@amazon.com>
1 parent 36809cc commit 4d40eb8

6 files changed

Lines changed: 579 additions & 103 deletions

File tree

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ members = [
1818
# Arrow / Parquet
1919
arrow = { version = "57.3.0", features = ["ffi"] }
2020
arrow-array = "57.3.0"
21+
arrow-ipc = "57.3.0"
2122
arrow-schema = "57.3.0"
2223
arrow-buffer = "57.3.0"
2324
parquet = "57.3.0"

sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ crate-type = ["rlib"]
1515
[dependencies]
1616
arrow = { workspace = true }
1717
parquet = { workspace = true }
18+
arrow-ipc = { workspace = true }
1819
lazy_static = { workspace = true }
1920
dashmap = { workspace = true }
2021
tempfile = { workspace = true }

sandbox/plugins/parquet-data-format/src/main/rust/src/merge/heap.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use arrow::datatypes::{
1414
DataType as ArrowDataType, Date32Type, Date64Type, DurationMicrosecondType,
1515
DurationMillisecondType, DurationNanosecondType, DurationSecondType, Float32Type, Float64Type,
1616
Int16Type, Int32Type, Int64Type, Int8Type, TimestampMicrosecondType,
17-
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt32Type,
17+
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
1818
};
1919

2020
use super::error::{MergeError, MergeResult};
@@ -138,7 +138,6 @@ pub fn get_sort_value(
138138
ArrowDataType::Int32 => SortKey::Int(col.as_primitive::<Int32Type>().value(row) as i64),
139139
ArrowDataType::Int16 => SortKey::Int(col.as_primitive::<Int16Type>().value(row) as i64),
140140
ArrowDataType::Int8 => SortKey::Int(col.as_primitive::<Int8Type>().value(row) as i64),
141-
ArrowDataType::UInt32 => SortKey::Int(col.as_primitive::<UInt32Type>().value(row) as i64),
142141
ArrowDataType::Date32 => SortKey::Int(col.as_primitive::<Date32Type>().value(row) as i64),
143142
ArrowDataType::Date64 => SortKey::Int(col.as_primitive::<Date64Type>().value(row)),
144143
ArrowDataType::Timestamp(unit, _) => SortKey::Int(match unit {

sandbox/plugins/parquet-data-format/src/main/rust/src/tests/mod.rs

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,238 @@ fn test_unsorted_writer_preserves_insertion_order() {
276276
cleanup_ffi_schema(schema_ptr);
277277
}
278278

279+
// ===== Arrow IPC staging path tests =====
280+
281+
#[test]
282+
fn test_ipc_staging_sorted_writer_creates_and_cleans_up_staging_file() {
283+
let (_temp_dir, filename) = get_temp_file_path("ipc_cleanup.parquet");
284+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
285+
286+
// The IPC staging file should exist while the writer is open
287+
let temp_filename = format!(
288+
"{}/temp-{}",
289+
Path::new(&filename).parent().unwrap().to_string_lossy(),
290+
Path::new(&filename).file_name().unwrap().to_string_lossy()
291+
);
292+
let ipc_staging_path = format!("{}.arrow_ipc_staging", temp_filename);
293+
assert!(Path::new(&ipc_staging_path).exists(), "IPC staging file should exist while writer is open");
294+
295+
let (ap, sp) = create_test_ffi_data_with_ids(vec![30, 10, 20], vec![Some("C"), Some("A"), Some("B")]).unwrap();
296+
NativeParquetWriter::write_data(filename.clone(), ap, sp).unwrap();
297+
cleanup_ffi_data(ap, sp);
298+
299+
NativeParquetWriter::finalize_writer(filename.clone()).unwrap();
300+
301+
// After finalize, the IPC staging file should be cleaned up
302+
assert!(!Path::new(&ipc_staging_path).exists(), "IPC staging file should be deleted after finalize");
303+
// The final Parquet file should exist
304+
assert!(Path::new(&filename).exists(), "Final Parquet file should exist");
305+
306+
// Verify data is sorted
307+
let ids = read_parquet_file_sorted_ids(&filename);
308+
assert_eq!(ids, vec![10, 20, 30]);
309+
310+
cleanup_ffi_schema(schema_ptr);
311+
}
312+
313+
#[test]
314+
fn test_ipc_staging_has_writer_returns_true() {
315+
let (_temp_dir, filename) = get_temp_file_path("ipc_has_writer.parquet");
316+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
317+
318+
assert!(NativeParquetWriter::has_writer(&filename), "has_writer should return true for IPC writer");
319+
320+
close_writer_and_cleanup_schema(&filename, schema_ptr);
321+
}
322+
323+
#[test]
324+
fn test_ipc_staging_duplicate_writer_rejected() {
325+
let (_temp_dir, filename) = get_temp_file_path("ipc_dup.parquet");
326+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
327+
328+
let (_, schema_ptr2) = create_test_ffi_schema();
329+
let result = NativeParquetWriter::create_writer(
330+
filename.clone(), "test-index".to_string(), schema_ptr2,
331+
vec!["id".to_string()], vec![false], vec![false], 0
332+
);
333+
assert!(result.is_err());
334+
assert!(result.unwrap_err().to_string().contains("Writer already exists"));
335+
336+
cleanup_ffi_schema(schema_ptr2);
337+
close_writer_and_cleanup_schema(&filename, schema_ptr);
338+
}
339+
340+
#[test]
341+
fn test_ipc_staging_empty_data_produces_valid_parquet() {
342+
let (_temp_dir, filename) = get_temp_file_path("ipc_empty.parquet");
343+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
344+
345+
// Finalize without writing any data
346+
let result = NativeParquetWriter::finalize_writer(filename.clone());
347+
assert!(result.is_ok());
348+
assert!(Path::new(&filename).exists(), "Empty Parquet file should be created");
349+
350+
let metadata = result.unwrap().unwrap();
351+
assert_eq!(metadata.metadata.file_metadata().num_rows(), 0);
352+
353+
cleanup_ffi_schema(schema_ptr);
354+
}
355+
356+
#[test]
357+
fn test_ipc_staging_multi_batch_sort() {
358+
let (_temp_dir, filename) = get_temp_file_path("ipc_multi_batch.parquet");
359+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
360+
361+
// Write multiple batches with interleaved values
362+
let (ap1, sp1) = create_test_ffi_data_with_ids(vec![50, 10], vec![Some("E"), Some("A")]).unwrap();
363+
NativeParquetWriter::write_data(filename.clone(), ap1, sp1).unwrap();
364+
cleanup_ffi_data(ap1, sp1);
365+
366+
let (ap2, sp2) = create_test_ffi_data_with_ids(vec![30, 20], vec![Some("C"), Some("B")]).unwrap();
367+
NativeParquetWriter::write_data(filename.clone(), ap2, sp2).unwrap();
368+
cleanup_ffi_data(ap2, sp2);
369+
370+
let (ap3, sp3) = create_test_ffi_data_with_ids(vec![40, 60], vec![Some("D"), Some("F")]).unwrap();
371+
NativeParquetWriter::write_data(filename.clone(), ap3, sp3).unwrap();
372+
cleanup_ffi_data(ap3, sp3);
373+
374+
NativeParquetWriter::finalize_writer(filename.clone()).unwrap();
375+
376+
let ids = read_parquet_file_sorted_ids(&filename);
377+
assert_eq!(ids, vec![10, 20, 30, 40, 50, 60], "Multiple IPC batches should be sorted correctly");
378+
379+
cleanup_ffi_schema(schema_ptr);
380+
}
381+
382+
#[test]
383+
fn test_ipc_staging_descending_sort() {
384+
let (_temp_dir, filename) = get_temp_file_path("ipc_desc.parquet");
385+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", true);
386+
387+
let (ap, sp) = create_test_ffi_data_with_ids(vec![10, 30, 20], vec![Some("A"), Some("C"), Some("B")]).unwrap();
388+
NativeParquetWriter::write_data(filename.clone(), ap, sp).unwrap();
389+
cleanup_ffi_data(ap, sp);
390+
391+
NativeParquetWriter::finalize_writer(filename.clone()).unwrap();
392+
393+
let ids = read_parquet_file_sorted_ids(&filename);
394+
assert_eq!(ids, vec![30, 20, 10], "IPC path should support descending sort");
395+
396+
cleanup_ffi_schema(schema_ptr);
397+
}
398+
399+
#[test]
400+
fn test_ipc_and_parquet_writers_coexist() {
401+
let (_temp_dir1, sorted_file) = get_temp_file_path("ipc_sorted.parquet");
402+
let (_temp_dir2, unsorted_file) = get_temp_file_path("parquet_unsorted.parquet");
403+
404+
// Create one IPC writer (sorted) and one Parquet writer (unsorted)
405+
let (_schema1, sp1) = create_sorted_writer_and_assert_success(&sorted_file, "id", false);
406+
let (_schema2, sp2) = create_writer_and_assert_success(&unsorted_file);
407+
408+
// Write to both
409+
let (ap1, dp1) = create_test_ffi_data_with_ids(vec![30, 10, 20], vec![Some("C"), Some("A"), Some("B")]).unwrap();
410+
NativeParquetWriter::write_data(sorted_file.clone(), ap1, dp1).unwrap();
411+
cleanup_ffi_data(ap1, dp1);
412+
413+
let (ap2, dp2) = create_test_ffi_data_with_ids(vec![30, 10, 20], vec![Some("C"), Some("A"), Some("B")]).unwrap();
414+
NativeParquetWriter::write_data(unsorted_file.clone(), ap2, dp2).unwrap();
415+
cleanup_ffi_data(ap2, dp2);
416+
417+
// Finalize both
418+
NativeParquetWriter::finalize_writer(sorted_file.clone()).unwrap();
419+
NativeParquetWriter::finalize_writer(unsorted_file.clone()).unwrap();
420+
421+
// Sorted file should be sorted
422+
let sorted_ids = read_parquet_file_sorted_ids(&sorted_file);
423+
assert_eq!(sorted_ids, vec![10, 20, 30]);
424+
425+
// Unsorted file should preserve insertion order
426+
let unsorted_ids = read_parquet_file_sorted_ids(&unsorted_file);
427+
assert_eq!(unsorted_ids, vec![30, 10, 20]);
428+
429+
cleanup_ffi_schema(sp1);
430+
cleanup_ffi_schema(sp2);
431+
}
432+
433+
#[test]
434+
fn test_ipc_staging_concurrent_sorted_writers() {
435+
let temp_dir = tempdir().unwrap();
436+
let thread_count = 6;
437+
let success_count = Arc::new(AtomicUsize::new(0));
438+
let mut handles = vec![];
439+
440+
for i in 0..thread_count {
441+
let temp_dir_path = temp_dir.path().to_path_buf();
442+
let success_count = Arc::clone(&success_count);
443+
let handle = thread::spawn(move || {
444+
let file_path = temp_dir_path.join(format!("ipc_concurrent_{}.parquet", i));
445+
let filename = file_path.to_string_lossy().to_string();
446+
let (_schema, schema_ptr) = create_test_ffi_schema();
447+
448+
if NativeParquetWriter::create_writer(
449+
filename.clone(), "test-index".to_string(), schema_ptr,
450+
vec!["id".to_string()], vec![false], vec![false], 0
451+
).is_ok() {
452+
let (ap, sp) = create_test_ffi_data_with_ids(
453+
vec![30, 10, 20], vec![Some("C"), Some("A"), Some("B")]
454+
).unwrap();
455+
let write_ok = NativeParquetWriter::write_data(filename.clone(), ap, sp).is_ok();
456+
cleanup_ffi_data(ap, sp);
457+
458+
if write_ok {
459+
if let Ok(Some(metadata)) = NativeParquetWriter::finalize_writer(filename.clone()) {
460+
if metadata.metadata.file_metadata().num_rows() == 3 {
461+
let ids = read_parquet_file_sorted_ids(&filename);
462+
if ids == vec![10, 20, 30] {
463+
success_count.fetch_add(1, Ordering::SeqCst);
464+
}
465+
}
466+
}
467+
}
468+
}
469+
cleanup_ffi_schema(schema_ptr);
470+
});
471+
handles.push(handle);
472+
}
473+
474+
for handle in handles {
475+
handle.join().unwrap();
476+
}
477+
assert_eq!(success_count.load(Ordering::SeqCst), thread_count);
478+
}
479+
480+
#[test]
481+
fn test_ipc_staging_complete_lifecycle_with_sync() {
482+
let (_temp_dir, filename) = get_temp_file_path("ipc_lifecycle.parquet");
483+
let file_path = Path::new(&filename);
484+
let (_schema, schema_ptr) = create_sorted_writer_and_assert_success(&filename, "id", false);
485+
486+
for batch_ids in [vec![50, 30], vec![10, 40], vec![20, 60]] {
487+
let names: Vec<Option<&str>> = batch_ids.iter().map(|_| Some("x")).collect();
488+
let (ap, sp) = create_test_ffi_data_with_ids(batch_ids, names).unwrap();
489+
NativeParquetWriter::write_data(filename.clone(), ap, sp).unwrap();
490+
cleanup_ffi_data(ap, sp);
491+
}
492+
493+
let result = NativeParquetWriter::finalize_writer(filename.clone());
494+
assert!(result.is_ok());
495+
let metadata = result.unwrap().unwrap();
496+
assert_eq!(metadata.metadata.file_metadata().num_rows(), 6);
497+
498+
assert!(NativeParquetWriter::sync_to_disk(filename.clone()).is_ok());
499+
assert!(file_path.exists());
500+
assert!(file_path.metadata().unwrap().len() > 0);
501+
502+
let ids = read_parquet_file_sorted_ids(&filename);
503+
assert_eq!(ids, vec![10, 20, 30, 40, 50, 60]);
504+
505+
let read_metadata = NativeParquetWriter::get_file_metadata(filename.clone()).unwrap();
506+
assert_eq!(read_metadata.num_rows(), 6);
507+
508+
cleanup_ffi_schema(schema_ptr);
509+
}
510+
279511
#[test]
280512
fn test_get_filtered_writer_memory_usage_with_writers() {
281513
let (_temp_dir, filename1) = get_temp_file_path("test1.parquet");

0 commit comments

Comments (0)