This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathwindows.py
More file actions
205 lines (175 loc) · 7.19 KB
/
windows.py
File metadata and controls
205 lines (175 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import typing
import bigframes_vendored.sqlglot.expressions as sge
from bigframes.core import utils, window_spec
import bigframes.core.compile.sqlglot.expression_compiler as expression_compiler
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering_spec
import bigframes.dtypes as dtypes
def apply_window_if_present(
    value: sge.Expression,
    window: typing.Optional[window_spec.WindowSpec] = None,
    include_framing_clauses: bool = True,
    order_by_override: typing.Optional[typing.List[sge.Ordered]] = None,
) -> sge.Expression:
    """Wraps ``value`` in a window (``OVER``) expression when a window is given.

    Args:
        value: The compiled expression the window is applied to.
        window: The window specification. When None, ``value`` is returned
            unchanged.
        include_framing_clauses: When False and the window is unbounded, the
            frame spec is omitted even though an ordering is present.
        order_by_override: When provided and non-empty, these ordering terms
            are used instead of the ones derived from ``window.ordering``.

    Returns:
        ``value`` itself when ``window`` is None, otherwise an ``sge.Window``
        built around ``value``.

    Raises:
        ValueError: If the window is row-bounded or range-bounded but carries
            no ordering.
    """
    if window is None:
        return value

    # A bounded frame needs an ordering. The range-bounded case is validated
    # here as well; otherwise ``window.ordering[0]`` below would fail with an
    # opaque IndexError on an empty ordering.
    if (window.is_row_bounded or window.is_range_bounded) and not window.ordering:
        raise ValueError("No ordering provided for ordered analytic function")

    if not window.ordering:
        # Unbound grouping window.
        order_by = None
    elif window.is_range_bounded:
        # Only the first ordering key is passed on for RANGE windows.
        order_by = get_window_order_by((window.ordering[0],))
        order_by = remove_null_ordering_for_range_windows(order_by)
    else:
        order_by = get_window_order_by(window.ordering)

    # An explicit, non-empty override takes precedence over the derived keys.
    order = None
    if order_by_override:
        order = sge.Order(expressions=order_by_override)
    elif order_by:
        order = sge.Order(expressions=order_by)

    group_by = (
        [_compile_group_by_key(key) for key in window.grouping_keys]
        if window.grouping_keys
        else None
    )

    if window.is_unbounded:
        # Don't create a spec for the default window frame if there's no
        # ordering. This avoids generating an `ORDER BY NULL` clause.
        if not order:
            return sge.Window(this=value, partition_by=group_by)
        if not include_framing_clauses:
            return sge.Window(this=value, partition_by=group_by, order=order)

    start: typing.Union[int, float, None] = None
    end: typing.Union[int, float, None] = None
    bounds = window.bounds
    if isinstance(bounds, window_spec.RangeWindowBounds):
        kind = "RANGE"
        # Range bounds are converted with ``utils.timedelta_to_micros``.
        if bounds.start is not None:
            start = utils.timedelta_to_micros(bounds.start)
        if bounds.end is not None:
            end = utils.timedelta_to_micros(bounds.end)
    else:
        kind = "ROWS"
        if bounds:
            start = bounds.start
            end = bounds.end

    start_value, start_side = _get_window_bounds(start, is_preceding=True)
    end_value, end_side = _get_window_bounds(end, is_preceding=False)

    spec = sge.WindowSpec(
        kind=kind,
        start=start_value,
        start_side=start_side,
        end=end_value,
        end_side=end_side,
        over="OVER",
    )
    return sge.Window(this=value, partition_by=group_by, order=order, spec=spec)
def get_window_order_by(
    ordering: typing.Tuple[ordering_spec.OrderingExpression, ...],
    override_null_order: bool = False,
) -> typing.Optional[tuple[sge.Ordered, ...]]:
    """Builds the ORDER BY terms for a window specification.

    Args:
        ordering (Tuple[ordering_spec.OrderingExpression, ...]):
            A tuple of ordering specification objects.
        override_null_order (bool):
            If True, overrides BigQuery's default null ordering behavior, which
            is sometimes incompatible with ordered aggregations. The generated
            SQL will include extra expressions to correctly enforce
            NULL FIRST/LAST.

    Returns:
        None when ``ordering`` is empty, otherwise a tuple of ``sge.Ordered``
        terms in the same order as ``ordering``.
    """
    if not ordering:
        return None

    terms = []
    for item in ordering:
        compiled = expression_compiler.expression_compiler.compile_expression(
            item.scalar_expression
        )
        descending = not item.direction.is_ascending
        nulls_first = not item.na_last

        if override_null_order:
            null_check = sge.Is(this=compiled, expression=sge.Null())
            # An explicit `<expr> IS NULL` sort key is emitted ahead of the
            # real key only for (DESC, NULLS FIRST) and (ASC, NULLS LAST),
            # i.e. exactly when both flags agree.
            if nulls_first == descending:
                terms.append(
                    sge.Ordered(
                        this=null_check,
                        desc=descending,
                        nulls_first=nulls_first,
                    )
                )

        terms.append(
            sge.Ordered(this=compiled, desc=descending, nulls_first=nulls_first)
        )

    return tuple(terms)
def remove_null_ordering_for_range_windows(
    order_by: typing.Optional[tuple[sge.Ordered, ...]],
) -> typing.Optional[tuple[sge.Ordered, ...]]:
    """Rewrites NULLS FIRST/LAST on ORDER BY expressions for RANGE windows.

    Each key is rebuilt so that its null ordering lands on a supported
    combination: descending keys become NULLS LAST and ascending keys become
    NULLS FIRST. The input expressions' ``args`` are left unmodified.

    Here's the support matrix:
    ✅ sum(x) over (order by y desc nulls last)
    🚫 sum(x) over (order by y asc nulls last)
    ✅ sum(x) over (order by y asc nulls first)
    🚫 sum(x) over (order by y desc nulls first)

    Args:
        order_by: The ORDER BY terms to normalize, or None.

    Returns:
        None when ``order_by`` is None, otherwise a tuple of newly constructed
        ``sge.Ordered`` expressions, one per input key.
    """
    if order_by is None:
        return None

    new_order_by = []
    for key in order_by:
        # Edit a shallow copy: assigning into ``key.args`` directly would
        # silently change the caller's expression as a side effect.
        kargs = dict(key.args)
        desc = kargs.get("desc")
        if desc is True and kargs.get("nulls_first", False):
            # DESC NULLS FIRST -> DESC NULLS LAST
            kargs["nulls_first"] = False
        elif desc is False and not kargs.get("nulls_first"):
            # ASC with NULLS LAST (or no explicit setting) -> ASC NULLS FIRST
            kargs["nulls_first"] = True
        new_order_by.append(sge.Ordered(**kargs))
    return tuple(new_order_by)
def _get_window_bounds(
value, is_preceding: bool
) -> tuple[typing.Union[str, sge.Expression], typing.Optional[str]]:
"""Compiles a single boundary value into its SQL components."""
if value is None:
side = "PRECEDING" if is_preceding else "FOLLOWING"
return "UNBOUNDED", side
if value == 0:
return "CURRENT ROW", None
side = "PRECEDING" if value < 0 else "FOLLOWING"
return sge.convert(abs(value)), side
def _compile_group_by_key(key: ex.Expression) -> sge.Expression:
    """Compiles a grouping key, converting types that need it for grouping.

    FLOAT keys are cast to STRING, GEO keys are wrapped in ``ST_ASBINARY`` and
    JSON keys in ``TO_JSON_STRING``; every other key is returned as compiled.
    """
    compiled = expression_compiler.expression_compiler.compile_expression(key)

    # The group_by keys have already been rewritten by bind_schema_to_node.
    assert key.is_scalar_expr and key.is_resolved

    # Some types need to be converted to another type to enable groupby.
    key_type = key.output_type
    if key_type == dtypes.FLOAT_DTYPE:
        return sge.Cast(this=compiled, to="STRING")
    if key_type == dtypes.GEO_DTYPE:
        return sge.func("ST_ASBINARY", compiled)
    if key_type == dtypes.JSON_DTYPE:
        return sge.func("TO_JSON_STRING", compiled)
    return compiled