diff --git a/Cargo.lock b/Cargo.lock index 9a31e702e77..adcab4a2864 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10741,6 +10741,7 @@ dependencies = [ "rstest", "rustc-hash", "sketches-ddsketch 0.4.0", + "temp-env", "termtree", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index fc77bcbb9ce..a6429bf04e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -236,6 +236,7 @@ taffy = "0.9.0" take_mut = "0.2.2" tar = "0.4" target-lexicon = "0.13" +temp-env = "0.3" tempfile = "3" termtree = { version = "1.0" } test-with = "0.15" diff --git a/encodings/alp/src/lib.rs b/encodings/alp/src/lib.rs index 05ce75763b2..7da676d6c6c 100644 --- a/encodings/alp/src/lib.rs +++ b/encodings/alp/src/lib.rs @@ -22,7 +22,7 @@ use vortex_array::ArrayVTable; use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::nan_count::NanCount; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; -use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; +use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -33,7 +33,7 @@ mod alp_rd; pub fn initialize(session: &VortexSession) { // If we're using the experimental Patched encoding, register a shim // for ALP with interior patches to decode as Patched array. 
- if *USE_EXPERIMENTAL_PATCHES { + if use_experimental_patches() { session.arrays().register(ALPPatchedPlugin); } else { session.arrays().register(ALP); diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 217d05d73b8..bed17366676 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -29,7 +29,7 @@ use vortex_array::aggregate_fn::AggregateFnVTable; use vortex_array::aggregate_fn::fns::is_constant::IsConstant; use vortex_array::aggregate_fn::fns::is_sorted::IsSorted; use vortex_array::aggregate_fn::session::AggregateFnSessionExt; -use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; +use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -37,7 +37,7 @@ use vortex_session::VortexSession; pub fn initialize(session: &VortexSession) { // If we're using the experimental Patched encoding, register a shim // for BitPacked with interior patches decode as Patched array. 
- if *USE_EXPERIMENTAL_PATCHES { + if use_experimental_patches() { session.arrays().register(BitPackedPatchedPlugin); } else { session.arrays().register(BitPacked); diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index a849acdb20e..243c59cafa0 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -3466,8 +3466,6 @@ pub fn vortex_array::arrays::patched::PatchedSlotsView<'a>::fmt(&self, f: &mut c impl<'a> core::marker::Copy for vortex_array::arrays::patched::PatchedSlotsView<'a> -pub static vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES: std::sync::lazy_lock::LazyLock - pub trait vortex_array::arrays::patched::PatchedArrayExt: vortex_array::arrays::patched::PatchedArraySlotsExt pub fn vortex_array::arrays::patched::PatchedArrayExt::lane_range(&self, chunk: usize, lane: usize) -> vortex_error::VortexResult> @@ -3512,6 +3510,8 @@ pub fn T::patch_values(&self) -> &vortex_array::ArrayRef pub fn T::slots_view(&self) -> vortex_array::arrays::patched::PatchedSlotsView<'_> +pub fn vortex_array::arrays::patched::use_experimental_patches() -> bool + pub type vortex_array::arrays::patched::PatchedArray = vortex_array::Array pub mod vortex_array::arrays::primitive diff --git a/vortex-array/src/aggregate_fn/accumulator.rs b/vortex-array/src/aggregate_fn/accumulator.rs index 71ed56b3110..9c864eea6ca 100644 --- a/vortex-array/src/aggregate_fn/accumulator.rs +++ b/vortex-array/src/aggregate_fn/accumulator.rs @@ -14,7 +14,7 @@ use crate::aggregate_fn::AggregateFnRef; use crate::aggregate_fn::AggregateFnVTable; use crate::aggregate_fn::session::AggregateFnSessionExt; use crate::dtype::DType; -use crate::executor::MAX_ITERATIONS; +use crate::executor::max_iterations; use crate::scalar::Scalar; /// Reference-counted type-erased accumulator. 
@@ -108,7 +108,7 @@ impl DynAccumulator for Accumulator { let kernels = &session.aggregate_fns().kernels; let mut batch = batch.clone(); - for _ in 0..*MAX_ITERATIONS { + for _ in 0..max_iterations() { if batch.is::() { break; } diff --git a/vortex-array/src/aggregate_fn/accumulator_grouped.rs b/vortex-array/src/aggregate_fn/accumulator_grouped.rs index 24f8f0157ec..665dcec0747 100644 --- a/vortex-array/src/aggregate_fn/accumulator_grouped.rs +++ b/vortex-array/src/aggregate_fn/accumulator_grouped.rs @@ -32,7 +32,7 @@ use crate::builders::builder_with_capacity; use crate::builtins::ArrayBuiltins; use crate::dtype::DType; use crate::dtype::IntegerPType; -use crate::executor::MAX_ITERATIONS; +use crate::executor::max_iterations; use crate::match_each_integer_ptype; /// Reference-counted type-erased grouped accumulator. @@ -165,7 +165,7 @@ impl GroupedAccumulator { let session = ctx.session().clone(); let kernels = &session.aggregate_fns().grouped_kernels; - for _ in 0..*MAX_ITERATIONS { + for _ in 0..max_iterations() { if elements.is::() { break; } diff --git a/vortex-array/src/arrays/patched/mod.rs b/vortex-array/src/arrays/patched/mod.rs index b0453c68677..0ba25ab7929 100644 --- a/vortex-array/src/arrays/patched/mod.rs +++ b/vortex-array/src/arrays/patched/mod.rs @@ -108,5 +108,8 @@ const fn patch_lanes() -> usize { /// array, and eliminate the interior patches. /// /// The builtin compressor will also generate Patched arrays. 
-pub static USE_EXPERIMENTAL_PATCHES: LazyLock = - LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok()); +pub fn use_experimental_patches() -> bool { + static USE_EXPERIMENTAL_PATCHES: LazyLock = + LazyLock::new(|| env::var("VORTEX_EXPERIMENTAL_PATCHED_ARRAY").is_ok_and(|v| v == "1")); + *USE_EXPERIMENTAL_PATCHES +} diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 6233295958b..74a013bf5bb 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -38,18 +38,21 @@ use crate::memory::HostAllocatorRef; use crate::memory::MemorySessionExt; use crate::optimizer::ArrayOptimizer; -/// Maximum number of iterations to attempt when executing an array before giving up and returning -/// an error. -pub(crate) static MAX_ITERATIONS: LazyLock = - LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") { - Ok(val) => val - .parse::() - .unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")), - Err(VarError::NotPresent) => 128, - Err(VarError::NotUnicode(_)) => { - vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string") - } - }); +/// Returns the maximum number of iterations to attempt when executing an array before giving up and returning +/// an error. Can be overridden by the `VORTEX_MAX_ITERATIONS` env variable; otherwise defaults to 128. +pub(crate) fn max_iterations() -> usize { + static MAX_ITERATIONS: LazyLock = + LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") { + Ok(val) => val.parse::().unwrap_or_else(|e| { + vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}") + }), + Err(VarError::NotPresent) => 128, + Err(VarError::NotUnicode(_)) => { + vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string") + } + }); + *MAX_ITERATIONS +} /// Marker trait for types that an [`ArrayRef`] can be executed into. 
/// @@ -95,22 +98,11 @@ impl ArrayRef { /// For safety, we will error when the number of execution iterations reaches a configurable /// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`). pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { - static MAX_ITERATIONS: LazyLock = - LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") { - Ok(val) => val.parse::().unwrap_or_else(|e| { - vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}") - }), - Err(VarError::NotPresent) => 128, - Err(VarError::NotUnicode(_)) => { - vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string") - } - }); - let mut current = self.optimize()?; // Stack frames: (parent, slot_idx, done_predicate_for_slot) let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new(); - for _ in 0..*MAX_ITERATIONS { + for _ in 0..max_iterations() { // Check for termination: use the stack frame's done predicate, or the root matcher. let is_done = stack .last() @@ -179,7 +171,7 @@ impl ArrayRef { vortex_bail!( "Exceeded maximum execution iterations ({}) while executing array", - *MAX_ITERATIONS, + max_iterations(), ) } } diff --git a/vortex-btrblocks/src/schemes/float.rs b/vortex-btrblocks/src/schemes/float.rs index 6c204c7263a..f6d2389f08b 100644 --- a/vortex-btrblocks/src/schemes/float.rs +++ b/vortex-btrblocks/src/schemes/float.rs @@ -17,7 +17,7 @@ use vortex_array::LEGACY_SESSION; use vortex_array::ToCanonical; use vortex_array::VortexSessionExecute; use vortex_array::arrays::Patched; -use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; +use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::dtype::PType; use vortex_compressor::estimate::CompressionEstimate; @@ -120,7 +120,7 @@ impl Scheme for ALPScheme { let alp_stats = alp_encoded.as_array().statistics().to_owned(); let exponents = alp_encoded.exponents(); - if *USE_EXPERIMENTAL_PATCHES { + if 
use_experimental_patches() { let patches = alp_encoded.patches(); // Create ALP array without interior patches. diff --git a/vortex-btrblocks/src/schemes/integer.rs b/vortex-btrblocks/src/schemes/integer.rs index 47fc52225aa..cb765fbec5a 100644 --- a/vortex-btrblocks/src/schemes/integer.rs +++ b/vortex-btrblocks/src/schemes/integer.rs @@ -11,7 +11,7 @@ use vortex_array::ToCanonical; use vortex_array::VortexSessionExecute; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::Patched; -use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; +use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::arrays::primitive::PrimitiveArrayExt; use vortex_array::scalar::Scalar; use vortex_compressor::builtins::FloatDictScheme; @@ -348,7 +348,7 @@ impl Scheme for BitPackingScheme { let ptype = packed.dtype().as_ptype(); let mut parts = BitPacked::into_parts(packed); - let array = if *USE_EXPERIMENTAL_PATCHES { + let array = if use_experimental_patches() { let patches = parts.patches.take(); // Transpose patches into G-ALP style PatchedArray, wrapping an inner BitPackedArray. let array = BitPacked::try_new( diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml index 2856a2724ae..fd85dd770dd 100644 --- a/vortex-error/Cargo.toml +++ b/vortex-error/Cargo.toml @@ -29,7 +29,7 @@ tokio = { workspace = true, features = ["rt"], optional = true } [dev-dependencies] serial_test = { version = "3.3.1" } -temp-env = "0.3" +temp-env = { workspace = true } [lints] workspace = true diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs index 156ab574706..e48dafcf6f9 100644 --- a/vortex-error/src/lib.rs +++ b/vortex-error/src/lib.rs @@ -20,6 +20,7 @@ use std::io; use std::num::TryFromIntError; use std::ops::Deref; use std::sync::Arc; +use std::sync::LazyLock; /// A string that can be used as an error message. 
#[derive(Debug)] @@ -38,7 +39,7 @@ where reason = "intentionally panic in debug mode when VORTEX_PANIC_ON_ERR is set" )] fn from(msg: T) -> Self { - if env::var("VORTEX_PANIC_ON_ERR").as_deref().unwrap_or("") == "1" { + if panic_on_err() { panic!("{}\nBacktrace:\n{}", msg.into(), Backtrace::capture()); } else { Self(msg.into()) @@ -46,6 +47,12 @@ where } } +fn panic_on_err() -> bool { + static PANIC_ON_ERR: LazyLock = + LazyLock::new(|| env::var("VORTEX_PANIC_ON_ERR").is_ok_and(|v| v == "1")); + *PANIC_ON_ERR +} + impl AsRef for ErrString { fn as_ref(&self) -> &str { &self.0 diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index 0a3e31f32e3..41d9e3c2ad2 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -26,7 +26,7 @@ use vortex_array::arrays::Primitive; use vortex_array::arrays::Struct; use vortex_array::arrays::VarBin; use vortex_array::arrays::VarBinView; -use vortex_array::arrays::patched::USE_EXPERIMENTAL_PATCHES; +use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::dtype::FieldPath; use vortex_btrblocks::BtrBlocksCompressorBuilder; use vortex_btrblocks::SchemeExt; @@ -90,7 +90,7 @@ pub static ALLOWED_ENCODINGS: LazyLock> = LazyLock::new(|| { allowed.insert(Masked.id()); allowed.insert(Dict.id()); - if *USE_EXPERIMENTAL_PATCHES { + if use_experimental_patches() { allowed.insert(Patched.id()); } diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 8bbccf09b91..30a0953444a 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -55,10 +55,10 @@ vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] futures = { workspace = true, features = ["executor"] } rstest = { workspace = true } +temp-env = { workspace = true } tokio = { workspace = true, features = ["rt", "macros"] } vortex-array = { path = "../vortex-array", features = ["_test-harness"] } vortex-io = { path = "../vortex-io", features = ["tokio"] } -vortex-utils = { workspace 
= true, features = ["_test-harness"] } [features] _test-harness = [] diff --git a/vortex-layout/src/display.rs b/vortex-layout/src/display.rs index 39605dfb5d1..5acf685e946 100644 --- a/vortex-layout/src/display.rs +++ b/vortex-layout/src/display.rs @@ -231,7 +231,6 @@ mod tests { use vortex_buffer::buffer; use vortex_io::runtime::single::block_on; use vortex_io::session::RuntimeSessionExt; - use vortex_utils::env::EnvVarGuard; use crate::IntoLayout; use crate::OwnedLayoutChildren; @@ -248,231 +247,244 @@ mod tests { /// Test display_tree with inline array_tree metadata (no segment source needed). #[test] fn test_display_tree_inline_array_tree() { - let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1"); - block_on(|handle| async move { - let session = SESSION.clone().with_handle(handle); - let ctx = ArrayContext::empty(); - let segments = Arc::new(TestSegments::default()); - - // Create nullable i64 array (2 buffers: data + validity) - let (ptr1, eof1) = SequenceId::root().split(); - let mut validity_builder = BitBufferMut::with_capacity(5); - for b in [true, false, true, true, false] { - validity_builder.append(b); - } - let validity = Validity::Array( - BoolArray::new(validity_builder.freeze(), Validity::NonNullable).into_array(), - ); - let array1 = PrimitiveArray::new(buffer![1i64, 2, 3, 4, 5], validity); - let layout1 = FlatLayoutStrategy::default() - .write_stream( - ctx.clone(), - Arc::::clone(&segments), - array1.into_array().to_array_stream().sequenced(ptr1), - eof1, - &session, - ) - .await - .unwrap(); - - // Create utf8 array (2 buffers: views + data) - let (ptr2, eof2) = SequenceId::root().split(); - let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(NonNullable), 5); - for s in [ - "hello world this is long", - "another long string", - "short", - "medium str", - "x", - ] { - builder.append_value(s); - } - let layout2 = FlatLayoutStrategy::default() - .write_stream( - ctx.clone(), - Arc::::clone(&segments), - builder - 
.finish() - .into_array() - .to_array_stream() - .sequenced(ptr2), - eof2, - &session, - ) - .await - .unwrap(); - - // Create struct layout - let struct_layout = StructLayout::new( - 5, - DType::Struct( - StructFields::new( - vec![FieldName::from("numbers"), FieldName::from("strings")].into(), + // LazyLock caches the env var on first read, so only nextest (separate processes) can isolate it. + if std::env::var("NEXTEST_RUN_ID").is_ok() { + temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || { + block_on(|handle| async move { + let session = SESSION.clone().with_handle(handle); + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + + // Create nullable i64 array (2 buffers: data + validity) + let (ptr1, eof1) = SequenceId::root().split(); + let mut validity_builder = BitBufferMut::with_capacity(5); + for b in [true, false, true, true, false] { + validity_builder.append(b); + } + let validity = Validity::Array( + BoolArray::new(validity_builder.freeze(), Validity::NonNullable) + .into_array(), + ); + let array1 = PrimitiveArray::new(buffer![1i64, 2, 3, 4, 5], validity); + let layout1 = FlatLayoutStrategy::default() + .write_stream( + ctx.clone(), + Arc::::clone(&segments), + array1.into_array().to_array_stream().sequenced(ptr1), + eof1, + &session, + ) + .await + .unwrap(); + + // Create utf8 array (2 buffers: views + data) + let (ptr2, eof2) = SequenceId::root().split(); + let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(NonNullable), 5); + for s in [ + "hello world this is long", + "another long string", + "short", + "medium str", + "x", + ] { + builder.append_value(s); + } + let layout2 = FlatLayoutStrategy::default() + .write_stream( + ctx.clone(), + Arc::::clone(&segments), + builder + .finish() + .into_array() + .to_array_stream() + .sequenced(ptr2), + eof2, + &session, + ) + .await + .unwrap(); + + // Create struct layout + let struct_layout = StructLayout::new( + 5, + DType::Struct( + 
StructFields::new( + vec![FieldName::from("numbers"), FieldName::from("strings")].into(), + vec![ + DType::Primitive(PType::I64, Nullability::Nullable), + DType::Utf8(NonNullable), + ], + ), + NonNullable, + ), vec![ - DType::Primitive(PType::I64, Nullability::Nullable), - DType::Utf8(NonNullable), + ChunkedLayout::new( + 5, + DType::Primitive(PType::I64, Nullability::Nullable), + OwnedLayoutChildren::layout_children(vec![layout1]), + ) + .into_layout(), + layout2, ], - ), - NonNullable, - ), - vec![ - ChunkedLayout::new( - 5, - DType::Primitive(PType::I64, Nullability::Nullable), - OwnedLayoutChildren::layout_children(vec![layout1]), ) - .into_layout(), - layout2, - ], - ) - .into_layout(); + .into_layout(); - let output = format!("{}", struct_layout.display_tree_verbose(true)); + let output = format!("{}", struct_layout.display_tree_verbose(true)); - let expected = "\ + let expected = "\ vortex.struct, dtype: {numbers=i64?, strings=utf8}, children: 2, rows: 5 ├── numbers: vortex.chunked, dtype: i64?, children: 1, rows: 5 │ └── [0]: vortex.flat, dtype: i64?, metadata: 171 bytes, rows: 5, segment 0, buffers=[40B, 1B], total=41B └── strings: vortex.flat, dtype: utf8, metadata: 110 bytes, rows: 5, segment 1, buffers=[43B, 80B], total=123B "; - assert_eq!(output, expected); - }) + assert_eq!(output, expected); + }) + }) + } } /// Test display_tree_with_segments using async segment source to fetch buffer sizes. 
#[test] fn test_display_tree_with_segment_source() { - // Ensure inline array node is disabled for this test - let _guard = EnvVarGuard::remove("FLAT_LAYOUT_INLINE_ARRAY_NODE"); - block_on(|handle| async move { - let session = SESSION.clone().with_handle(handle); - let ctx = ArrayContext::empty(); - let segments = Arc::new(TestSegments::default()); - - // Create simple i32 array - let (ptr1, eof1) = SequenceId::root().split(); - let array1 = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::NonNullable); - let layout1 = FlatLayoutStrategy::default() - .write_stream( - ctx.clone(), - Arc::::clone(&segments), - array1.into_array().to_array_stream().sequenced(ptr1), - eof1, - &session, - ) - .await - .unwrap(); - - // Create another i32 array - let (ptr2, eof2) = SequenceId::root().split(); - let array2 = PrimitiveArray::new(buffer![6i32, 7, 8, 9, 10], Validity::NonNullable); - let layout2 = FlatLayoutStrategy::default() - .write_stream( - ctx.clone(), - Arc::::clone(&segments), - array2.into_array().to_array_stream().sequenced(ptr2), - eof2, - &session, - ) - .await - .unwrap(); - - // Create chunked layout - let chunked_layout = ChunkedLayout::new( - 10, - DType::Primitive(PType::I32, NonNullable), - OwnedLayoutChildren::layout_children(vec![layout1, layout2]), - ) - .into_layout(); - - let output = chunked_layout - .display_tree_with_segments(segments) - .await - .unwrap(); - - let expected = "\ + if std::env::var("NEXTEST_RUN_ID").is_ok() { + temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", None::<&str>, || { + block_on(|handle| async move { + let session = SESSION.clone().with_handle(handle); + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + + // Create simple i32 array + let (ptr1, eof1) = SequenceId::root().split(); + let array1 = + PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::NonNullable); + let layout1 = FlatLayoutStrategy::default() + .write_stream( + ctx.clone(), + Arc::::clone(&segments), + 
array1.into_array().to_array_stream().sequenced(ptr1), + eof1, + &session, + ) + .await + .unwrap(); + + // Create another i32 array + let (ptr2, eof2) = SequenceId::root().split(); + let array2 = + PrimitiveArray::new(buffer![6i32, 7, 8, 9, 10], Validity::NonNullable); + let layout2 = FlatLayoutStrategy::default() + .write_stream( + ctx.clone(), + Arc::::clone(&segments), + array2.into_array().to_array_stream().sequenced(ptr2), + eof2, + &session, + ) + .await + .unwrap(); + + // Create chunked layout + let chunked_layout = ChunkedLayout::new( + 10, + DType::Primitive(PType::I32, NonNullable), + OwnedLayoutChildren::layout_children(vec![layout1, layout2]), + ) + .into_layout(); + + let output = chunked_layout + .display_tree_with_segments(segments) + .await + .unwrap(); + + let expected = "\ vortex.chunked, dtype: i32, children: 2, rows: 10 ├── [0]: vortex.flat, dtype: i32, rows: 5, segment 0, buffers=[20B], total=20B └── [1]: vortex.flat, dtype: i32, rows: 5, segment 1, buffers=[20B], total=20B "; - assert_eq!(output.to_string(), expected); - }) + assert_eq!(output.to_string(), expected); + }) + }) + } } /// Test display_array_tree with inline array node metadata. 
#[test] fn test_display_array_tree_with_inline_node() { - let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1"); - - let ctx = ArrayContext::empty(); - let segments = Arc::new(TestSegments::default()); - let (ptr, eof) = SequenceId::root().split(); - - // Create a simple primitive array - let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::AllValid); - let layout = block_on(|handle| async { - let session = SESSION.clone().with_handle(handle); - FlatLayoutStrategy::default() - .write_stream( - ctx.clone(), - Arc::::clone(&segments), - array.into_array().to_array_stream().sequenced(ptr), - eof, - &session, - ) - .await - .unwrap() - }); - - let flat_layout = layout.as_::(); - - let array_tree = flat_layout - .array_tree() - .expect("array_tree should be populated when FLAT_LAYOUT_INLINE_ARRAY_NODE is set"); - - let parts = SerializedArray::from_array_tree(array_tree.as_ref().to_vec()) - .expect("should parse array_tree"); - assert_eq!(parts.buffer_lengths(), vec![20]); // 5 i32 values = 20 bytes - - assert_eq!( - layout.display_tree().to_string(), - "\ + if std::env::var("NEXTEST_RUN_ID").is_ok() { + temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || { + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + + // Create a simple primitive array + let array = PrimitiveArray::new(buffer![1i32, 2, 3, 4, 5], Validity::AllValid); + let layout = block_on(|handle| async { + let session = SESSION.clone().with_handle(handle); + FlatLayoutStrategy::default() + .write_stream( + ctx.clone(), + Arc::::clone(&segments), + array.into_array().to_array_stream().sequenced(ptr), + eof, + &session, + ) + .await + .unwrap() + }); + + let flat_layout = layout.as_::(); + + let array_tree = flat_layout.array_tree().expect( + "array_tree should be populated when FLAT_LAYOUT_INLINE_ARRAY_NODE is set", + ); + + let parts = 
SerializedArray::from_array_tree(array_tree.as_ref().to_vec()) + .expect("should parse array_tree"); + assert_eq!(parts.buffer_lengths(), vec![20]); // 5 i32 values = 20 bytes + + assert_eq!( + layout.display_tree().to_string(), + "\ vortex.flat, dtype: i32?, segment 0, buffers=[20B], total=20B " - ); + ); + }) + } } /// Test display_tree without inline array node (shows segment ID). #[test] fn test_display_tree_without_inline_node() { - let _guard = EnvVarGuard::set("FLAT_LAYOUT_INLINE_ARRAY_NODE", "1"); - - let ctx = ArrayContext::empty(); - let segments = Arc::new(TestSegments::default()); - let (ptr, eof) = SequenceId::root().split(); - - // Create a simple primitive array - let array = PrimitiveArray::new(buffer![10i64, 20, 30], Validity::NonNullable); - let layout = block_on(|handle| async { - let session = SESSION.clone().with_handle(handle); - FlatLayoutStrategy::default() - .write_stream( - ctx, - Arc::::clone(&segments), - array.into_array().to_array_stream().sequenced(ptr), - eof, - &session, - ) - .await - .unwrap() - }); - - // Test display_tree exact output (with inline array_tree enabled by env var from other test) - assert_eq!( - layout.display_tree().to_string(), - "\ + if std::env::var("NEXTEST_RUN_ID").is_ok() { + temp_env::with_var("FLAT_LAYOUT_INLINE_ARRAY_NODE", Some("1"), || { + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + let (ptr, eof) = SequenceId::root().split(); + + // Create a simple primitive array + let array = PrimitiveArray::new(buffer![10i64, 20, 30], Validity::NonNullable); + let layout = block_on(|handle| async { + let session = SESSION.clone().with_handle(handle); + FlatLayoutStrategy::default() + .write_stream( + ctx, + Arc::::clone(&segments), + array.into_array().to_array_stream().sequenced(ptr), + eof, + &session, + ) + .await + .unwrap() + }); + + // Test display_tree exact output (with inline array_tree enabled by the FLAT_LAYOUT_INLINE_ARRAY_NODE=1 override set above) + assert_eq!( + 
layout.display_tree().to_string(), + "\ vortex.flat, dtype: i64, segment 0, buffers=[24B], total=24B " - ); + ); + }) + } } } diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index 186ada897a7..b167e633170 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -6,6 +6,7 @@ pub mod writer; use std::env; use std::sync::Arc; +use std::sync::LazyLock; use vortex_array::DeserializeMetadata; use vortex_array::ProstMetadata; @@ -30,9 +31,10 @@ use crate::segments::SegmentSource; use crate::vtable; /// Check if inline array node is enabled. -/// This checks the env var each time to allow tests to toggle the behavior. pub(super) fn flat_layout_inline_array_node() -> bool { - env::var("FLAT_LAYOUT_INLINE_ARRAY_NODE").is_ok() + static FLAT_LAYOUT_INLINE_ARRAY_NODE: LazyLock = + LazyLock::new(|| env::var("FLAT_LAYOUT_INLINE_ARRAY_NODE").is_ok_and(|v| v == "1")); + *FLAT_LAYOUT_INLINE_ARRAY_NODE } vtable!(Flat); diff --git a/vortex-utils/Cargo.toml b/vortex-utils/Cargo.toml index 0a1dd1ad0c8..66b2576f98d 100644 --- a/vortex-utils/Cargo.toml +++ b/vortex-utils/Cargo.toml @@ -24,4 +24,3 @@ workspace = true [features] dyn-traits = [] -_test-harness = ["dashmap", "parking_lot"] diff --git a/vortex-utils/src/env.rs b/vortex-utils/src/env.rs deleted file mode 100644 index 19e78e1f12b..00000000000 --- a/vortex-utils/src/env.rs +++ /dev/null @@ -1,197 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Environment variable utilities for testing. - -use dashmap::DashMap; -use parking_lot::Mutex; -use vortex_error::vortex_panic; - -/// Global registry of locks per environment variable key. -/// -/// Each mutex is lazily created and leaked to get a 'static lifetime. -/// This is acceptable for test utilities that live for the process duration. 
-static ENV_LOCKS: std::sync::LazyLock>> = - std::sync::LazyLock::new(DashMap::new); - -/// Get or create a static mutex for the given key. -fn get_or_create_lock(key: &'static str) -> &'static Mutex<()> { - *ENV_LOCKS - .entry(key) - .or_insert_with(|| Box::leak(Box::new(Mutex::new(())))) -} - -/// RAII guard to set/remove an environment variable for the duration of a scope. -/// -/// Removes the variable when dropped, ensuring test isolation. -/// -/// This guard holds a mutex lock for the specific environment variable key, -/// ensuring that only one guard can exist for a given key at a time. This -/// prevents tests from accidentally having overlapping guards for the same -/// env var. -/// -/// # Example -/// -/// ``` -/// use vortex_utils::env::EnvVarGuard; -/// -/// // Set an env var for the duration of this scope -/// let _guard = EnvVarGuard::set("MY_TEST_VAR", "1"); -/// assert_eq!(std::env::var("MY_TEST_VAR").ok(), Some("1".to_string())); -/// -/// // Or remove an env var -/// let _guard2 = EnvVarGuard::remove("OTHER_VAR"); -/// assert!(std::env::var("OTHER_VAR").is_err()); -/// ``` -/// -/// # Panics -/// -/// Panics if a guard already exists for the same environment variable key -/// (detected via mutex lock contention). -/// -/// # Safety -/// -/// Environment variable modification is inherently unsafe in multi-threaded contexts. -/// This guard is intended for use in tests that are run serially or where env var -/// races are acceptable. The per-key locking ensures that the same env var isn't -/// modified concurrently by multiple guards. -pub struct EnvVarGuard { - key: &'static str, - /// We store this to ensure the mutex stays locked for our lifetime. - /// The () is just a dummy value - we only care about the lock. - #[expect(dead_code)] - lock_guard: parking_lot::MutexGuard<'static, ()>, -} - -/// Timeout for waiting on an env var lock. 
-const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); - -impl EnvVarGuard { - /// Acquire the lock for this key, waiting up to 10 seconds. - /// - /// If another guard holds the lock, this will wait for it to be released. - /// If the lock isn't released within 10 seconds, this panics to avoid deadlocks. - fn acquire_lock(key: &'static str) -> parking_lot::MutexGuard<'static, ()> { - let mutex = get_or_create_lock(key); - match mutex.try_lock_for(LOCK_TIMEOUT) { - Some(guard) => guard, - None => vortex_panic!( - "EnvVarGuard: timed out after {LOCK_TIMEOUT:?} waiting for environment variable '{key}'. \ - This likely indicates a deadlock - ensure guards for the same key are properly scoped \ - taken in lexicographical order and dropped before acquiring a new one." - ), - } - } - - /// Set an environment variable for the duration of this guard's lifetime. - pub fn set(key: &'static str, value: &str) -> Self { - let lock_guard = Self::acquire_lock(key); - - // SAFETY: We hold an exclusive lock for this key. - unsafe { - std::env::set_var(key, value); - } - - Self { key, lock_guard } - } - - /// Remove an environment variable for the duration of this guard's lifetime. - pub fn remove(key: &'static str) -> Self { - let lock_guard = Self::acquire_lock(key); - - // SAFETY: We hold an exclusive lock for this key. - unsafe { - std::env::remove_var(key); - } - - Self { key, lock_guard } - } -} - -impl Drop for EnvVarGuard { - fn drop(&mut self) { - // SAFETY: We hold an exclusive lock for this key. 
- unsafe { - std::env::remove_var(self.key); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_set_and_remove() { - let key = "VORTEX_TEST_ENV_VAR_SET"; - - // Initially not set - assert!(std::env::var(key).is_err()); - - { - let _guard = EnvVarGuard::set(key, "test_value"); - assert_eq!(std::env::var(key).unwrap(), "test_value"); - } - - // After guard drops, var is removed - assert!(std::env::var(key).is_err()); - } - - #[test] - fn test_remove() { - let key = "VORTEX_TEST_ENV_VAR_REMOVE"; - - // Set it first - unsafe { - std::env::set_var(key, "initial"); - } - - { - let _guard = EnvVarGuard::remove(key); - assert!(std::env::var(key).is_err()); - } - - // After guard drops, var is still removed - assert!(std::env::var(key).is_err()); - } - - /// Test that a second guard waits for the first to be released. - #[test] - fn test_second_guard_waits_for_first() { - use std::sync::Arc; - use std::sync::atomic::AtomicBool; - use std::sync::atomic::Ordering; - use std::thread; - use std::time::Duration; - - let key = "VORTEX_TEST_ENV_VAR_WAIT"; - let second_acquired = Arc::new(AtomicBool::new(false)); - let second_acquired_clone = Arc::clone(&second_acquired); - - // First guard in main thread - let _guard1 = EnvVarGuard::set(key, "first"); - assert_eq!(std::env::var(key).unwrap(), "first"); - - // Spawn thread that will wait for the lock - let handle = thread::spawn(move || { - let _guard2 = EnvVarGuard::set(key, "second"); - second_acquired_clone.store(true, Ordering::SeqCst); - assert_eq!(std::env::var(key).unwrap(), "second"); - }); - - // Give the thread time to start waiting - thread::sleep(Duration::from_millis(50)); - - // Second guard should NOT have acquired yet (still waiting) - assert!(!second_acquired.load(Ordering::SeqCst)); - - // Drop the first guard - this should allow the second to proceed - drop(_guard1); - - // Wait for second thread to complete - handle.join().unwrap(); - - // Now the second guard should have acquired and 
set the value - assert!(second_acquired.load(Ordering::SeqCst)); - } -} diff --git a/vortex-utils/src/lib.rs b/vortex-utils/src/lib.rs index ff1610ddead..cc650072eab 100644 --- a/vortex-utils/src/lib.rs +++ b/vortex-utils/src/lib.rs @@ -9,6 +9,4 @@ pub mod aliases; pub mod debug_with; #[cfg(feature = "dyn-traits")] pub mod dyn_traits; -#[cfg(feature = "_test-harness")] -pub mod env; pub mod iter;