Skip to content

Commit 58fd4cd

Browse files
committed
Normalize Execution
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 4545e5c commit 58fd4cd

2 files changed

Lines changed: 49 additions & 48 deletions

File tree

vortex-file/src/strategy.rs

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use vortex_alp::ALP;
1010
// Compressed encodings from encoding crates
1111
// Canonical array encodings from vortex-array
1212
use vortex_alp::ALPRD;
13+
use vortex_array::ArrayId;
14+
use vortex_array::VTable;
1315
use vortex_array::arrays::Bool;
1416
use vortex_array::arrays::Chunked;
1517
use vortex_array::arrays::Constant;
@@ -26,8 +28,6 @@ use vortex_array::arrays::Struct;
2628
use vortex_array::arrays::VarBin;
2729
use vortex_array::arrays::VarBinView;
2830
use vortex_array::dtype::FieldPath;
29-
use vortex_array::session::ArrayRegistry;
30-
use vortex_array::session::ArraySession;
3131
use vortex_btrblocks::BtrBlocksCompressor;
3232
use vortex_btrblocks::BtrBlocksCompressorBuilder;
3333
use vortex_btrblocks::SchemeExt;
@@ -67,6 +67,7 @@ use vortex_btrblocks::{
6767
schemes::integer,
6868
schemes::string,
6969
};
70+
use vortex_utils::aliases::hash_set::HashSet;
7071
#[cfg(feature = "zstd")]
7172
use vortex_zstd::Zstd;
7273
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
@@ -78,49 +79,49 @@ const ONE_MEG: u64 = 1 << 20;
7879
///
7980
/// This includes all canonical encodings from vortex-array plus all compressed
8081
/// encodings from the various encoding crates.
81-
pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
82-
let session = ArraySession::empty();
82+
pub static ALLOWED_ENCODINGS: LazyLock<HashSet<ArrayId>> = LazyLock::new(|| {
83+
let mut allowed = HashSet::new();
8384

8485
// Canonical encodings from vortex-array
85-
session.register(Null);
86-
session.register(Bool);
87-
session.register(Primitive);
88-
session.register(Decimal);
89-
session.register(VarBin);
90-
session.register(VarBinView);
91-
session.register(List);
92-
session.register(ListView);
93-
session.register(FixedSizeList);
94-
session.register(Struct);
95-
session.register(Extension);
96-
session.register(Chunked);
97-
session.register(Constant);
98-
session.register(Masked);
99-
session.register(Dict);
86+
allowed.insert(Null.id());
87+
allowed.insert(Bool.id());
88+
allowed.insert(Primitive.id());
89+
allowed.insert(Decimal.id());
90+
allowed.insert(VarBin.id());
91+
allowed.insert(VarBinView.id());
92+
allowed.insert(List.id());
93+
allowed.insert(ListView.id());
94+
allowed.insert(FixedSizeList.id());
95+
allowed.insert(Struct.id());
96+
allowed.insert(Extension.id());
97+
allowed.insert(Chunked.id());
98+
allowed.insert(Constant.id());
99+
allowed.insert(Masked.id());
100+
allowed.insert(Dict.id());
100101

101102
// Compressed encodings from encoding crates
102-
session.register(ALP);
103-
session.register(ALPRD);
104-
session.register(BitPacked);
105-
session.register(ByteBool);
106-
session.register(DateTimeParts);
107-
session.register(DecimalByteParts);
108-
session.register(Delta);
109-
session.register(FoR);
110-
session.register(FSST);
111-
session.register(Pco);
112-
session.register(RLE);
113-
session.register(RunEnd);
114-
session.register(Sequence);
115-
session.register(Sparse);
116-
session.register(ZigZag);
103+
allowed.insert(ALP.id());
104+
allowed.insert(ALPRD.id());
105+
allowed.insert(BitPacked.id());
106+
allowed.insert(ByteBool.id());
107+
allowed.insert(DateTimeParts.id());
108+
allowed.insert(DecimalByteParts.id());
109+
allowed.insert(Delta.id());
110+
allowed.insert(FoR.id());
111+
allowed.insert(FSST.id());
112+
allowed.insert(Pco.id());
113+
allowed.insert(RLE.id());
114+
allowed.insert(RunEnd.id());
115+
allowed.insert(Sequence.id());
116+
allowed.insert(Sparse.id());
117+
allowed.insert(ZigZag.id());
117118

118119
#[cfg(feature = "zstd")]
119-
session.register(Zstd);
120+
allowed.insert(Zstd.id());
120121
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
121-
session.register(ZstdBuffers);
122+
allowed.insert(ZstdBuffers.id());
122123

123-
session.registry().clone()
124+
allowed
124125
});
125126

