Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 61c17e3

Browse files
refactor: Define sql nodes and transform (#2438)
1 parent 59cbc5d commit 61c17e3

File tree

293 files changed

+2562
-6200
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

293 files changed

+2562
-6200
lines changed

bigframes/core/compile/compiled.py

Lines changed: 5 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
import functools
1716
import itertools
1817
import typing
1918
from typing import Literal, Optional, Sequence
@@ -27,7 +26,7 @@
2726
from google.cloud import bigquery
2827
import pyarrow as pa
2928

30-
from bigframes.core import agg_expressions
29+
from bigframes.core import agg_expressions, rewrite
3130
import bigframes.core.agg_expressions as ex_types
3231
import bigframes.core.compile.googlesql
3332
import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compiler
@@ -38,8 +37,6 @@
3837
import bigframes.core.sql
3938
from bigframes.core.window_spec import WindowSpec
4039
import bigframes.dtypes
41-
import bigframes.operations as ops
42-
import bigframes.operations.aggregations as agg_ops
4340

4441
op_compiler = op_compilers.scalar_op_compiler
4542

@@ -424,59 +421,11 @@ def project_window_op(
424421
output_name,
425422
)
426423

427-
if expression.op.order_independent and window_spec.is_unbounded:
428-
# notably percentile_cont does not support ordering clause
429-
window_spec = window_spec.without_order()
430-
431-
# TODO: Turn this logic into a true rewriter
432-
result_expr: ex.Expression = agg_expressions.WindowExpression(
433-
expression, window_spec
424+
rewritten_expr = rewrite.simplify_complex_windows(
425+
agg_expressions.WindowExpression(expression, window_spec)
434426
)
435-
clauses: list[tuple[ex.Expression, ex.Expression]] = []
436-
if window_spec.min_periods and len(expression.inputs) > 0:
437-
if not expression.op.nulls_count_for_min_values:
438-
is_observation = ops.notnull_op.as_expr()
439-
440-
# Most operations do not count NULL values towards min_periods
441-
per_col_does_count = (
442-
ops.notnull_op.as_expr(input) for input in expression.inputs
443-
)
444-
# All inputs must be non-null for observation to count
445-
is_observation = functools.reduce(
446-
lambda x, y: ops.and_op.as_expr(x, y), per_col_does_count
447-
)
448-
observation_sentinel = ops.AsTypeOp(bigframes.dtypes.INT_DTYPE).as_expr(
449-
is_observation
450-
)
451-
observation_count_expr = agg_expressions.WindowExpression(
452-
ex_types.UnaryAggregation(agg_ops.sum_op, observation_sentinel),
453-
window_spec,
454-
)
455-
else:
456-
# Operations like count treat even NULLs as valid observations for the sake of min_periods
457-
# notnull is just used to convert null values to non-null (FALSE) values to be counted
458-
is_observation = ops.notnull_op.as_expr(expression.inputs[0])
459-
observation_count_expr = agg_expressions.WindowExpression(
460-
agg_ops.count_op.as_expr(is_observation),
461-
window_spec,
462-
)
463-
clauses.append(
464-
(
465-
ops.lt_op.as_expr(
466-
observation_count_expr, ex.const(window_spec.min_periods)
467-
),
468-
ex.const(None),
469-
)
470-
)
471-
if clauses:
472-
case_inputs = [
473-
*itertools.chain.from_iterable(clauses),
474-
ex.const(True),
475-
result_expr,
476-
]
477-
result_expr = ops.CaseWhenOp().as_expr(*case_inputs)
478-
479-
ibis_expr = op_compiler.compile_expression(result_expr, self._ibis_bindings)
427+
428+
ibis_expr = op_compiler.compile_expression(rewritten_expr, self._ibis_bindings)
480429

481430
return UnorderedIR(self._table, (*self.columns, ibis_expr.name(output_name)))
482431

bigframes/core/compile/sqlglot/aggregate_compiler.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,8 @@
2222
ordered_unary_compiler,
2323
unary_compiler,
2424
)
25+
import bigframes.core.compile.sqlglot.expression_compiler as expression_compiler
2526
from bigframes.core.compile.sqlglot.expressions import typed_expr
26-
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
2727

2828

2929
def compile_aggregate(
@@ -35,7 +35,7 @@ def compile_aggregate(
3535
return nullary_compiler.compile(aggregate.op)
3636
if isinstance(aggregate, agg_expressions.UnaryAggregation):
3737
column = typed_expr.TypedExpr(
38-
scalar_compiler.scalar_op_compiler.compile_expression(aggregate.arg),
38+
expression_compiler.expression_compiler.compile_expression(aggregate.arg),
3939
aggregate.arg.output_type,
4040
)
4141
if not aggregate.op.order_independent:
@@ -46,11 +46,11 @@ def compile_aggregate(
4646
return unary_compiler.compile(aggregate.op, column)
4747
elif isinstance(aggregate, agg_expressions.BinaryAggregation):
4848
left = typed_expr.TypedExpr(
49-
scalar_compiler.scalar_op_compiler.compile_expression(aggregate.left),
49+
expression_compiler.expression_compiler.compile_expression(aggregate.left),
5050
aggregate.left.output_type,
5151
)
5252
right = typed_expr.TypedExpr(
53-
scalar_compiler.scalar_op_compiler.compile_expression(aggregate.right),
53+
expression_compiler.expression_compiler.compile_expression(aggregate.right),
5454
aggregate.right.output_type,
5555
)
5656
return binary_compiler.compile(aggregate.op, left, right)
@@ -66,7 +66,7 @@ def compile_analytic(
6666
return nullary_compiler.compile(aggregate.op, window)
6767
if isinstance(aggregate, agg_expressions.UnaryAggregation):
6868
column = typed_expr.TypedExpr(
69-
scalar_compiler.scalar_op_compiler.compile_expression(aggregate.arg),
69+
expression_compiler.expression_compiler.compile_expression(aggregate.arg),
7070
aggregate.arg.output_type,
7171
)
7272
return unary_compiler.compile(aggregate.op, column, window)

bigframes/core/compile/sqlglot/aggregations/binary_compiler.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@ def compile(
3333
right: typed_expr.TypedExpr,
3434
window: typing.Optional[window_spec.WindowSpec] = None,
3535
) -> sge.Expression:
36+
if op.order_independent and (window is not None) and window.is_unbounded:
37+
window = window.without_order()
3638
return BINARY_OP_REGISTRATION[op](op, left, right, window=window)
3739

3840

bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@ def compile(
3030
op: agg_ops.WindowOp,
3131
window: typing.Optional[window_spec.WindowSpec] = None,
3232
) -> sge.Expression:
33+
if op.order_independent and (window is not None) and window.is_unbounded:
34+
window = window.without_order()
3335
return NULLARY_OP_REGISTRATION[op](op, window=window)
3436

3537

bigframes/core/compile/sqlglot/aggregations/unary_compiler.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,8 @@ def compile(
3636
column: typed_expr.TypedExpr,
3737
window: typing.Optional[window_spec.WindowSpec] = None,
3838
) -> sge.Expression:
39+
if op.order_independent and (window is not None) and window.is_unbounded:
40+
window = window.without_order()
3941
return UNARY_OP_REGISTRATION[op](op, column, window=window)
4042

4143

bigframes/core/compile/sqlglot/aggregations/windows.py

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
import bigframes_vendored.sqlglot.expressions as sge
1919

2020
from bigframes.core import utils, window_spec
21-
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
21+
import bigframes.core.compile.sqlglot.expression_compiler as expression_compiler
2222
import bigframes.core.expression as ex
2323
import bigframes.core.ordering as ordering_spec
2424
import bigframes.dtypes as dtypes
@@ -116,7 +116,7 @@ def get_window_order_by(
116116

117117
order_by = []
118118
for ordering_spec_item in ordering:
119-
expr = scalar_compiler.scalar_op_compiler.compile_expression(
119+
expr = expression_compiler.expression_compiler.compile_expression(
120120
ordering_spec_item.scalar_expression
121121
)
122122
desc = not ordering_spec_item.direction.is_ascending
@@ -191,15 +191,15 @@ def _get_window_bounds(
191191

192192

193193
def _compile_group_by_key(key: ex.Expression) -> sge.Expression:
194-
expr = scalar_compiler.scalar_op_compiler.compile_expression(key)
194+
expr = expression_compiler.expression_compiler.compile_expression(key)
195195
# The group_by keys have been rewritten by bind_schema_to_node
196-
assert isinstance(key, ex.ResolvedDerefOp)
196+
assert key.is_scalar_expr and key.is_resolved
197197

198198
# Some types need to be converted to another type to enable groupby
199-
if key.dtype == dtypes.FLOAT_DTYPE:
199+
if key.output_type == dtypes.FLOAT_DTYPE:
200200
expr = sge.Cast(this=expr, to="STRING")
201-
elif key.dtype == dtypes.GEO_DTYPE:
201+
elif key.output_type == dtypes.GEO_DTYPE:
202202
expr = sge.func("ST_ASBINARY", expr)
203-
elif key.dtype == dtypes.JSON_DTYPE:
203+
elif key.output_type == dtypes.JSON_DTYPE:
204204
expr = sge.func("TO_JSON_STRING", expr)
205205
return expr

0 commit comments

Comments
 (0)