Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit e7c4eb4

Browse files
refactor: Define window column expression type
1 parent 12e4380 commit e7c4eb4

File tree

10 files changed

+320
-206
lines changed

10 files changed

+320
-206
lines changed

bigframes/core/agg_expressions.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from typing import Callable, Mapping, TypeVar
2323

2424
from bigframes import dtypes
25-
from bigframes.core import expression
25+
from bigframes.core import expression, window_spec
2626
import bigframes.core.identifiers as ids
2727
import bigframes.operations.aggregations as agg_ops
2828

@@ -149,3 +149,68 @@ def replace_args(
149149
self, larg: expression.Expression, rarg: expression.Expression
150150
) -> BinaryAggregation:
151151
return BinaryAggregation(self.op, larg, rarg)
152+
153+
154+
@dataclasses.dataclass(frozen=True)
class WindowExpression(expression.Expression):
    """A windowed analytic expression.

    Pairs an ``Aggregation`` with the ``WindowSpec`` (partitioning, ordering,
    bounds) it is evaluated over, so a window column can be represented as a
    single expression node.
    """

    # The analytic/aggregate operation being windowed.
    analytic_expr: Aggregation
    # The window definition the aggregation is applied over.
    window: window_spec.WindowSpec

    @property
    def inputs(
        self,
    ) -> typing.Tuple[expression.Expression, ...]:
        # Children are the aggregation itself plus every expression the
        # window references (grouping keys, ordering expressions, ...).
        return (self.analytic_expr, *self.window.expressions)

    @property
    def column_references(self) -> typing.Tuple[ids.ColumnId, ...]:
        # Concatenation (duplicates preserved) of the references of all children.
        collected: typing.List[ids.ColumnId] = []
        for child in self.inputs:
            collected.extend(child.column_references)
        return tuple(collected)

    @functools.cached_property
    def is_resolved(self) -> bool:
        # Resolved only once every child expression is resolved.
        return all(child.is_resolved for child in self.inputs)

    @property
    def output_type(self) -> dtypes.ExpressionType:
        # The result type is dictated entirely by the wrapped aggregation.
        return self.analytic_expr.output_type

    @property
    def free_variables(self) -> typing.Tuple[str, ...]:
        # Concatenation (duplicates preserved) of the free variables of all children.
        names: typing.List[str] = []
        for child in self.inputs:
            names.extend(child.free_variables)
        return tuple(names)

    @property
    def is_const(self) -> bool:
        return all(child.is_const for child in self.inputs)

    def transform_children(
        self: WindowExpression,
        t: Callable[[expression.Expression], expression.Expression],
    ) -> WindowExpression:
        # Rebuild the node with ``t`` applied to the aggregation's children and
        # to the window's expressions.
        return WindowExpression(
            self.analytic_expr.transform_children(t),
            self.window.transform_exprs(t),
        )

    def bind_variables(
        self: WindowExpression,
        bindings: Mapping[str, expression.Expression],
        allow_partial_bindings: bool = False,
    ) -> WindowExpression:
        return self.transform_children(
            lambda child: child.bind_variables(bindings, allow_partial_bindings)
        )

    def bind_refs(
        self: WindowExpression,
        bindings: Mapping[ids.ColumnId, expression.Expression],
        allow_partial_bindings: bool = False,
    ) -> WindowExpression:
        return self.transform_children(
            lambda child: child.bind_refs(bindings, allow_partial_bindings)
        )