126127
/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
@@ -132,7 +133,7 @@ pub struct WriteStrategyBuilder {
132133
compressor_override: Option<Arc<dyn CompressorPlugin>>,
133134
row_block_size: usize,
134135
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
135-
allow_encodings: Option<ArrayRegistry>,
136+
allow_encodings: Option<HashSet<ArrayId>>,
136137
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
137138
}
138139

@@ -169,7 +170,7 @@ impl WriteStrategyBuilder {
169170
}
170171

171172
/// Override the allowed array encodings for normalization.
172-
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
173+
pub fn with_allow_encodings(mut self, allow_encodings: HashSet<ArrayId>) -> Self {
173174
self.allow_encodings = Some(allow_encodings);
174175
self
175176
}

vortex-layout/src/layouts/flat/writer.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use async_trait::async_trait;
55
use futures::StreamExt;
66
use vortex_array::ArrayContext;
7+
use vortex_array::ArrayId;
78
use vortex_array::dtype::DType;
89
use vortex_array::expr::stats::Precision;
910
use vortex_array::expr::stats::Stat;
@@ -15,7 +16,6 @@ use vortex_array::scalar::ScalarTruncation;
1516
use vortex_array::scalar::lower_bound;
1617
use vortex_array::scalar::upper_bound;
1718
use vortex_array::serde::SerializeOptions;
18-
use vortex_array::session::ArrayRegistry;
1919
use vortex_array::stats::StatsSetRef;
2020
use vortex_buffer::BufferString;
2121
use vortex_buffer::ByteBuffer;
@@ -24,6 +24,7 @@ use vortex_error::VortexResult;
2424
use vortex_error::vortex_bail;
2525
use vortex_io::runtime::Handle;
2626
use vortex_session::registry::ReadContext;
27+
use vortex_utils::aliases::hash_set::HashSet;
2728

2829
use crate::IntoLayout;
2930
use crate::LayoutRef;
@@ -42,7 +43,7 @@ pub struct FlatLayoutStrategy {
4243
pub max_variable_length_statistics_size: usize,
4344
/// Optional set of allowed array encodings for normalization.
4445
/// If None, then all are allowed.
45-
pub allowed_encodings: Option<ArrayRegistry>,
46+
pub allowed_encodings: Option<HashSet<ArrayId>>,
4647
}
4748

4849
impl Default for FlatLayoutStrategy {
@@ -69,7 +70,7 @@ impl FlatLayoutStrategy {
6970
}
7071

7172
/// Set the allowed array encodings for normalization.
72-
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
73+
pub fn with_allow_encodings(mut self, allow_encodings: HashSet<ArrayId>) -> Self {
7374
self.allowed_encodings = Some(allow_encodings);
7475
self
7576
}
@@ -226,6 +227,7 @@ mod tests {
226227
use vortex_io::runtime::single::block_on;
227228
use vortex_mask::AllOr;
228229
use vortex_mask::Mask;
230+
use vortex_utils::aliases::hash_set::HashSet;
229231

230232
use crate::LayoutStrategy;
231233
use crate::layouts::flat::writer::FlatLayoutStrategy;
@@ -424,9 +426,8 @@ mod tests {
424426
let (layout, _segments) = {
425427
let segments = Arc::new(TestSegments::default());
426428
let (ptr, eof) = SequenceId::root().split();
427-
// Only allow primitive encodings - filter arrays should fail.
428-
let allowed = ArrayRegistry::default();
429-
allowed.register(Primitive::ID, Arc::new(Primitive) as DynVTableRef);
429+
// Only allow canonical encodings - filter arrays should fail.
430+
let allowed = HashSet::default();
430431
let layout = FlatLayoutStrategy::default()
431432
.with_allow_encodings(allowed)
432433
.write_stream(
@@ -466,9 +467,8 @@ mod tests {
466467
let segments = Arc::new(TestSegments::default());
467468
let (ptr, eof) = SequenceId::root().split();
468469
// Only allow primitive encodings - filter arrays should fail.
469-
let allowed = ArrayRegistry::default();
470-
allowed.register(Primitive.id(), Arc::new(Primitive) as DynVTableRef);
471-
allowed.register(Dict.id(), Arc::new(Dict) as DynVTableRef);
470+
let mut allowed = HashSet::default();
471+
allowed.insert(Dict.id());
472472
let layout = FlatLayoutStrategy::default()
473473
.with_allow_encodings(allowed)
474474
.write_stream(

0 commit comments

Comments
 (0)