Skip to content

Commit 186fe3a

Browse files
more fixes
1 parent 1f5ec8f commit 186fe3a

5 files changed

Lines changed: 28 additions & 31 deletions

File tree

packages/bigframes/bigframes/core/rewrite/order.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ def bake_order(
5050
def pull_out_order(
5151
node: bigframes.core.nodes.BigFrameNode,
5252
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
53-
return _pull_up_order(node, order_root=False)
53+
import bigframes.core.rewrite.slices
54+
node = node.bottom_up(bigframes.core.rewrite.slices.rewrite_slice)
55+
return _pull_up_order(node, order_root=True)
5456

5557

5658
# Makes ordering explicit in window definitions
@@ -273,7 +275,7 @@ def pull_up_order_inner(
273275
offsets_id
274276
)
275277
return new_explode, child_order.join(inner_order)
276-
raise ValueError(f"Unexpected node: {node}")
278+
raise ValueError(f"Unexpected node type {type(node).__name__}")
277279

278280
def pull_order_concat(
279281
node: bigframes.core.nodes.ConcatNode,

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import bigframes.core
2929
import bigframes.core.events
3030
import bigframes.core.guid
31-
import bigframes.core.identifiers
3231
import bigframes.core.ordering
3332
import bigframes.core.nodes as nodes
3433
import bigframes.core.schema as schemata
@@ -40,7 +39,7 @@
4039
import bigframes.session.metrics
4140
import bigframes.session.planner
4241
import bigframes.session.temporary_storage
43-
from bigframes.core import bq_data, compile, guid, local_data, rewrite
42+
from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite
4443
from bigframes.core.compile.sqlglot import sql as sg_sql
4544
from bigframes.core.compile.sqlglot import sqlglot_ir
4645
from bigframes.session import (
@@ -176,23 +175,14 @@ def _execute_bigquery(
176175
) -> executor.ExecuteResult:
177176
dest_spec = execution_spec.destination_spec
178177
# Recursive handlers for different cases, maybe extract to explicit interface.
179-
if (
180-
isinstance(dest_spec, ex_spec.EphemeralTableSpec)
181-
and not execution_spec.promise_under_10gb
182-
):
183-
# Results over 10GB need to explicitly allocate a table.
184-
execution_spec = dataclasses.replace(
185-
execution_spec, destination_spec=ex_spec.SessionTableSpec()
186-
)
187-
return self._execute_bigquery(array_value, execution_spec)
188178
if isinstance(dest_spec, ex_spec.GcsOutputSpec):
189179
execution_spec = dataclasses.replace(
190180
execution_spec, destination_spec=ex_spec.EphemeralTableSpec()
191181
)
192182
results = self._execute_bigquery(array_value, execution_spec)
193183
self._export_result_gcs(results, dest_spec)
194184
return results
195-
if isinstance(dest_spec, ex_spec.TableOutputSpec) and dest_spec.permit_dml:
185+
elif isinstance(dest_spec, ex_spec.TableOutputSpec) and dest_spec.permit_dml:
196186
# Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs
197187
existing_table = self._maybe_find_existing_table(dest_spec)
198188
if (existing_table is not None) and _is_schema_match(
@@ -204,18 +194,18 @@ def _execute_bigquery(
204194
results = self._execute_bigquery(array_value, execution_spec)
205195
self._export_gbq_with_dml(results, dest_spec)
206196
return results
207-
if isinstance(dest_spec, ex_spec.SessionTableSpec):
197+
elif isinstance(dest_spec, ex_spec.SessionTableSpec):
208198
# "ephemeral" temp tables created in the course of execution, don't need to be allocated
209199
# materialized ordering only really makes sense for internal temp tables used by caching
210200
cluster_cols = dest_spec.cluster_cols
211201
# Rewrite plan to materialize ordering as extra columns
212202
plan = array_value.node
213203
if dest_spec.ordering == "offsets_col":
214204
order_col_id = guid.generate_guid()
215-
plan = nodes.PromoteOffsetsNode(plan, order_col_id)
205+
plan = nodes.PromoteOffsetsNode(plan, identifiers.ColumnId(order_col_id))
216206
cluster_cols = [order_col_id]
217207
elif dest_spec.ordering == "order_key":
218-
plan, _ = rewrite.pull_out_order(plan)
208+
plan, ordering = rewrite.pull_out_order(plan)
219209
destination_table = self.storage_manager.create_temp_table(
220210
plan.schema.to_bigquery(), cluster_cols
221211
)
@@ -230,7 +220,16 @@ def _execute_bigquery(
230220
permit_dml=False,
231221
),
232222
)
233-
return self._execute_bigquery(arr_value, execution_spec)
223+
result = self._execute_bigquery(arr_value, execution_spec)
224+
result._data = dataclasses.replace(result._data, ordering=ordering)
225+
return result
226+
# Force table creation if result might be large (and user explicitly allowed large results)
227+
elif isinstance(dest_spec, ex_spec.EphemeralTableSpec) or dest_spec is None:
228+
if not execution_spec.promise_under_10gb:
229+
execution_spec = dataclasses.replace(
230+
execution_spec, destination_spec=ex_spec.SessionTableSpec()
231+
)
232+
return self._execute_bigquery(array_value, execution_spec)
234233

235234
# At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table
236235
# Also, ordering mode will either be none or row-sorted
@@ -315,6 +314,7 @@ def _export_gbq_with_dml(
315314
job_config=bigquery.QueryJobConfig(),
316315
metrics=self.metrics,
317316
publisher=self._publisher,
317+
query_with_job=True,
318318
)
319319

320320
def dry_run(
@@ -407,13 +407,14 @@ def _cache_with_cluster_cols(
407407
]
408408
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
409409
execution_spec = ex_spec.ExecutionSpec(
410-
destination_spec=ex_spec.SessionTableSpec(cluster_cols=tuple(cluster_cols))
410+
destination_spec=ex_spec.SessionTableSpec(cluster_cols=tuple(cluster_cols), ordering="order_key")
411411
)
412412
result = self.execute(
413413
array_value,
414414
execution_spec=execution_spec,
415415
)
416416
assert isinstance(result, executor.BQTableExecuteResult)
417+
assert result._data.ordering is not None
417418
self.cache.cache_results_table(array_value.node, result._data)
418419

419420
def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
@@ -428,6 +429,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
428429
execution_spec=execution_spec,
429430
)
430431
assert isinstance(result, executor.BQTableExecuteResult)
432+
assert result._data.ordering is not None
431433
self.cache.cache_results_table(array_value.node, result._data)
432434

433435
def _cache_with_session_awareness(

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@ def execute(
9090
cluster_cols = None
9191
can_skip_job = True
9292
if isinstance(dest_spec, execution_spec.TableOutputSpec):
93-
if spec.ordered:
94-
raise ValueError("Ordering not supported with destination table")
9593
job_config.destination = dest_spec.table
9694
job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists]
9795
cluster_cols = dest_spec.cluster_cols if dest_spec.cluster_cols else None

packages/bigframes/tests/system/small/engines/test_aggregation.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,14 @@ def test_engines_unary_variance_aggregates(
128128
def test_sql_engines_median_op_aggregates(
129129
scalars_array_value: array_value.ArrayValue,
130130
bigquery_client: bigquery.Client,
131+
bq_engine,
132+
sqlglot_engine,
131133
):
132134
node = apply_agg_to_all_valid(
133135
scalars_array_value,
134136
agg_ops.MedianOp(),
135137
).node
136-
publisher = events.Publisher()
137-
left_engine = direct_gbq_execution.DirectGbqExecutor(
138-
bigquery_client, publisher=publisher
139-
)
140-
right_engine = direct_gbq_execution.DirectGbqExecutor(
141-
bigquery_client, compiler="sqlglot", publisher=publisher
142-
)
143-
assert_equivalence_execution(node, left_engine, right_engine)
138+
assert_equivalence_execution(node, bq_engine, sqlglot_engine)
144139

145140

146141
@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)

packages/bigframes/tests/system/small/engines/test_sorting.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import bigframes.operations as bf_ops
1818
from bigframes.core import array_value, nodes, ordering
19-
from bigframes.session import polars_executor
19+
from bigframes.session import polars_executor, execution_spec
2020
from bigframes.testing.engine_utils import assert_equivalence_execution
2121

2222
pytest.importorskip("polars")
@@ -96,7 +96,7 @@ def test_polars_engines_skips_unrecognized_order_expr(
9696
),
9797
)
9898
node = nodes.OrderByNode(node, ORDER_EXPRESSIONS)
99-
assert engine.execute(node, ordered=True) is None
99+
assert engine.execute(node, execution_spec.ExecutionSpec(ordered=True)) is None
100100

101101

102102
def apply_reverse(node: nodes.BigFrameNode) -> nodes.BigFrameNode:

0 commit comments

Comments
 (0)