Skip to content

Commit 629bed6

Browse files
committed
Refresh PyVortex to be more Arrow focussed
Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent ce52b71 commit 629bed6

23 files changed

Lines changed: 618 additions & 59 deletions

File tree

docs/api/python/expr.rst

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ the following expression represents the set of rows for which the `age` column l
77

88
.. doctest::
99

10-
>>> import vortex.expr
11-
>>> age = vortex.expr.column("age")
10+
>>> import vortex as vx
11+
>>> age = vx.col("age")
1212
>>> (23 > age) & (age < 55) # doctest: +SKIP
1313

1414
.. autosummary::
1515
:nosignatures:
1616

1717
~vortex.expr.column
18+
~vortex.expr.col
19+
~vortex.expr.plan
1820
~vortex.expr.Expr
1921

2022
.. raw:: html
@@ -23,10 +25,20 @@ the following expression represents the set of rows for which the `age` column l
2325

2426
.. autofunction:: vortex.expr.column
2527

28+
.. autofunction:: vortex.expr.col
29+
2630
.. autofunction:: vortex.expr.not_
2731

2832
.. autofunction:: vortex.expr.and_
2933

34+
.. autofunction:: vortex.expr.cast
35+
36+
.. autofunction:: vortex.expr.is_null
37+
38+
.. autofunction:: vortex.expr.is_not_null
39+
40+
.. autofunction:: vortex.expr.plan
41+
3042
.. autofunction:: vortex.expr.root
3143

3244
.. autofunction:: vortex.expr.literal
@@ -64,4 +76,3 @@ the following expression represents the set of rows for which the `age` column l
6476
... .to_pylist()
6577
... )
6678
[{'x': 1, 'y': {'yy': 'a'}}]
67-

docs/getting-started/python.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ Use :func:`~vortex.open` to open and read the Vortex array from disk:
5656
.. doctest::
5757

5858
>>> import vortex as vx
59-
>>> cvtx = vx.open("example.vortex").scan().read_all() # doctest: +SKIP
59+
>>> table = vx.open("example.vortex").to_table() # doctest: +SKIP
6060

6161

6262
Vortex is architected to achieve fast random access, in many cases hundreds of times faster

