Skip to content

Commit 139e226

Browse files
committed
Some fixes
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent ccc391d commit 139e226

3 files changed

Lines changed: 361 additions & 11 deletions

File tree

vortex-duckdb/src/e2e_test/vortex_scan_test.rs

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
//! This module contains tests for the `vortex_scan` table function.
55
6+
use std::collections::BTreeMap;
67
use std::ffi::CStr;
78
use std::io::Write;
89
use std::net::TcpListener;
@@ -174,6 +175,28 @@ async fn write_vortex_file_to_dir(
174175
temp_file_path
175176
}
176177

178+
async fn write_vortex_struct_file_to_dir(
179+
dir: &Path,
180+
prefix: &str,
181+
iter: impl Iterator<Item = (impl AsRef<str>, impl IntoArray)>,
182+
) -> NamedTempFile {
183+
let struct_array = StructArray::try_from_iter(iter).unwrap();
184+
let temp_file_path = tempfile::Builder::new()
185+
.prefix(prefix)
186+
.suffix(".vortex")
187+
.tempfile_in(dir)
188+
.unwrap();
189+
190+
let mut file = async_fs::File::create(&temp_file_path).await.unwrap();
191+
SESSION
192+
.write_options()
193+
.write(&mut file, struct_array.to_array_stream())
194+
.await
195+
.unwrap();
196+
197+
temp_file_path
198+
}
199+
177200
#[test]
178201
fn test_scan_function_registration() {
179202
let conn = database_connection();
@@ -350,6 +373,284 @@ fn test_vortex_scan_multiple_files() {
350373
assert_eq!(total_sum, 21);
351374
}
352375

376+
#[test]
377+
fn test_vortex_scan_tpch_q13_style_left_join_over_multifile_orders() {
378+
let (tempdir, _customer_file, _orders_file1, _orders_file2) = RUNTIME.block_on(async {
379+
let tempdir = tempfile::tempdir().unwrap();
380+
381+
let customer_file = write_vortex_struct_file_to_dir(
382+
tempdir.path(),
383+
"customer_",
384+
[
385+
("c_custkey", buffer![1i64, 2, 3, 4, 5].into_array()),
386+
(
387+
"c_comment",
388+
VarBinArray::from(vec!["c1", "c2", "c3", "c4", "c5"]).into_array(),
389+
),
390+
]
391+
.into_iter(),
392+
)
393+
.await;
394+
395+
let orders_file1 = write_vortex_struct_file_to_dir(
396+
tempdir.path(),
397+
"orders_",
398+
[
399+
("o_orderkey", buffer![10i64, 20, 30].into_array()),
400+
("o_custkey", buffer![1i64, 2, 3].into_array()),
401+
(
402+
"o_comment",
403+
VarBinArray::from(vec![
404+
"ordinary order",
405+
"special handling requests",
406+
"regular comment",
407+
])
408+
.into_array(),
409+
),
410+
]
411+
.into_iter(),
412+
)
413+
.await;
414+
415+
let orders_file2 = write_vortex_struct_file_to_dir(
416+
tempdir.path(),
417+
"orders_",
418+
[
419+
("o_orderkey", buffer![11i64, 21, 40].into_array()),
420+
("o_custkey", buffer![1i64, 2, 4].into_array()),
421+
(
422+
"o_comment",
423+
VarBinArray::from(vec![
424+
"special service requests",
425+
"another normal order",
426+
"special packaging requests",
427+
])
428+
.into_array(),
429+
),
430+
]
431+
.into_iter(),
432+
)
433+
.await;
434+
435+
(tempdir, customer_file, orders_file1, orders_file2)
436+
});
437+
438+
let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display());
439+
let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display());
440+
441+
let conn = database_connection();
442+
conn.query(&format!(
443+
"CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}') WHERE c_custkey IS NOT NULL"
444+
))
445+
.unwrap();
446+
conn.query(&format!(
447+
"CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}') WHERE o_orderkey IS NOT NULL"
448+
))
449+
.unwrap();
450+
451+
let result = conn
452+
.query(
453+
"
454+
SELECT
455+
c_count,
456+
count(*) AS custdist
457+
FROM (
458+
SELECT
459+
c_custkey,
460+
count(o_orderkey) AS c_count
461+
FROM
462+
customer
463+
LEFT OUTER JOIN orders
464+
ON c_custkey = o_custkey
465+
AND o_comment NOT LIKE '%special%requests%'
466+
GROUP BY
467+
c_custkey
468+
) AS c_orders(c_custkey, c_count)
469+
GROUP BY
470+
c_count
471+
ORDER BY
472+
custdist DESC,
473+
c_count DESC
474+
",
475+
)
476+
.unwrap();
477+
478+
let mut rows = Vec::new();
479+
for chunk in result {
480+
let len = chunk.len().as_();
481+
let counts = chunk.get_vector(0);
482+
let dists = chunk.get_vector(1);
483+
let counts = counts.as_slice_with_len::<i64>(len);
484+
let dists = dists.as_slice_with_len::<i64>(len);
485+
rows.extend(counts.iter().copied().zip(dists.iter().copied()));
486+
}
487+
488+
assert_eq!(rows, vec![(1, 3), (0, 2)]);
489+
}
490+
491+
#[test]
492+
fn test_vortex_scan_tpch_q13_style_large_multifile_orders_matches_expected() {
493+
const CUSTOMER_COUNT: i64 = 2048;
494+
const ORDER_FILE_COUNT: usize = 3;
495+
const INCLUDED_MODULUS: i64 = 64;
496+
const EXCLUDED_MODULUS: i64 = 7;
497+
498+
let mut expected_counts = BTreeMap::<i64, i64>::new();
499+
let (tempdir, _customer_file, _order_files) = RUNTIME.block_on(async {
500+
let tempdir = tempfile::tempdir().unwrap();
501+
502+
let customer_keys: Vec<i64> = (1..=CUSTOMER_COUNT).collect();
503+
let customer_comments: Vec<String> = (1..=CUSTOMER_COUNT)
504+
.map(|custkey| format!("customer {custkey}"))
505+
.collect();
506+
let customer_file = write_vortex_struct_file_to_dir(
507+
tempdir.path(),
508+
"customer_",
509+
[
510+
(
511+
"c_custkey",
512+
customer_keys
513+
.into_iter()
514+
.collect::<PrimitiveArray>()
515+
.into_array(),
516+
),
517+
(
518+
"c_comment",
519+
VarBinArray::from(customer_comments).into_array(),
520+
),
521+
]
522+
.into_iter(),
523+
)
524+
.await;
525+
526+
let mut order_keys_per_file = vec![Vec::new(); ORDER_FILE_COUNT];
527+
let mut order_custkeys_per_file = vec![Vec::new(); ORDER_FILE_COUNT];
528+
let mut order_comments_per_file = vec![Vec::new(); ORDER_FILE_COUNT];
529+
let mut next_orderkey = 1_i64;
530+
531+
for custkey in 1..=CUSTOMER_COUNT {
532+
let included = custkey % INCLUDED_MODULUS;
533+
let excluded = custkey % EXCLUDED_MODULUS;
534+
*expected_counts.entry(included).or_default() += 1;
535+
536+
for _ in 0..included {
537+
let file_idx = (next_orderkey as usize) % ORDER_FILE_COUNT;
538+
order_keys_per_file[file_idx].push(next_orderkey);
539+
order_custkeys_per_file[file_idx].push(custkey);
540+
order_comments_per_file[file_idx].push("ordinary order".to_string());
541+
next_orderkey += 1;
542+
}
543+
544+
for _ in 0..excluded {
545+
let file_idx = (next_orderkey as usize) % ORDER_FILE_COUNT;
546+
order_keys_per_file[file_idx].push(next_orderkey);
547+
order_custkeys_per_file[file_idx].push(custkey);
548+
order_comments_per_file[file_idx].push("special handling requests".to_string());
549+
next_orderkey += 1;
550+
}
551+
}
552+
553+
let mut order_files = Vec::with_capacity(ORDER_FILE_COUNT);
554+
for file_idx in 0..ORDER_FILE_COUNT {
555+
order_files.push(
556+
write_vortex_struct_file_to_dir(
557+
tempdir.path(),
558+
"orders_",
559+
[
560+
(
561+
"o_orderkey",
562+
std::mem::take(&mut order_keys_per_file[file_idx])
563+
.into_iter()
564+
.collect::<PrimitiveArray>()
565+
.into_array(),
566+
),
567+
(
568+
"o_custkey",
569+
std::mem::take(&mut order_custkeys_per_file[file_idx])
570+
.into_iter()
571+
.collect::<PrimitiveArray>()
572+
.into_array(),
573+
),
574+
(
575+
"o_comment",
576+
VarBinArray::from(std::mem::take(
577+
&mut order_comments_per_file[file_idx],
578+
))
579+
.into_array(),
580+
),
581+
]
582+
.into_iter(),
583+
)
584+
.await,
585+
);
586+
}
587+
588+
(tempdir, customer_file, order_files)
589+
});
590+
591+
let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display());
592+
let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display());
593+
594+
let mut expected_rows: Vec<(i64, i64)> = expected_counts.into_iter().collect();
595+
expected_rows.sort_by(|(left_count, left_dist), (right_count, right_dist)| {
596+
right_dist
597+
.cmp(left_dist)
598+
.then_with(|| right_count.cmp(left_count))
599+
});
600+
601+
let conn = database_connection();
602+
conn.query(&format!(
603+
"CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}')"
604+
))
605+
.unwrap();
606+
conn.query(&format!(
607+
"CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}')"
608+
))
609+
.unwrap();
610+
611+
let result = conn
612+
.query(
613+
"
614+
SELECT
615+
c_count,
616+
count(*) AS custdist
617+
FROM (
618+
SELECT
619+
c_custkey,
620+
count(o_orderkey) AS c_count
621+
FROM
622+
customer
623+
LEFT OUTER JOIN orders
624+
ON c_custkey = o_custkey
625+
AND o_comment NOT LIKE '%special%requests%'
626+
GROUP BY
627+
c_custkey
628+
) AS c_orders(c_custkey, c_count)
629+
GROUP BY
630+
c_count
631+
ORDER BY
632+
custdist DESC,
633+
c_count DESC
634+
",
635+
)
636+
.unwrap();
637+
638+
let row_count = result.row_count();
639+
let mut actual_rows = Vec::new();
640+
for chunk in result {
641+
let len = chunk.len().as_();
642+
let counts = chunk.get_vector(0);
643+
let dists = chunk.get_vector(1);
644+
let counts = counts.as_slice_with_len::<i64>(len);
645+
let dists = dists.as_slice_with_len::<i64>(len);
646+
actual_rows.extend(counts.iter().copied().zip(dists.iter().copied()));
647+
}
648+
649+
assert_eq!(usize::try_from(row_count).unwrap(), expected_rows.len());
650+
assert_eq!(actual_rows.len(), expected_rows.len());
651+
assert_eq!(actual_rows, expected_rows);
652+
}
653+
353654
#[test]
354655
fn test_vortex_scan_over_http() {
355656
let file = RUNTIME.block_on(async {

vortex-duckdb/src/exporter/varbinview.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ fn to_ptr_binary_view<'a>(
123123
_ref: PtrRef {
124124
size: v.len(),
125125
prefix: view.prefix,
126-
// TODO(joe) verify this.
127126
ptr: unsafe {
128127
buffers[view.buffer_index as usize]
129128
.as_ptr()

0 commit comments

Comments
 (0)