|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright the Vortex contributors |
| 3 | + |
| 4 | +//! Real-data column loaders for the P7 layered-pco-bench. |
| 5 | +//! |
| 6 | +//! Each function returns a vector of [`DatasetColumn`] values (dataset label, column label, |
| 7 | +//! `PrimitiveArray`). Failures to load (e.g. missing fixtures) yield an empty vector plus a |
| 8 | +//! one-line note printed to stderr. The bench main loop concatenates these onto |
| 9 | +//! the synthetic columns from P6 and runs the existing 5-variant measurement. |
| 10 | +
|
| 11 | +use arrow_array::cast::AsArray; |
| 12 | +use arrow_array::types::Date32Type; |
| 13 | +use arrow_array::types::Decimal128Type; |
| 14 | +use arrow_array::types::Int64Type; |
| 15 | +use tpchgen::generators::LineItemGenerator; |
| 16 | +use tpchgen::generators::OrderGenerator; |
| 17 | +use tpchgen_arrow::LineItemArrow; |
| 18 | +use tpchgen_arrow::OrderArrow; |
| 19 | +use vortex_array::arrays::PrimitiveArray; |
| 20 | +use vortex_array::validity::Validity; |
| 21 | +use vortex_buffer::BufferMut; |
| 22 | +use vortex_error::VortexResult; |
| 23 | + |
| 24 | +/// A single column under test. |
| 25 | +pub struct DatasetColumn { |
| 26 | + /// Dataset label, e.g. "tpch_sf0p1_lineitem". |
| 27 | + pub dataset: &'static str, |
| 28 | + /// Column label, e.g. "l_orderkey". |
| 29 | + pub column: &'static str, |
| 30 | + /// Encoded as i64 for uniform handling by the variant runners. |
| 31 | + pub array: PrimitiveArray, |
| 32 | +} |
| 33 | + |
/// Upper bound on the number of rows loaded per column.
///
/// Loaders stop copying once a column reaches this many rows, which keeps the
/// per-column runtime of each variant bounded.
pub const MAX_ROWS: usize = 1_000_000;
| 36 | + |
| 37 | +/// Build all the TPC-H columns. Returns empty + stderr note if generation fails. |
| 38 | +pub fn tpch_columns() -> Vec<DatasetColumn> { |
| 39 | + match build_tpch_inner() { |
| 40 | + Ok(cols) => cols, |
| 41 | + Err(e) => { |
| 42 | + eprintln!("tpch: skipped ({e})"); |
| 43 | + Vec::new() |
| 44 | + } |
| 45 | + } |
| 46 | +} |
| 47 | + |
| 48 | +fn build_tpch_inner() -> VortexResult<Vec<DatasetColumn>> { |
| 49 | + // SF=0.1 gives ~600k lineitem rows and ~150k orders rows. |
| 50 | + let scale_factor = 0.1; |
| 51 | + let lineitem_label: &'static str = "tpch_sf0p1_lineitem"; |
| 52 | + let orders_label: &'static str = "tpch_sf0p1_orders"; |
| 53 | + |
| 54 | + let mut out = Vec::new(); |
| 55 | + |
| 56 | + // ---- LineItem ---- |
| 57 | + { |
| 58 | + let generator = LineItemGenerator::new(scale_factor, 1, 1); |
| 59 | + let arrow_iter = LineItemArrow::new(generator).with_batch_size(65_536); |
| 60 | + let mut l_orderkey = BufferMut::<i64>::with_capacity(MAX_ROWS); |
| 61 | + let mut l_extendedprice_cents = BufferMut::<i64>::with_capacity(MAX_ROWS); |
| 62 | + let mut l_shipdate = BufferMut::<i32>::with_capacity(MAX_ROWS); |
| 63 | + let mut l_quantity_cents = BufferMut::<i64>::with_capacity(MAX_ROWS); |
| 64 | + |
| 65 | + for batch in arrow_iter { |
| 66 | + if l_orderkey.len() >= MAX_ROWS { |
| 67 | + break; |
| 68 | + } |
| 69 | + let remaining = MAX_ROWS - l_orderkey.len(); |
| 70 | + let take = batch.num_rows().min(remaining); |
| 71 | + |
| 72 | + let ok = batch |
| 73 | + .column_by_name("l_orderkey") |
| 74 | + .ok_or_else(|| vortex_error::vortex_err!("missing l_orderkey"))? |
| 75 | + .as_primitive::<Int64Type>(); |
| 76 | + extend_i64(&mut l_orderkey, ok, take); |
| 77 | + |
| 78 | + let ep = batch |
| 79 | + .column_by_name("l_extendedprice") |
| 80 | + .ok_or_else(|| vortex_error::vortex_err!("missing l_extendedprice"))? |
| 81 | + .as_primitive::<Decimal128Type>(); |
| 82 | + extend_decimal128_as_i64(&mut l_extendedprice_cents, ep, take); |
| 83 | + |
| 84 | + let sd = batch |
| 85 | + .column_by_name("l_shipdate") |
| 86 | + .ok_or_else(|| vortex_error::vortex_err!("missing l_shipdate"))? |
| 87 | + .as_primitive::<Date32Type>(); |
| 88 | + extend_i32(&mut l_shipdate, sd, take); |
| 89 | + |
| 90 | + let q = batch |
| 91 | + .column_by_name("l_quantity") |
| 92 | + .ok_or_else(|| vortex_error::vortex_err!("missing l_quantity"))? |
| 93 | + .as_primitive::<Decimal128Type>(); |
| 94 | + extend_decimal128_as_i64(&mut l_quantity_cents, q, take); |
| 95 | + } |
| 96 | + |
| 97 | + out.push(DatasetColumn { |
| 98 | + dataset: lineitem_label, |
| 99 | + column: "l_orderkey", |
| 100 | + array: PrimitiveArray::new(l_orderkey.freeze(), Validity::NonNullable), |
| 101 | + }); |
| 102 | + out.push(DatasetColumn { |
| 103 | + dataset: lineitem_label, |
| 104 | + column: "l_extendedprice_cents", |
| 105 | + array: PrimitiveArray::new(l_extendedprice_cents.freeze(), Validity::NonNullable), |
| 106 | + }); |
| 107 | + out.push(DatasetColumn { |
| 108 | + dataset: lineitem_label, |
| 109 | + column: "l_shipdate_i64", |
| 110 | + array: PrimitiveArray::new(i32_buf_to_i64(l_shipdate).freeze(), Validity::NonNullable), |
| 111 | + }); |
| 112 | + out.push(DatasetColumn { |
| 113 | + dataset: lineitem_label, |
| 114 | + column: "l_quantity_cents", |
| 115 | + array: PrimitiveArray::new(l_quantity_cents.freeze(), Validity::NonNullable), |
| 116 | + }); |
| 117 | + } |
| 118 | + |
| 119 | + // ---- Orders ---- |
| 120 | + { |
| 121 | + let generator = OrderGenerator::new(scale_factor, 1, 1); |
| 122 | + let arrow_iter = OrderArrow::new(generator).with_batch_size(65_536); |
| 123 | + let mut o_orderkey = BufferMut::<i64>::with_capacity(MAX_ROWS); |
| 124 | + for batch in arrow_iter { |
| 125 | + if o_orderkey.len() >= MAX_ROWS { |
| 126 | + break; |
| 127 | + } |
| 128 | + let remaining = MAX_ROWS - o_orderkey.len(); |
| 129 | + let take = batch.num_rows().min(remaining); |
| 130 | + let ok = batch |
| 131 | + .column_by_name("o_orderkey") |
| 132 | + .ok_or_else(|| vortex_error::vortex_err!("missing o_orderkey"))? |
| 133 | + .as_primitive::<Int64Type>(); |
| 134 | + extend_i64(&mut o_orderkey, ok, take); |
| 135 | + } |
| 136 | + out.push(DatasetColumn { |
| 137 | + dataset: orders_label, |
| 138 | + column: "o_orderkey", |
| 139 | + array: PrimitiveArray::new(o_orderkey.freeze(), Validity::NonNullable), |
| 140 | + }); |
| 141 | + } |
| 142 | + |
| 143 | + Ok(out) |
| 144 | +} |
| 145 | + |
| 146 | +fn extend_i64(buf: &mut BufferMut<i64>, arr: &arrow_array::PrimitiveArray<Int64Type>, take: usize) { |
| 147 | + let values: &[i64] = &arr.values()[..take.min(arr.len())]; |
| 148 | + buf.extend_from_slice(values); |
| 149 | +} |
| 150 | + |
| 151 | +fn extend_i32( |
| 152 | + buf: &mut BufferMut<i32>, |
| 153 | + arr: &arrow_array::PrimitiveArray<Date32Type>, |
| 154 | + take: usize, |
| 155 | +) { |
| 156 | + let values: &[i32] = &arr.values()[..take.min(arr.len())]; |
| 157 | + buf.extend_from_slice(values); |
| 158 | +} |
| 159 | + |
| 160 | +/// Cast a Decimal128 column to i64 by narrowing each i128 value. TPC-H |
| 161 | +/// decimals at scale (15, 2) are bounded well below `i64::MAX`, so the cast |
| 162 | +/// is lossless in practice. Out-of-range values would saturate; in the |
| 163 | +/// unlikely event of saturation we still produce a column the variants can |
| 164 | +/// round-trip. |
| 165 | +#[allow( |
| 166 | + clippy::cast_possible_truncation, |
| 167 | + reason = "TPC-H Decimal128(15,2) values fit in i64; saturation is acceptable" |
| 168 | +)] |
| 169 | +fn extend_decimal128_as_i64( |
| 170 | + buf: &mut BufferMut<i64>, |
| 171 | + arr: &arrow_array::PrimitiveArray<Decimal128Type>, |
| 172 | + take: usize, |
| 173 | +) { |
| 174 | + let n = take.min(arr.len()); |
| 175 | + let values: &[i128] = &arr.values()[..n]; |
| 176 | + buf.reserve(n); |
| 177 | + for &v in values { |
| 178 | + // The Decimal128(15, 2) range is roughly ±10^15, far inside i64. |
| 179 | + buf.push(v as i64); |
| 180 | + } |
| 181 | +} |
| 182 | + |
| 183 | +/// Widen an `i32` buffer to `i64` so all dataset columns share a single |
| 184 | +/// element type and the variant runner can dispatch uniformly. |
| 185 | +fn i32_buf_to_i64(src: BufferMut<i32>) -> BufferMut<i64> { |
| 186 | + let mut out = BufferMut::<i64>::with_capacity(src.len()); |
| 187 | + for v in src.as_slice() { |
| 188 | + out.push(*v as i64); |
| 189 | + } |
| 190 | + out |
| 191 | +} |
0 commit comments