Skip to content

Commit 1f5ec8f

Browse files
fix issues
1 parent b63b00a commit 1f5ec8f

8 files changed

Lines changed: 108 additions & 85 deletions

File tree

packages/bigframes/bigframes/core/rewrite/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from bigframes.core.rewrite.implicit_align import try_row_join
2020
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
2121
from bigframes.core.rewrite.nullity import simplify_join
22-
from bigframes.core.rewrite.order import bake_order, defer_order
22+
from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order
2323
from bigframes.core.rewrite.pruning import column_pruning
2424
from bigframes.core.rewrite.scan_reduction import (
2525
try_reduce_to_local_scan,
@@ -50,6 +50,7 @@
5050
"rewrite_range_rolling",
5151
"try_reduce_to_table_scan",
5252
"bake_order",
53+
"pull_out_order",
5354
"try_reduce_to_local_scan",
5455
"fold_row_counts",
5556
"pull_out_window_order",

packages/bigframes/bigframes/core/rewrite/order.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ def bake_order(
4747
return node
4848

4949

50+
def pull_out_order(
51+
node: bigframes.core.nodes.BigFrameNode,
52+
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
53+
return _pull_up_order(node, order_root=False)
54+
55+
5056
# Makes ordering explicit in window definitions
5157
def _pull_up_order(
5258
root: bigframes.core.nodes.BigFrameNode,

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import bigframes.session.metrics
4141
import bigframes.session.planner
4242
import bigframes.session.temporary_storage
43-
from bigframes.core import compile, guid, local_data, rewrite
43+
from bigframes.core import bq_data, compile, guid, local_data, rewrite
4444
from bigframes.core.compile.sqlglot import sql as sg_sql
4545
from bigframes.core.compile.sqlglot import sqlglot_ir
4646
from bigframes.session import (
@@ -176,59 +176,61 @@ def _execute_bigquery(
176176
) -> executor.ExecuteResult:
177177
dest_spec = execution_spec.destination_spec
178178
# Recursive handlers for different cases, maybe extract to explicit interface.
179+
if (
180+
isinstance(dest_spec, ex_spec.EphemeralTableSpec)
181+
and not execution_spec.promise_under_10gb
182+
):
183+
# Results over 10GB need to explicitly allocate a table.
184+
execution_spec = dataclasses.replace(
185+
execution_spec, destination_spec=ex_spec.SessionTableSpec()
186+
)
187+
return self._execute_bigquery(array_value, execution_spec)
179188
if isinstance(dest_spec, ex_spec.GcsOutputSpec):
180189
execution_spec = dataclasses.replace(
181-
execution_spec,
182-
destination_spec=ex_spec.TempTableSpec(
183-
cluster_cols=dest_spec.cluster_cols, lifetime="ephemeral"
184-
),
190+
execution_spec, destination_spec=ex_spec.EphemeralTableSpec()
185191
)
186192
results = self._execute_bigquery(array_value, execution_spec)
187193
self._export_result_gcs(results, dest_spec)
188194
return results
189-
if isinstance(dest_spec, ex_spec.TableOutputSpec):
195+
if isinstance(dest_spec, ex_spec.TableOutputSpec) and dest_spec.permit_dml:
190196
# Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs
191197
existing_table = self._maybe_find_existing_table(dest_spec)
192-
if execution_spec.ordered:
193-
raise ValueError("Ordering not supported with table outputs")
194198
if (existing_table is not None) and _is_schema_match(
195199
existing_table.schema, array_value.schema
196200
):
197201
execution_spec = dataclasses.replace(
198-
execution_spec,
199-
destination_spec=ex_spec.TempTableSpec(
200-
cluster_cols=execution_spec.destination_spec.cluster_cols,
201-
lifetime="ephemeral",
202-
),
202+
execution_spec, destination_spec=ex_spec.EphemeralTableSpec()
203203
)
204204
results = self._execute_bigquery(array_value, execution_spec)
205205
self._export_gbq_with_dml(results, dest_spec)
206206
return results
207-
if isinstance(dest_spec, ex_spec.TempTableSpec):
207+
if isinstance(dest_spec, ex_spec.SessionTableSpec):
208208
# "ephemeral" temp tables created in the course of execution, don't need to be allocated
209209
# materialized ordering only really makes sense for internal temp tables used by caching
210210
cluster_cols = dest_spec.cluster_cols
211+
# Rewrite plan to materialize ordering as extra columns
211212
plan = array_value.node
212213
if dest_spec.ordering == "offsets_col":
213214
order_col_id = guid.generate_guid()
214215
plan = nodes.PromoteOffsetsNode(plan, order_col_id)
215216
cluster_cols = [order_col_id]
216217
elif dest_spec.ordering == "order_key":
217-
plan = nodes.defer_order(plan, output_hidden_row_keys=True)
218-
if dest_spec.lifetime == "session":
219-
destination_table = self.storage_manager.create_temp_table(
220-
plan.schema, cluster_cols
221-
)
222-
arr_value = bigframes.core.ArrayValue(plan)
223-
execution_spec = dataclasses.replace(
224-
execution_spec,
225-
destination_spec=ex_spec.TableOutputSpec(
226-
table=destination_table,
227-
cluster_cols=dest_spec.cluster_cols,
228-
if_exists="replace",
229-
),
230-
)
231-
return self._execute_bigquery(arr_value, execution_spec)
218+
plan, _ = rewrite.pull_out_order(plan)
219+
destination_table = self.storage_manager.create_temp_table(
220+
plan.schema.to_bigquery(), cluster_cols
221+
)
222+
arr_value = bigframes.core.ArrayValue(plan)
223+
execution_spec = dataclasses.replace(
224+
execution_spec,
225+
destination_spec=ex_spec.TableOutputSpec(
226+
table=destination_table,
227+
cluster_cols=dest_spec.cluster_cols,
228+
if_exists="replace",
229+
# Avoid loops, also dml is mostly used to avoid quotas on user-owned tables
230+
permit_dml=False,
231+
),
232+
)
233+
return self._execute_bigquery(arr_value, execution_spec)
232234

233235
# At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table
234236
# Also, ordering mode will either be none or row-sorted
@@ -405,32 +407,28 @@ def _cache_with_cluster_cols(
405407
]
406408
cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]
407409
execution_spec = ex_spec.ExecutionSpec(
408-
destination_spec=ex_spec.TempTableSpec(
409-
cluster_cols=tuple(cluster_cols),
410-
lifetime="session",
411-
ordering="order_key",
412-
)
410+
destination_spec=ex_spec.SessionTableSpec(cluster_cols=tuple(cluster_cols))
413411
)
414-
result_bq_data = self.execute(
412+
result = self.execute(
415413
array_value,
416414
execution_spec=execution_spec,
417415
)
418-
assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource)
419-
self.cache.cache_results_table(array_value.node, result_bq_data)
416+
assert isinstance(result, executor.BQTableExecuteResult)
417+
self.cache.cache_results_table(array_value.node, result._data)
420418

421419
def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
422420
"""Executes the query and uses the resulting table to rewrite future executions."""
423421
execution_spec = ex_spec.ExecutionSpec(
424-
destination_spec=ex_spec.TempTableSpec(
425-
cluster_cols=(), lifetime="session", ordering="offsets_col"
422+
destination_spec=ex_spec.SessionTableSpec(
423+
cluster_cols=(), ordering="offsets_col"
426424
)
427425
)
428-
result_bq_data = self.execute(
426+
result = self.execute(
429427
array_value,
430428
execution_spec=execution_spec,
431429
)
432-
assert isinstance(result_bq_data, bigframes.core.BigqueryDataSource)
433-
self.cache.cache_results_table(array_value.node, result_bq_data)
430+
assert isinstance(result, executor.BQTableExecuteResult)
431+
self.cache.cache_results_table(array_value.node, result._data)
434432

435433
def _cache_with_session_awareness(
436434
self,

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,25 +87,25 @@ def execute(
8787

8888
job_config = bigquery.QueryJobConfig()
8989
dest_spec = spec.destination_spec
90-
cluster_cols = ()
90+
cluster_cols = None
91+
can_skip_job = True
9192
if isinstance(dest_spec, execution_spec.TableOutputSpec):
93+
if spec.ordered:
94+
raise ValueError("Ordering not supported with destination table")
9295
job_config.destination = dest_spec.table
9396
job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists]
94-
cluster_cols = dest_spec.cluster_cols
95-
job_config.clustering_fields = dest_spec.cluster_cols
96-
elif (
97-
isinstance(dest_spec, execution_spec.TempTableSpec)
98-
and dest_spec.lifetime == "ephemeral"
99-
):
100-
cluster_cols = dest_spec.cluster_cols
101-
job_config.clustering_fields = dest_spec.cluster_cols
97+
cluster_cols = dest_spec.cluster_cols if dest_spec.cluster_cols else None
98+
job_config.clustering_fields = cluster_cols
99+
can_skip_job = False
100+
elif isinstance(dest_spec, execution_spec.EphemeralTableSpec):
101+
# Need destination table, but jobless execution might not create a destination table
102+
can_skip_job = False
102103
elif dest_spec is not None:
103104
raise ValueError(
104105
f"Direct GBQ Executor does not support destination: {dest_spec}"
105106
)
106107

107108
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
108-
can_skip_job = dest_spec is None and spec.promise_under_10gb
109109
iterator, query_job = self._run_execute_query(
110110
sql=compiled.sql,
111111
job_config=job_config,
@@ -121,7 +121,7 @@ def execute(
121121
table=bq_data.GbqNativeTable.from_ref_and_schema(
122122
dst,
123123
tuple(compiled_schema),
124-
cluster_cols=cluster_cols,
124+
cluster_cols=cluster_cols or (),
125125
location=iterator.location or self.bqclient.location,
126126
table_type="TABLE",
127127
),
@@ -137,7 +137,10 @@ def execute(
137137
hasattr(iterator, "_is_almost_completely_cached")
138138
and iterator._is_almost_completely_cached()
139139
)
140-
if result_bq_data is not None and not result_mostly_cached:
140+
141+
if (isinstance(dest_spec, execution_spec.EphemeralTableSpec)) or (
142+
result_bq_data is not None and not result_mostly_cached
143+
):
141144
return executor.BQTableExecuteResult(
142145
data=result_bq_data,
143146
project_id=self.bqclient.project,

packages/bigframes/bigframes/session/execution_spec.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
@dataclasses.dataclass(frozen=True)
2424
class ExecutionSpec:
2525
# destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes.
26-
destination_spec: Union[TableOutputSpec, GcsOutputSpec, TempTableSpec, None] = None
26+
destination_spec: Union[
27+
TableOutputSpec, GcsOutputSpec, EphemeralTableSpec, SessionTableSpec, None
28+
] = None
2729
# If set, the result will be truncated to the given number of rows. Which N rows is
2830
# implementation dependent and not stable.
2931
peek: Optional[int] = None
@@ -36,16 +38,28 @@ class ExecutionSpec:
3638

3739
# Used internally by execution
3840
@dataclasses.dataclass(frozen=True)
39-
class TempTableSpec:
41+
class EphemeralTableSpec:
42+
"""
43+
Specifies that the result of an operation should be a temporary table of some sort.
44+
45+
No guarantees on lifetime, may be a session temp table, or a bq-created temp table with <24hr life.
46+
47+
Used internally when results need temporary staging, because they are large (>10GB), or needed in subsequent operations.
48+
"""
49+
50+
pass
51+
52+
53+
@dataclasses.dataclass(frozen=True)
54+
class SessionTableSpec:
4055
"""
4156
Specifies that the result of an operation should be a session temp table.
4257
The table will be automatically deleted after the session ends.
4358
"""
4459

4560
cluster_cols: tuple[
4661
str, ...
47-
] # if empty, will cluster using order key if ordering_key is set
48-
lifetime: Literal["session", "ephemeral"] = "session"
62+
] = () # if empty, will cluster using order key if ordering_key is set
4963
# Controls ordering and whether extra columns are materialized to preserve ordering
5064
# Any extra columns will be appended to the end of the schema.
5165
# None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns)
@@ -66,6 +80,8 @@ class TableOutputSpec:
6680
table: bigquery.TableReference
6781
cluster_cols: tuple[str, ...]
6882
if_exists: Literal["fail", "replace", "append"] = "fail"
83+
# Allow DML to be used to populate table
84+
permit_dml: bool = True
6985

7086

7187
@dataclasses.dataclass(frozen=True)

packages/bigframes/tests/system/small/engines/conftest.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,37 @@ def fake_session() -> Generator[bigframes.Session, None, None]:
4444
with bigframes.core.global_session._GlobalSessionContext(session):
4545
yield session
4646

47+
@pytest.fixture(scope="session")
48+
def sqlglot_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient):
49+
return direct_gbq_execution.DirectGbqExecutor(
50+
bigquery_client,
51+
bqstoragereadclient=bigquery_storage_read_client,
52+
compiler="sqlglot",
53+
publisher=events.Publisher(),
54+
)
55+
56+
@pytest.fixture(scope="session")
57+
def bq_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient):
58+
return direct_gbq_execution.DirectGbqExecutor(
59+
bigquery_client,
60+
bqstoragereadclient=bigquery_storage_read_client,
61+
publisher=events.Publisher(),
62+
)
4763

4864
@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"])
4965
def engine(
5066
request,
51-
bigquery_client: bigquery.Client,
52-
bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient,
67+
sqlglot_engine,
68+
bq_engine,
5369
) -> semi_executor.SemiExecutor:
5470
if request.param == "pyarrow":
5571
return local_scan_executor.LocalScanExecutor()
5672
if request.param == "polars":
5773
return polars_executor.PolarsExecutor()
58-
publisher = events.Publisher()
5974
if request.param == "bq":
60-
return direct_gbq_execution.DirectGbqExecutor(
61-
bigquery_client,
62-
bqstoragereadclient=bigquery_storage_read_client,
63-
publisher=publisher,
64-
)
75+
return bq_engine
6576
if request.param == "bq-sqlglot":
66-
return direct_gbq_execution.DirectGbqExecutor(
67-
bigquery_client,
68-
bqstoragereadclient=bigquery_storage_read_client,
69-
compiler="sqlglot",
70-
publisher=publisher,
71-
)
77+
return sqlglot_engine
7278
raise ValueError(f"Unrecognized param: {request.param}")
7379

7480

packages/bigframes/tests/system/small/engines/test_windowing.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ def test_engines_with_offsets(
4646
@pytest.mark.parametrize("agg_op", [agg_ops.sum_op, agg_ops.count_op])
4747
def test_engines_with_rows_window(
4848
scalars_array_value: array_value.ArrayValue,
49-
bigquery_client: bigquery.Client,
5049
agg_op,
50+
bq_engine,
51+
sqlglot_engine,
5152
):
5253
window = window_spec.WindowSpec(
5354
bounds=window_spec.RowsWindowBounds.from_window_size(3, "left"),
@@ -62,12 +63,4 @@ def test_engines_with_rows_window(
6263
),
6364
window_spec=window,
6465
)
65-
66-
publisher = events.Publisher()
67-
bq_executor = direct_gbq_execution.DirectGbqExecutor(
68-
bigquery_client, publisher=publisher
69-
)
70-
bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor(
71-
bigquery_client, compiler="sqlglot", publisher=publisher
72-
)
73-
assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor)
66+
assert_equivalence_execution(window_node, bq_engine, sqlglot_engine)

packages/bigframes/tests/system/small/test_session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def test_read_gbq_tokyo(
118118
exec_result = session_tokyo._executor.execute(
119119
df._block.expr,
120120
bigframes.session.execution_spec.ExecutionSpec(
121-
bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False
121+
bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False
122122
),
123123
)
124124
assert exec_result.query_job is not None
@@ -951,7 +951,7 @@ def test_read_pandas_tokyo(
951951
result = session_tokyo._executor.execute(
952952
df._block.expr,
953953
bigframes.session.execution_spec.ExecutionSpec(
954-
bigframes.session.execution_spec.TempTableSpec(()), promise_under_10gb=False
954+
bigframes.session.execution_spec.EphemeralTableSpec(), promise_under_10gb=False
955955
),
956956
)
957957
assert result.query_job is not None

0 commit comments

Comments
 (0)