|
3 | 3 |
|
4 | 4 | //! This module contains tests for the `vortex_scan` table function. |
5 | 5 |
|
| 6 | +use std::collections::BTreeMap; |
6 | 7 | use std::ffi::CStr; |
7 | 8 | use std::io::Write; |
8 | 9 | use std::net::TcpListener; |
@@ -174,6 +175,28 @@ async fn write_vortex_file_to_dir( |
174 | 175 | temp_file_path |
175 | 176 | } |
176 | 177 |
|
| 178 | +async fn write_vortex_struct_file_to_dir( |
| 179 | + dir: &Path, |
| 180 | + prefix: &str, |
| 181 | + iter: impl Iterator<Item = (impl AsRef<str>, impl IntoArray)>, |
| 182 | +) -> NamedTempFile { |
| 183 | + let struct_array = StructArray::try_from_iter(iter).unwrap(); |
| 184 | + let temp_file_path = tempfile::Builder::new() |
| 185 | + .prefix(prefix) |
| 186 | + .suffix(".vortex") |
| 187 | + .tempfile_in(dir) |
| 188 | + .unwrap(); |
| 189 | + |
| 190 | + let mut file = async_fs::File::create(&temp_file_path).await.unwrap(); |
| 191 | + SESSION |
| 192 | + .write_options() |
| 193 | + .write(&mut file, struct_array.to_array_stream()) |
| 194 | + .await |
| 195 | + .unwrap(); |
| 196 | + |
| 197 | + temp_file_path |
| 198 | +} |
| 199 | + |
177 | 200 | #[test] |
178 | 201 | fn test_scan_function_registration() { |
179 | 202 | let conn = database_connection(); |
@@ -350,6 +373,284 @@ fn test_vortex_scan_multiple_files() { |
350 | 373 | assert_eq!(total_sum, 21); |
351 | 374 | } |
352 | 375 |
|
| 376 | +#[test] |
| 377 | +fn test_vortex_scan_tpch_q13_style_left_join_over_multifile_orders() { |
| 378 | + let (tempdir, _customer_file, _orders_file1, _orders_file2) = RUNTIME.block_on(async { |
| 379 | + let tempdir = tempfile::tempdir().unwrap(); |
| 380 | + |
| 381 | + let customer_file = write_vortex_struct_file_to_dir( |
| 382 | + tempdir.path(), |
| 383 | + "customer_", |
| 384 | + [ |
| 385 | + ("c_custkey", buffer![1i64, 2, 3, 4, 5].into_array()), |
| 386 | + ( |
| 387 | + "c_comment", |
| 388 | + VarBinArray::from(vec!["c1", "c2", "c3", "c4", "c5"]).into_array(), |
| 389 | + ), |
| 390 | + ] |
| 391 | + .into_iter(), |
| 392 | + ) |
| 393 | + .await; |
| 394 | + |
| 395 | + let orders_file1 = write_vortex_struct_file_to_dir( |
| 396 | + tempdir.path(), |
| 397 | + "orders_", |
| 398 | + [ |
| 399 | + ("o_orderkey", buffer![10i64, 20, 30].into_array()), |
| 400 | + ("o_custkey", buffer![1i64, 2, 3].into_array()), |
| 401 | + ( |
| 402 | + "o_comment", |
| 403 | + VarBinArray::from(vec![ |
| 404 | + "ordinary order", |
| 405 | + "special handling requests", |
| 406 | + "regular comment", |
| 407 | + ]) |
| 408 | + .into_array(), |
| 409 | + ), |
| 410 | + ] |
| 411 | + .into_iter(), |
| 412 | + ) |
| 413 | + .await; |
| 414 | + |
| 415 | + let orders_file2 = write_vortex_struct_file_to_dir( |
| 416 | + tempdir.path(), |
| 417 | + "orders_", |
| 418 | + [ |
| 419 | + ("o_orderkey", buffer![11i64, 21, 40].into_array()), |
| 420 | + ("o_custkey", buffer![1i64, 2, 4].into_array()), |
| 421 | + ( |
| 422 | + "o_comment", |
| 423 | + VarBinArray::from(vec![ |
| 424 | + "special service requests", |
| 425 | + "another normal order", |
| 426 | + "special packaging requests", |
| 427 | + ]) |
| 428 | + .into_array(), |
| 429 | + ), |
| 430 | + ] |
| 431 | + .into_iter(), |
| 432 | + ) |
| 433 | + .await; |
| 434 | + |
| 435 | + (tempdir, customer_file, orders_file1, orders_file2) |
| 436 | + }); |
| 437 | + |
| 438 | + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); |
| 439 | + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); |
| 440 | + |
| 441 | + let conn = database_connection(); |
| 442 | + conn.query(&format!( |
| 443 | + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}') WHERE c_custkey IS NOT NULL" |
| 444 | + )) |
| 445 | + .unwrap(); |
| 446 | + conn.query(&format!( |
| 447 | + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}') WHERE o_orderkey IS NOT NULL" |
| 448 | + )) |
| 449 | + .unwrap(); |
| 450 | + |
| 451 | + let result = conn |
| 452 | + .query( |
| 453 | + " |
| 454 | + SELECT |
| 455 | + c_count, |
| 456 | + count(*) AS custdist |
| 457 | + FROM ( |
| 458 | + SELECT |
| 459 | + c_custkey, |
| 460 | + count(o_orderkey) AS c_count |
| 461 | + FROM |
| 462 | + customer |
| 463 | + LEFT OUTER JOIN orders |
| 464 | + ON c_custkey = o_custkey |
| 465 | + AND o_comment NOT LIKE '%special%requests%' |
| 466 | + GROUP BY |
| 467 | + c_custkey |
| 468 | + ) AS c_orders(c_custkey, c_count) |
| 469 | + GROUP BY |
| 470 | + c_count |
| 471 | + ORDER BY |
| 472 | + custdist DESC, |
| 473 | + c_count DESC |
| 474 | + ", |
| 475 | + ) |
| 476 | + .unwrap(); |
| 477 | + |
| 478 | + let mut rows = Vec::new(); |
| 479 | + for chunk in result { |
| 480 | + let len = chunk.len().as_(); |
| 481 | + let counts = chunk.get_vector(0); |
| 482 | + let dists = chunk.get_vector(1); |
| 483 | + let counts = counts.as_slice_with_len::<i64>(len); |
| 484 | + let dists = dists.as_slice_with_len::<i64>(len); |
| 485 | + rows.extend(counts.iter().copied().zip(dists.iter().copied())); |
| 486 | + } |
| 487 | + |
| 488 | + assert_eq!(rows, vec![(1, 3), (0, 2)]); |
| 489 | +} |
| 490 | + |
| 491 | +#[test] |
| 492 | +fn test_vortex_scan_tpch_q13_style_large_multifile_orders_matches_expected() { |
| 493 | + const CUSTOMER_COUNT: i64 = 2048; |
| 494 | + const ORDER_FILE_COUNT: usize = 3; |
| 495 | + const INCLUDED_MODULUS: i64 = 64; |
| 496 | + const EXCLUDED_MODULUS: i64 = 7; |
| 497 | + |
| 498 | + let mut expected_counts = BTreeMap::<i64, i64>::new(); |
| 499 | + let (tempdir, _customer_file, _order_files) = RUNTIME.block_on(async { |
| 500 | + let tempdir = tempfile::tempdir().unwrap(); |
| 501 | + |
| 502 | + let customer_keys: Vec<i64> = (1..=CUSTOMER_COUNT).collect(); |
| 503 | + let customer_comments: Vec<String> = (1..=CUSTOMER_COUNT) |
| 504 | + .map(|custkey| format!("customer {custkey}")) |
| 505 | + .collect(); |
| 506 | + let customer_file = write_vortex_struct_file_to_dir( |
| 507 | + tempdir.path(), |
| 508 | + "customer_", |
| 509 | + [ |
| 510 | + ( |
| 511 | + "c_custkey", |
| 512 | + customer_keys |
| 513 | + .into_iter() |
| 514 | + .collect::<PrimitiveArray>() |
| 515 | + .into_array(), |
| 516 | + ), |
| 517 | + ( |
| 518 | + "c_comment", |
| 519 | + VarBinArray::from(customer_comments).into_array(), |
| 520 | + ), |
| 521 | + ] |
| 522 | + .into_iter(), |
| 523 | + ) |
| 524 | + .await; |
| 525 | + |
| 526 | + let mut order_keys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; |
| 527 | + let mut order_custkeys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; |
| 528 | + let mut order_comments_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; |
| 529 | + let mut next_orderkey = 1_i64; |
| 530 | + |
| 531 | + for custkey in 1..=CUSTOMER_COUNT { |
| 532 | + let included = custkey % INCLUDED_MODULUS; |
| 533 | + let excluded = custkey % EXCLUDED_MODULUS; |
| 534 | + *expected_counts.entry(included).or_default() += 1; |
| 535 | + |
| 536 | + for _ in 0..included { |
| 537 | + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; |
| 538 | + order_keys_per_file[file_idx].push(next_orderkey); |
| 539 | + order_custkeys_per_file[file_idx].push(custkey); |
| 540 | + order_comments_per_file[file_idx].push("ordinary order".to_string()); |
| 541 | + next_orderkey += 1; |
| 542 | + } |
| 543 | + |
| 544 | + for _ in 0..excluded { |
| 545 | + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; |
| 546 | + order_keys_per_file[file_idx].push(next_orderkey); |
| 547 | + order_custkeys_per_file[file_idx].push(custkey); |
| 548 | + order_comments_per_file[file_idx].push("special handling requests".to_string()); |
| 549 | + next_orderkey += 1; |
| 550 | + } |
| 551 | + } |
| 552 | + |
| 553 | + let mut order_files = Vec::with_capacity(ORDER_FILE_COUNT); |
| 554 | + for file_idx in 0..ORDER_FILE_COUNT { |
| 555 | + order_files.push( |
| 556 | + write_vortex_struct_file_to_dir( |
| 557 | + tempdir.path(), |
| 558 | + "orders_", |
| 559 | + [ |
| 560 | + ( |
| 561 | + "o_orderkey", |
| 562 | + std::mem::take(&mut order_keys_per_file[file_idx]) |
| 563 | + .into_iter() |
| 564 | + .collect::<PrimitiveArray>() |
| 565 | + .into_array(), |
| 566 | + ), |
| 567 | + ( |
| 568 | + "o_custkey", |
| 569 | + std::mem::take(&mut order_custkeys_per_file[file_idx]) |
| 570 | + .into_iter() |
| 571 | + .collect::<PrimitiveArray>() |
| 572 | + .into_array(), |
| 573 | + ), |
| 574 | + ( |
| 575 | + "o_comment", |
| 576 | + VarBinArray::from(std::mem::take( |
| 577 | + &mut order_comments_per_file[file_idx], |
| 578 | + )) |
| 579 | + .into_array(), |
| 580 | + ), |
| 581 | + ] |
| 582 | + .into_iter(), |
| 583 | + ) |
| 584 | + .await, |
| 585 | + ); |
| 586 | + } |
| 587 | + |
| 588 | + (tempdir, customer_file, order_files) |
| 589 | + }); |
| 590 | + |
| 591 | + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); |
| 592 | + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); |
| 593 | + |
| 594 | + let mut expected_rows: Vec<(i64, i64)> = expected_counts.into_iter().collect(); |
| 595 | + expected_rows.sort_by(|(left_count, left_dist), (right_count, right_dist)| { |
| 596 | + right_dist |
| 597 | + .cmp(left_dist) |
| 598 | + .then_with(|| right_count.cmp(left_count)) |
| 599 | + }); |
| 600 | + |
| 601 | + let conn = database_connection(); |
| 602 | + conn.query(&format!( |
| 603 | + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}')" |
| 604 | + )) |
| 605 | + .unwrap(); |
| 606 | + conn.query(&format!( |
| 607 | + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}')" |
| 608 | + )) |
| 609 | + .unwrap(); |
| 610 | + |
| 611 | + let result = conn |
| 612 | + .query( |
| 613 | + " |
| 614 | + SELECT |
| 615 | + c_count, |
| 616 | + count(*) AS custdist |
| 617 | + FROM ( |
| 618 | + SELECT |
| 619 | + c_custkey, |
| 620 | + count(o_orderkey) AS c_count |
| 621 | + FROM |
| 622 | + customer |
| 623 | + LEFT OUTER JOIN orders |
| 624 | + ON c_custkey = o_custkey |
| 625 | + AND o_comment NOT LIKE '%special%requests%' |
| 626 | + GROUP BY |
| 627 | + c_custkey |
| 628 | + ) AS c_orders(c_custkey, c_count) |
| 629 | + GROUP BY |
| 630 | + c_count |
| 631 | + ORDER BY |
| 632 | + custdist DESC, |
| 633 | + c_count DESC |
| 634 | + ", |
| 635 | + ) |
| 636 | + .unwrap(); |
| 637 | + |
| 638 | + let row_count = result.row_count(); |
| 639 | + let mut actual_rows = Vec::new(); |
| 640 | + for chunk in result { |
| 641 | + let len = chunk.len().as_(); |
| 642 | + let counts = chunk.get_vector(0); |
| 643 | + let dists = chunk.get_vector(1); |
| 644 | + let counts = counts.as_slice_with_len::<i64>(len); |
| 645 | + let dists = dists.as_slice_with_len::<i64>(len); |
| 646 | + actual_rows.extend(counts.iter().copied().zip(dists.iter().copied())); |
| 647 | + } |
| 648 | + |
| 649 | + assert_eq!(usize::try_from(row_count).unwrap(), expected_rows.len()); |
| 650 | + assert_eq!(actual_rows.len(), expected_rows.len()); |
| 651 | + assert_eq!(actual_rows, expected_rows); |
| 652 | +} |
| 653 | + |
353 | 654 | #[test] |
354 | 655 | fn test_vortex_scan_over_http() { |
355 | 656 | let file = RUNTIME.block_on(async { |
|
0 commit comments