docs/project/changelog/index.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,10 @@ For older releases, see the full [release history on GitHub](https://github.com/
66
---
77
maxdepth: 1
88
---
9+
v0.59.0
10+
v0.58.0
11+
v0.57.2
12+
v0.57.1
13+
v0.57.0
14+
v0.56.0
915
```

docs/user-guide/pyarrow.md

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,50 @@ Use {func}`~vortex.open` to lazily open a Vortex file:
2929

3030
### As an Arrow Table
3131

32-
{meth}`.VortexFile.to_arrow` returns a {class}`pyarrow.RecordBatchReader`. Call
33-
{meth}`~pyarrow.RecordBatchReader.read_all` to collect into a {class}`pyarrow.Table`:
32+
{meth}`.VortexFile.to_table` collects the scan into a {class}`pyarrow.Table`:
3433

3534
```{doctest} pycon
36-
>>> table = f.to_arrow().read_all()
35+
>>> table = f.to_table()
3736
>>> table.num_rows
3837
1000
3938
```
4039

40+
{meth}`.VortexFile.to_arrow` returns a streaming {class}`pyarrow.RecordBatchReader`.
41+
4142
### Column Projection
4243

4344
Read only the columns you need:
4445

4546
```{doctest} pycon
46-
>>> table = f.to_arrow(['tip_amount', 'fare_amount']).read_all()
47+
>>> table = f.to_table(columns=['tip_amount', 'fare_amount'])
4748
>>> table.column_names
4849
['tip_amount', 'fare_amount']
4950
```
5051

52+
### Filters
53+
54+
Vortex expressions are the stable pushdown API. PyVortex plans them against the file schema before
55+
the scan runs:
56+
57+
```{doctest} pycon
58+
>>> table = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10)
59+
>>> table.num_rows > 0
60+
True
61+
```
62+
63+
PyArrow compute expressions are accepted as compatibility input. PyVortex converts them through
64+
Substrait, then runs the same Vortex planner:
65+
66+
```{doctest} pycon
67+
>>> import pyarrow.compute as pc
68+
>>> table = f.to_table(columns=['tip_amount'], filter=pc.field('tip_amount') > 10)
69+
>>> table.num_rows > 0
70+
True
71+
```
72+
73+
Use `filter_policy="pushdown"` to raise when a PyArrow expression cannot be pushed into Vortex. Use
74+
`filter_policy="fallback"` to read the rows and apply the PyArrow filter after the scan.
75+
5176
### Streaming Record Batches
5277

5378
Iterate over record batches for streaming processing:

docs/user-guide/vortex-python.md

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,8 @@ Available types: {func}`~vortex.null`, {func}`~vortex.bool_`,
117117

118118
## Expressions
119119

120-
The `vortex.expr` module provides expressions for filtering and projecting. These
121-
are primarily used with {meth}`.VortexFile.scan` and {meth}`.VortexFile.to_arrow` but can also be
122-
applied directly:
120+
The `vortex.expr` module provides expressions for filtering and projecting. Use `vx.col` or
121+
`vortex.expr.col` to build the stable predicate DSL for pushdown:
123122

124123
```{doctest} pycon
125124
>>> import vortex.expr as ve
@@ -128,11 +127,21 @@ applied directly:
128127
... {'name': 'Bob', 'age': 25},
129128
... {'name': 'Carol', 'age': 35},
130129
... ])
131-
>>> expr = ve.column('age') > 28
130+
>>> expr = vx.col('age') > 28
132131
>>> arr.apply(expr).to_arrow_array().to_pylist()
133132
[True, False, True]
134133
```
135134

135+
When a filter is used to read a file, PyVortex plans it against the file schema. Planning inserts
136+
the casts required by the Vortex expression engine, simplifies the expression, and validates that
137+
filters return Boolean values. You can run the same step directly:
138+
139+
```{doctest} pycon
140+
>>> planned = ve.plan(vx.col('age') > 28, schema=arr.dtype.to_arrow_schema(), kind="filter")
141+
>>> isinstance(planned, ve.Expr)
142+
True
143+
```
144+
136145
## VortexFile
137146

138147
{func}`~vortex.open` lazily opens a Vortex file for reading:
@@ -146,19 +155,16 @@ applied directly:
146155
1000
147156
```
148157

149-
Use {meth}`.VortexFile.scan` to read data with optional projection, filtering, and limit:
158+
Use {meth}`.VortexFile.to_table` or {meth}`.VortexFile.to_arrow` to read Arrow data with optional
159+
column projection, filtering, and limit:
150160

151161
```{doctest} pycon
152-
>>> result = f.scan(['tip_amount'], limit=3).read_all()
153-
>>> result.to_arrow_array()
154-
<pyarrow.lib.StructArray object at ...>
155-
-- is_valid: all not null
156-
-- child 0 type: double
157-
[
158-
0,
159-
5.1,
160-
16.54
161-
]
162+
>>> table = f.to_table(columns=['tip_amount'], limit=3)
163+
>>> table.to_pydict()
164+
{'tip_amount': [0.0, 5.1, 16.54]}
165+
>>> filtered = f.to_table(columns=['tip_amount'], filter=vx.col('tip_amount') > 10)
166+
>>> filtered.num_rows > 0
167+
True
162168
```
163169

164170
## ArrayIterator

vortex-array/public-api.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12402,6 +12402,10 @@ pub fn vortex_array::expr::or_collect<I>(iter: I) -> core::option::Option<vortex
1240212402

