forked from vortex-data/vortex
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlib.rs
More file actions
132 lines (113 loc) · 4.16 KB
/
lib.rs
File metadata and controls
132 lines (113 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
//! Connectors to enable [DataFusion](https://docs.rs/datafusion/latest/datafusion/) to read [`Vortex`](https://docs.rs/crate/vortex/latest) data.
#![deny(missing_docs)]
use std::fmt::Debug;
use datafusion_common::stats::Precision as DFPrecision;
use vortex::expr::stats::Precision;
mod convert;
mod persistent;
pub mod vendor;
pub use convert::exprs::DefaultExpressionConvertor;
pub use convert::exprs::ExpressionConvertor;
pub use persistent::*;
/// Extension trait that converts Vortex's [`Precision`] (imported here from
/// `vortex::expr::stats`) into DataFusion's
/// [`Precision`](datafusion_common::stats::Precision), which this file refers to as
/// `DFPrecision`.
///
/// `T` is the type of the value wrapped by the precision marker.
trait PrecisionExt<T>
where
T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
/// Convert `Precision` to the datafusion equivalent.
///
/// Takes `self` by value, so the wrapped value is moved into the result.
fn to_df(self) -> DFPrecision<T>;
}
impl<T> PrecisionExt<T> for Precision<T>
where
    T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
    /// Maps each Vortex precision variant onto the DataFusion variant of the same name,
    /// moving the wrapped value across untouched.
    fn to_df(self) -> DFPrecision<T> {
        match self {
            Self::Exact(value) => DFPrecision::Exact(value),
            Self::Inexact(value) => DFPrecision::Inexact(value),
        }
    }
}
impl<T> PrecisionExt<T> for Option<Precision<T>>
where
    T: Debug + Clone + PartialEq + Eq + PartialOrd,
{
    /// Converts an optional precision: `None` becomes `DFPrecision::Absent`, while `Some`
    /// defers to the conversion of the wrapped `Precision`.
    fn to_df(self) -> DFPrecision<T> {
        self.map_or(DFPrecision::Absent, |precision| precision.to_df())
    }
}
#[cfg(test)]
mod common_tests {
use std::sync::Arc;
use std::sync::LazyLock;
use datafusion::arrow::array::RecordBatch;
use datafusion::datasource::provider::DefaultTableFactory;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::GetExt;
use object_store::ObjectStore;
use object_store::memory::InMemory;
use url::Url;
use vortex::VortexSessionDefault;
use vortex::array::ArrayRef;
use vortex::array::arrow::FromArrowArray;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::ObjectStoreWriter;
use vortex::io::VortexWrite;
use vortex::session::VortexSession;
use crate::VortexFormatFactory;
use crate::VortexOptions;
/// Vortex session shared by the test helpers in this module, built with
/// `VortexSession::default` the first time it is accessed. `write_arrow_batch` uses it to
/// obtain write options.
static VX_SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
/// Test fixture bundling a DataFusion session with the object store registered on it.
pub struct TestSessionContext {
/// Object store handed to the session; `new` creates it as an `InMemory` store and
/// `write_arrow_batch` writes files into it.
pub store: Arc<dyn ObjectStore>,
/// DataFusion session context that `new` builds from a session state on which `store`
/// is registered under the `file://` URL.
pub session: SessionContext,
}
impl Default for TestSessionContext {
fn default() -> Self {
Self::new(false)
}
}
impl TestSessionContext {
    /// Builds a test session backed by an in-memory object store, with a Vortex format
    /// factory configured using the requested `projection_pushdown` option.
    pub fn new(projection_pushdown: bool) -> Self {
        let store = Arc::new(InMemory::new());
        let format_factory = Arc::new(VortexFormatFactory::new().with_options(VortexOptions {
            projection_pushdown,
            ..Default::default()
        }));

        // The default table factory is registered under the upper-cased extension
        // reported by the format factory.
        let table_factory_key = format_factory.get_ext().to_uppercase();
        let root_url = Url::try_from("file://").unwrap();
        let mut state_builder = SessionStateBuilder::new()
            .with_default_features()
            .with_table_factory(table_factory_key, Arc::new(DefaultTableFactory::new()))
            .with_object_store(&root_url, store.clone());

        // Add the Vortex format factory to the builder's file formats when that list
        // is available.
        if let Some(file_formats) = state_builder.file_formats() {
            file_formats.push(format_factory as _);
        }

        let session = SessionContext::new_with_state(state_builder.build()).enable_url_table();
        Self { store, session }
    }

    /// Writes `batch` as a Vortex file at `path` inside this fixture's object store.
    ///
    /// # Errors
    ///
    /// Propagates any failure from creating the writer, writing the array stream, or
    /// shutting the writer down.
    pub async fn write_arrow_batch<P>(&self, path: P, batch: &RecordBatch) -> anyhow::Result<()>
    where
        P: Into<object_store::path::Path>,
    {
        let vortex_array = ArrayRef::from_arrow(batch, false);
        let location: object_store::path::Path = path.into();
        let mut writer = ObjectStoreWriter::new(self.store.clone(), &location).await?;
        VX_SESSION
            .write_options()
            .write(&mut writer, vortex_array.to_array_stream())
            .await?;
        // Shut the writer down explicitly so that a failure at that step is reported.
        writer.shutdown().await?;
        Ok(())
    }
}
}