bigframes/core/blocks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ def apply_analytic(
11741174
block = self
11751175
if skip_null_groups:
11761176
for key in window.grouping_keys:
1177-
block = block.filter(ops.notnull_op.as_expr(key.id.name))
1177+
block = block.filter(ops.notnull_op.as_expr(key))
11781178
expr, result_id = block._expr.project_window_expr(
11791179
agg_expr,
11801180
window,

bigframes/core/compile/compiled.py

Lines changed: 42 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import bigframes_vendored.ibis
2222
import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery
2323
import bigframes_vendored.ibis.common.deferred as ibis_deferred # type: ignore
24-
from bigframes_vendored.ibis.expr import builders as ibis_expr_builders
2524
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
26-
from bigframes_vendored.ibis.expr.operations import window as ibis_expr_window
2725
import bigframes_vendored.ibis.expr.operations as ibis_ops
2826
import bigframes_vendored.ibis.expr.types as ibis_types
2927
from google.cloud import bigquery
3028
import pyarrow as pa
3129

32-
from bigframes.core import utils
30+
from bigframes.core import agg_expressions
3331
import bigframes.core.agg_expressions as ex_types
3432
import bigframes.core.compile.googlesql
3533
import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compiler
@@ -38,8 +36,9 @@
3836
import bigframes.core.expression as ex
3937
from bigframes.core.ordering import OrderingExpression
4038
import bigframes.core.sql
41-
from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec
39+
from bigframes.core.window_spec import WindowSpec
4240
import bigframes.dtypes
41+
import bigframes.operations as ops
4342
import bigframes.operations.aggregations as agg_ops
4443

4544
op_compiler = op_compilers.scalar_op_compiler
@@ -237,7 +236,9 @@ def aggregate(
237236
col_out: agg_compiler.compile_aggregate(
238237
aggregate,
239238
bindings,
240-
order_by=_convert_row_ordering_to_table_values(table, order_by),
239+
order_by=op_compiler._convert_row_ordering_to_table_values(
240+
table, order_by
241+
),
241242
)
242243
for aggregate, col_out in aggregations
243244
}
@@ -442,113 +443,63 @@ def project_window_op(
442443
if expression.op.order_independent and window_spec.is_unbounded:
443444
# notably percentile_cont does not support ordering clause
444445
window_spec = window_spec.without_order()
445-
window = self._ibis_window_from_spec(window_spec)
446-
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
447-
448-
window_op = agg_compiler.compile_analytic(
449-
expression,
450-
window,
451-
bindings=bindings,
452-
)
453446

454-
inputs = tuple(
455-
typing.cast(ibis_types.Column, self._compile_expression(ex.DerefOp(column)))
456-
for column in expression.column_references
447+
# TODO: Turn this logic into a true rewriter
448+
result_expr: ex.Expression = agg_expressions.WindowExpression(
449+
expression, window_spec
457450
)
458-
clauses = []
451+
clauses: list[tuple[ex.Expression, ex.Expression]] = []
459452
if expression.op.skips_nulls and not never_skip_nulls:
460-
for column in inputs:
461-
clauses.append((column.isnull(), ibis_types.null()))
462-
if window_spec.min_periods and len(inputs) > 0:
453+
for input in expression.inputs:
454+
clauses.append((ops.isnull_op.as_expr(input), ex.const(None)))
455+
if window_spec.min_periods and len(expression.inputs) > 0:
463456
if not expression.op.nulls_count_for_min_values:
457+
is_observation = ops.notnull_op.as_expr()
458+
464459
# Most operations do not count NULL values towards min_periods
465-
per_col_does_count = (column.notnull() for column in inputs)
460+
per_col_does_count = (
461+
ops.notnull_op.as_expr(input) for input in expression.inputs
462+
)
466463
# All inputs must be non-null for observation to count
467464
is_observation = functools.reduce(
468-
lambda x, y: x & y, per_col_does_count
469-
).cast(int)
470-
observation_count = agg_compiler.compile_analytic(
471-
ex_types.UnaryAggregation(
472-
agg_ops.sum_op, ex.deref("_observation_count")
473-
),
474-
window,
475-
bindings={"_observation_count": is_observation},
465+
lambda x, y: ops.and_op.as_expr(x, y), per_col_does_count
466+
)
467+
observation_sentinel = ops.AsTypeOp(bigframes.dtypes.INT_DTYPE).as_expr(
468+
is_observation
469+
)
470+
observation_count_expr = agg_expressions.WindowExpression(
471+
ex_types.UnaryAggregation(agg_ops.sum_op, observation_sentinel),
472+
window_spec,
476473
)
477474
else:
478475
# Operations like count treat even NULLs as valid observations for the sake of min_periods
479476
# notnull is just used to convert null values to non-null (FALSE) values to be counted
480-
is_observation = inputs[0].notnull()
481-
observation_count = agg_compiler.compile_analytic(
482-
ex_types.UnaryAggregation(
483-
agg_ops.count_op, ex.deref("_observation_count")
484-
),
485-
window,
486-
bindings={"_observation_count": is_observation},
477+
observation_count_expr = agg_expressions.WindowExpression(
478+
agg_ops.size_op.as_expr(),
479+
window_spec,
487480
)
488481
clauses.append(
489482
(
490-
observation_count < ibis_types.literal(window_spec.min_periods),
491-
ibis_types.null(),
483+
ops.lt_op.as_expr(
484+
observation_count_expr, ex.const(window_spec.min_periods)
485+
),
486+
ex.const(None),
492487
)
493488
)
494489
if clauses:
495-
case_statement = bigframes_vendored.ibis.case()
496-
for clause in clauses:
497-
case_statement = case_statement.when(clause[0], clause[1])
498-
case_statement = case_statement.else_(window_op).end() # type: ignore
499-
window_op = case_statement # type: ignore
500-
501-
return UnorderedIR(self._table, (*self.columns, window_op.name(output_name)))
502-
503-
def _compile_expression(self, expr: ex.Expression):
504-
return op_compiler.compile_expression(expr, self._ibis_bindings)
505-
506-
def _ibis_window_from_spec(self, window_spec: WindowSpec):
507-
group_by: typing.List[ibis_types.Value] = (
508-
[
509-
typing.cast(
510-
ibis_types.Column, _as_groupable(self._compile_expression(column))
511-
)
512-
for column in window_spec.grouping_keys
490+
case_inputs = [
491+
*itertools.chain.from_iterable(clauses),
492+
ex.const(True),
493+
result_expr,
513494
]
514-
if window_spec.grouping_keys
515-
else []
516-
)
495+
result_expr = ops.CaseWhenOp().as_expr(*case_inputs)
517496

518-
# Construct ordering. There are basically 3 main cases
519-
# 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed
520-
# 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed
521-
# 3. Order-dependent op (navigation functions, array_agg) or rows bounds - use total row order to break ties.
522-
if window_spec.is_row_bounded:
523-
if not window_spec.ordering:
524-
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
525-
raise ValueError("No ordering provided for ordered analytic function")
526-
order_by = _convert_row_ordering_to_table_values(
527-
self._column_names,
528-
window_spec.ordering,
529-
)
497+
ibis_expr = op_compiler.compile_expression(result_expr, self._ibis_bindings)
530498

531-
elif window_spec.is_range_bounded:
532-
order_by = [
533-
_convert_range_ordering_to_table_value(
534-
self._column_names,
535-
window_spec.ordering[0],
536-
)
537-
]
538-
# The rest if branches are for unbounded windows
539-
elif window_spec.ordering:
540-
# Unbound grouping window. Suitable for aggregations but not for analytic function application.
541-
order_by = _convert_row_ordering_to_table_values(
542-
self._column_names,
543-
window_spec.ordering,
544-
)
545-
else:
546-
order_by = None
499+
return UnorderedIR(self._table, (*self.columns, ibis_expr.name(output_name)))
547500

548-
window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by)
549-
if window_spec.bounds is not None:
550-
return _add_boundary(window_spec.bounds, window)
551-
return window
501+
def _compile_expression(self, expr: ex.Expression):
502+
return op_compiler.compile_expression(expr, self._ibis_bindings)
552503

553504

554505
def is_literal(column: ibis_types.Value) -> bool:
@@ -567,58 +518,6 @@ def is_window(column: ibis_types.Value) -> bool:
567518
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)
568519

569520

570-
def _convert_row_ordering_to_table_values(
571-
value_lookup: typing.Mapping[str, ibis_types.Value],
572-
ordering_columns: typing.Sequence[OrderingExpression],
573-
) -> typing.Sequence[ibis_types.Value]:
574-
column_refs = ordering_columns
575-
ordering_values = []
576-
for ordering_col in column_refs:
577-
expr = op_compiler.compile_expression(
578-
ordering_col.scalar_expression, value_lookup
579-
)
580-
ordering_value = (
581-
bigframes_vendored.ibis.asc(expr) # type: ignore
582-
if ordering_col.direction.is_ascending
583-
else bigframes_vendored.ibis.desc(expr) # type: ignore
584-
)
585-
# Bigquery SQL considers NULLS to be "smallest" values, but we need to override in these cases.
586-
if (not ordering_col.na_last) and (not ordering_col.direction.is_ascending):
587-
# Force nulls to be first
588-
is_null_val = typing.cast(ibis_types.Column, expr.isnull())
589-
ordering_values.append(bigframes_vendored.ibis.desc(is_null_val))
590-
elif (ordering_col.na_last) and (ordering_col.direction.is_ascending):
591-
# Force nulls to be last
592-
is_null_val = typing.cast(ibis_types.Column, expr.isnull())
593-
ordering_values.append(bigframes_vendored.ibis.asc(is_null_val))
594-
ordering_values.append(ordering_value)
595-
return ordering_values
596-
597-
598-
def _convert_range_ordering_to_table_value(
599-
value_lookup: typing.Mapping[str, ibis_types.Value],
600-
ordering_column: OrderingExpression,
601-
) -> ibis_types.Value:
602-
"""Converts the ordering for range windows to Ibis references.
603-
604-
Note that this method is different from `_convert_row_ordering_to_table_values` in
605-
that it does not arrange null values. There are two reasons:
606-
1. Manipulating null positions requires more than one ordering key, which is forbidden
607-
by SQL window syntax for range rolling.
608-
2. Pandas does not allow range rolling on timeseries with nulls.
609-
610-
Therefore, we opt for the simplest approach here: generate the simplest SQL and follow
611-
the BigQuery engine behavior.
612-
"""
613-
expr = op_compiler.compile_expression(
614-
ordering_column.scalar_expression, value_lookup
615-
)
616-
617-
if ordering_column.direction.is_ascending:
618-
return bigframes_vendored.ibis.asc(expr) # type: ignore
619-
return bigframes_vendored.ibis.desc(expr) # type: ignore
620-
621-
622521
def _string_cast_join_cond(
623522
lvalue: ibis_types.Column, rvalue: ibis_types.Column
624523
) -> ibis_types.BooleanColumn:
@@ -678,53 +577,3 @@ def _join_condition(
678577
else:
679578
return _string_cast_join_cond(lvalue, rvalue)
680579
return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue)
681-
682-
683-
def _as_groupable(value: ibis_types.Value):
684-
from bigframes.core.compile.ibis_compiler import scalar_op_registry
685-
686-
# Some types need to be converted to another type to enable groupby
687-
if value.type().is_float64():
688-
return value.cast(ibis_dtypes.str)
689-
elif value.type().is_geospatial():
690-
return typing.cast(ibis_types.GeoSpatialColumn, value).as_binary()
691-
elif value.type().is_json():
692-
return scalar_op_registry.to_json_string(value)
693-
else:
694-
return value
695-
696-
697-
def _to_ibis_boundary(
698-
boundary: Optional[int],
699-
) -> Optional[ibis_expr_window.WindowBoundary]:
700-
if boundary is None:
701-
return None
702-
return ibis_expr_window.WindowBoundary(
703-
abs(boundary), preceding=boundary <= 0 # type:ignore
704-
)
705-
706-
707-
def _add_boundary(
708-
bounds: typing.Union[RowsWindowBounds, RangeWindowBounds],
709-
ibis_window: ibis_expr_builders.LegacyWindowBuilder,
710-
) -> ibis_expr_builders.LegacyWindowBuilder:
711-
if isinstance(bounds, RangeWindowBounds):
712-
return ibis_window.range(
713-
start=_to_ibis_boundary(
714-
None
715-
if bounds.start is None
716-
else utils.timedelta_to_micros(bounds.start)
717-
),
718-
end=_to_ibis_boundary(
719-
None if bounds.end is None else utils.timedelta_to_micros(bounds.end)
720-
),
721-
)
722-
if isinstance(bounds, RowsWindowBounds):
723-
if bounds.start is not None or bounds.end is not None:
724-
return ibis_window.rows(
725-
start=_to_ibis_boundary(bounds.start),
726-
end=_to_ibis_boundary(bounds.end),
727-
)
728-
return ibis_window
729-
else:
730-
raise ValueError(f"unrecognized window bounds {bounds}")

0 commit comments

Comments
 (0)