1240312403
pub fn vortex_array::expr::pack(elements: impl core::iter::traits::collect::IntoIterator<Item = (impl core::convert::Into<vortex_array::dtype::FieldName>, vortex_array::expr::Expression)>, nullability: vortex_array::dtype::Nullability) -> vortex_array::expr::Expression
1240412404

12405+
pub fn vortex_array::expr::plan_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::expr::Expression>
12406+
12407+
pub fn vortex_array::expr::plan_filter_expression(expr: vortex_array::expr::Expression, scope: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_array::expr::Expression>
12408+
1240512409
pub fn vortex_array::expr::root() -> vortex_array::expr::Expression
1240612410

1240712411
pub fn vortex_array::expr::select(field_names: impl core::convert::Into<vortex_array::dtype::FieldNames>, child: vortex_array::expr::Expression) -> vortex_array::expr::Expression

vortex-array/src/expr/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ mod exprs;
3333
pub(crate) mod field;
3434
pub mod forms;
3535
mod optimize;
36+
mod plan;
3637
pub mod proto;
3738
pub mod pruning;
3839
pub mod stats;
@@ -42,6 +43,7 @@ pub mod traversal;
4243
pub use analysis::*;
4344
pub use expression::*;
4445
pub use exprs::*;
46+
pub use plan::*;
4547
pub use pruning::StatsCatalog;
4648

4749
pub trait VortexExprExt {

vortex-array/src/expr/plan.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Expression planning against an input scope.
5+
6+
use vortex_error::VortexResult;
7+
use vortex_error::vortex_bail;
8+
9+
use crate::dtype::DType;
10+
use crate::expr::Expression;
11+
use crate::expr::transform::coerce_expression;
12+
13+
/// Plan an expression against an input [`DType`].
14+
///
15+
/// Planning typeifies the expression by inserting casts required by scalar functions, simplifies
16+
/// the resulting tree, and verifies that the planned expression has a valid return type for the
17+
/// provided scope.
18+
pub fn plan_expression(expr: Expression, scope: &DType) -> VortexResult<Expression> {
19+
let expr = coerce_expression(expr, scope)?;
20+
let expr = expr.optimize_recursive(scope)?;
21+
expr.return_dtype(scope)?;
22+
Ok(expr)
23+
}
24+
25+
/// Plan a filter expression against an input [`DType`].
26+
///
27+
/// This performs the same planning pass as [`plan_expression`] and then requires the expression to
28+
/// return a Boolean value.
29+
pub fn plan_filter_expression(expr: Expression, scope: &DType) -> VortexResult<Expression> {
30+
let expr = plan_expression(expr, scope)?;
31+
let dtype = expr.return_dtype(scope)?;
32+
if !matches!(dtype, DType::Bool(_)) {
33+
vortex_bail!("filter expression must return bool, got {}", dtype);
34+
}
35+
Ok(expr)
36+
}
37+
38+
#[cfg(test)]
39+
mod tests {
40+
use vortex_error::VortexResult;
41+
42+
use crate::dtype::DType;
43+
use crate::dtype::Nullability::NonNullable;
44+
use crate::dtype::Nullability::Nullable;
45+
use crate::dtype::PType;
46+
use crate::dtype::StructFields;
47+
use crate::expr::col;
48+
use crate::expr::lit;
49+
use crate::expr::plan_expression;
50+
use crate::expr::plan_filter_expression;
51+
use crate::scalar::Scalar;
52+
use crate::scalar_fn::ScalarFnVTableExt;
53+
use crate::scalar_fn::fns::binary::Binary;
54+
use crate::scalar_fn::fns::cast::Cast;
55+
use crate::scalar_fn::fns::operators::Operator;
56+
57+
fn scope() -> DType {
58+
DType::Struct(
59+
StructFields::new(
60+
["i32", "i64", "u8", "flag"].into(),
61+
vec![
62+
DType::Primitive(PType::I32, NonNullable),
63+
DType::Primitive(PType::I64, NonNullable),
64+
DType::Primitive(PType::U8, NonNullable),
65+
DType::Bool(NonNullable),
66+
],
67+
),
68+
NonNullable,
69+
)
70+
}
71+
72+
#[test]
73+
fn mixed_numeric_comparison_inserts_cast() -> VortexResult<()> {
74+
let scope = scope();
75+
let expr = Binary.new_expr(Operator::Lt, [col("i32"), col("i64")]);
76+
77+
let planned = plan_filter_expression(expr, &scope)?;
78+
79+
assert!(planned.child(0).is::<Cast>());
80+
assert_eq!(
81+
planned.child(0).return_dtype(&scope)?,
82+
DType::Primitive(PType::I64, NonNullable)
83+
);
84+
assert!(!planned.child(1).is::<Cast>());
85+
Ok(())
86+
}
87+
88+
#[test]
89+
fn mixed_numeric_arithmetic_inserts_casts() -> VortexResult<()> {
90+
let scope = scope();
91+
let expr = Binary.new_expr(Operator::Add, [col("u8"), col("i32")]);
92+
93+
let planned = plan_expression(expr, &scope)?;
94+
95+
assert!(planned.child(0).is::<Cast>());
96+
assert_eq!(
97+
planned.return_dtype(&scope)?,
98+
DType::Primitive(PType::I64, NonNullable)
99+
);
100+
Ok(())
101+
}
102+
103+
#[test]
104+
fn literal_values_are_coerced_against_column_types() -> VortexResult<()> {
105+
let scope = scope();
106+
let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(1i64)]);
107+
108+
let planned = plan_filter_expression(expr, &scope)?;
109+
110+
assert!(!planned.child(0).is::<Cast>());
111+
assert!(planned.child(1).is::<Cast>());
112+
assert_eq!(
113+
planned.child(1).return_dtype(&scope)?,
114+
DType::Primitive(PType::I32, NonNullable)
115+
);
116+
Ok(())
117+
}
118+
119+
#[test]
120+
fn null_literals_are_typed_from_context() -> VortexResult<()> {
121+
let scope = scope();
122+
let expr = Binary.new_expr(Operator::Eq, [col("i32"), lit(Scalar::null(DType::Null))]);
123+
124+
let planned = plan_filter_expression(expr, &scope)?;
125+
126+
assert!(planned.child(1).is::<Cast>());
127+
assert_eq!(
128+
planned.child(1).return_dtype(&scope)?,
129+
DType::Primitive(PType::I32, Nullable)
130+
);
131+
Ok(())
132+
}
133+
134+
#[test]
135+
fn boolean_and_preserves_boolean_inputs() -> VortexResult<()> {
136+
let scope = scope();
137+
let expr = Binary.new_expr(Operator::And, [col("flag"), col("flag")]);
138+
139+
let planned = plan_filter_expression(expr, &scope)?;
140+
141+
assert_eq!(planned.return_dtype(&scope)?, DType::Bool(NonNullable));
142+
assert!(!planned.child(0).is::<Cast>());
143+
assert!(!planned.child(1).is::<Cast>());
144+
Ok(())
145+
}
146+
147+
#[test]
148+
fn filter_planning_rejects_non_boolean_outputs() {
149+
let scope = scope();
150+
let expr = Binary.new_expr(Operator::Add, [col("i32"), lit(1i32)]);
151+
152+
let err = plan_filter_expression(expr, &scope).unwrap_err();
153+
154+
assert!(
155+
err.to_string()
156+
.contains("filter expression must return bool")
157+
);
158+
}
159+
160+
#[test]
161+
fn logical_operators_reject_non_boolean_inputs() {
162+
let scope = scope();
163+
let expr = Binary.new_expr(Operator::And, [col("i32"), col("i64")]);
164+
165+
let err = plan_filter_expression(expr, &scope).unwrap_err();
166+
167+
assert!(
168+
err.to_string()
169+
.contains("logical operation requires boolean operands")
170+
);
171+
}
172+
}

0 commit comments

Comments
 (0)