Skip to content

Commit 7e38d7a

Browse files
authored
perf: Allow running as streaming node on the streaming engine (#343)
1 parent be94364 commit 7e38d7a

4 files changed

Lines changed: 38 additions & 8 deletions

File tree

dataframely/_plugin.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def all_rules_horizontal(rules: IntoExpr | Iterable[IntoExpr]) -> pl.Expr:
3030
function_name="all_rules_horizontal",
3131
args=rules,
3232
use_abs_path=True,
33+
is_elementwise=True,
3334
)
3435

3536

@@ -61,9 +62,14 @@ def all_rules_required(
6162
) -> pl.Expr:
6263
"""Execute :mod:`~polars.all_horizontal` and `.all` for a set of rules.
6364
64-
Contrary to :meth:`all_rules`, this method raises a
65-
:mod:`~polars.exceptions.ComputeError` at execution time if any rule indicates a
66-
validation failure. The `ComputeError` includes a helpful error message.
65+
This method differs from :meth:`all_rules` in two ways:
66+
67+
- It raises a :mod:`~polars.exceptions.ComputeError` at execution time if any
68+
rule indicates a validation failure. The `ComputeError` includes a helpful error
69+
message.
70+
- It broadcasts the resulting boolean series to the length of the input. This allows
71+
element-wise evaluation and making this a non-blocking operation on the streaming
72+
engine.
6773
6874
Args:
6975
rules: The rules to evaluate.
@@ -80,5 +86,10 @@ def all_rules_required(
8086
args=rules,
8187
kwargs={"null_is_valid": null_is_valid, "schema_name": schema_name},
8288
use_abs_path=True,
83-
returns_scalar=True,
89+
# NOTE: Conceptually, we're reducing the input to a single boolean value here.
90+
# However, we set this option to ensure that the plugin does not become
91+
# blocking on the streaming engine. A single boolean value is simply
92+
# broadcast and we're indifferent to actually finding all validation failures
93+
# during `validate` (and simply fail-fast).
94+
is_elementwise=True,
8495
)

dataframely/schema.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,12 @@ def validate(
549549
should raise upon failure. If `False`, the returned lazy frame will
550550
fail to collect if the validation does not pass.
551551
552+
Note:
553+
If running on the streaming engine, lazy validation will potentially
554+
not surface *all* validation issues as the validation is aborted
555+
once the first failure is encountered. Likewise, the reported
556+
validation failure can be non-deterministic.
557+
552558
Returns:
553559
The input eager or lazy frame, wrapped in a generic version of the
554560
input's data frame type to reflect schema adherence. Columns not defined

src/polars_plugin/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ pub fn all_rules_required(
8181
// neither actually runs the filter logic, nor does it copy any data. It's essentially a no-op
8282
// that is not optimized away in a lazy frame.
8383
if failures.is_empty() {
84-
return Ok(BooleanChunked::new(PlSmallStr::EMPTY, [true]).into_series());
84+
let column = Column::new_scalar(PlSmallStr::EMPTY, Scalar::from(true), 1);
85+
return Ok(column.take_materialized_series());
8586
}
8687

8788
// Aggregate failure counts into a validation error.

tests/schema/test_validate.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def test_invalid_column_contents(
115115
df = df_type({"a": [1, 2, 3], "b": ["x", "longtext", None], "c": ["1", None, "3"]})
116116
with pytest.raises(
117117
ValidationError if eager else plexc.ComputeError,
118-
match=r"2 rules failed validation",
118+
match=r"2 rules failed validation" if eager else None,
119119
):
120120
_validate_and_collect(MySchema, df, eager=eager)
121121
assert not MySchema.is_valid(df)
@@ -143,7 +143,7 @@ def test_violated_custom_rule(
143143
df = df_type({"a": [1, 1, 2, 3, 3], "b": [2, 2, 2, 4, 5]})
144144
with pytest.raises(
145145
ValidationError if eager else plexc.ComputeError,
146-
match=r"2 rules failed validation",
146+
match=r"2 rules failed validation" if eager else None,
147147
):
148148
_validate_and_collect(MyComplexSchema, df, eager=eager)
149149
assert not MyComplexSchema.is_valid(df)
@@ -285,7 +285,19 @@ def test_multiple_unique_columns_both_invalid(
285285
df = df_type({"a": [1, 1, 3], "b": ["x", "y", "y"]})
286286
with pytest.raises(
287287
ValidationError if eager else plexc.ComputeError,
288-
match=r"2 rules failed validation",
288+
match=r"2 rules failed validation" if eager else None,
289289
):
290290
_validate_and_collect(MultiUniqueSchema, df, eager=eager)
291291
assert not MultiUniqueSchema.is_valid(df)
292+
293+
294+
# ----------------------------------- PERFORMANCE ------------------------------------ #
295+
296+
297+
def test_lazy_validate_does_not_block_streaming_engine() -> None:
298+
schema = create_schema("test", {"a": dy.Int64(), "b": dy.Int64()})
299+
lf = pl.LazyFrame({"a": [1, 2, 3], "b": [2, 3, 4]}).lazy()
300+
out = schema.validate(lf, eager=False)
301+
graph = out.show_graph(engine="streaming", plan_stage="physical", raw_output=True)
302+
assert graph is not None
303+
assert "in-memory-map" not in graph

0 commit comments

Comments
 (0)