Commit 5e55aaf

fixes
1 parent 2aa0de4 commit 5e55aaf

6 files changed

Lines changed: 67 additions & 61 deletions

packages/bigframes/bigframes/core/bq_data.py

Lines changed: 3 additions & 0 deletions

@@ -253,6 +253,9 @@ def __post_init__(self):
     # Optimization field, must be correct if set, don't put maybe-stale number here
     n_rows: Optional[int] = None
 
+    def with_ordering(self, ordering: orderings.RowOrdering) -> BigqueryDataSource:
+        return dataclasses.replace(self, ordering=ordering)
+
 
 _WORKER_TIME_INCREMENT = 0.05
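The added helper relies on dataclasses.replace, which copies a frozen dataclass with selected fields swapped out rather than mutating it in place. A minimal sketch of the pattern, with DataSource as a simplified stand-in for the real BigqueryDataSource:

import dataclasses
from typing import Optional, Tuple

@dataclasses.dataclass(frozen=True)
class DataSource:
    # Simplified stand-in: the real BigqueryDataSource also carries
    # table, schema, and n_rows fields.
    table: str
    ordering: Optional[Tuple[str, ...]] = None

    def with_ordering(self, ordering: Tuple[str, ...]) -> "DataSource":
        # replace() builds a new frozen instance, so existing references
        # to the old source are left untouched.
        return dataclasses.replace(self, ordering=ordering)

src = DataSource("project.dataset.table")
ordered = src.with_ordering(("row_id",))
assert src.ordering is None and ordered.ordering == ("row_id",)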
packages/bigframes/bigframes/core/nodes.py

Lines changed: 12 additions & 4 deletions

@@ -846,10 +846,10 @@ def remap_refs(
     ) -> ReadTableNode:
         return self
 
-    def with_order_cols(self):
+    def pull_out_order(self):
         # Maybe the ordering should be required to always be in the scan list, and then we won't need this?
         if self.source.ordering is None:
-            return self, orderings.RowOrdering()
+            return self, RowOrdering()
 
         order_cols = {col.sql for col in self.source.ordering.referenced_columns}
         scan_cols = {col.source_id for col in self.scan_list.items}
@@ -863,10 +863,18 @@ def with_order_cols(self):
         ]
         new_scan_list = ScanList(items=(*self.scan_list.items, *new_scan_cols))
         new_order = self.source.ordering.remap_column_refs(
-            {identifiers.ColumnId(item.source_id): item.id for item in new_scan_cols},
+            {
+                identifiers.ColumnId(item.source_id): item.id
+                for item in new_scan_list.items
+            },
             allow_partial_bindings=True,
         )
-        return dataclasses.replace(self, scan_list=new_scan_list), new_order
+        new_node = dataclasses.replace(
+            self,
+            scan_list=new_scan_list,
+            source=self.source.with_ordering(RowOrdering()),
+        )
+        return new_node, new_order
 
 
 @dataclasses.dataclass(frozen=True, eq=False)
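The renamed method fixes two subtle issues: the remap now covers every item in the new scan list rather than only the freshly added columns, and the ordering is stripped from the source (via the new with_ordering helper) so it cannot be applied a second time downstream. A hedged sketch of the pull-out idea using plain dataclasses; Node and its fields are simplified stand-ins for the ReadTableNode internals:

import dataclasses
from typing import Optional, Tuple

@dataclasses.dataclass(frozen=True)
class Node:
    scan_cols: Tuple[str, ...]
    ordering: Optional[Tuple[str, ...]] = None

def pull_out_order(node: Node) -> Tuple[Node, Tuple[str, ...]]:
    if node.ordering is None:
        return node, ()
    # Ensure every ordering column is present in the scan list, then strip
    # the ordering from the node so the caller applies it exactly once.
    missing = tuple(c for c in node.ordering if c not in node.scan_cols)
    stripped = dataclasses.replace(
        node, scan_cols=node.scan_cols + missing, ordering=None
    )
    return stripped, node.ordering

bare, order = pull_out_order(Node(scan_cols=("a",), ordering=("a", "key")))
assert bare.ordering is None and bare.scan_cols == ("a", "key")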

packages/bigframes/bigframes/core/rewrite/order.py

Lines changed: 1 addition & 1 deletion

@@ -162,7 +162,7 @@ def pull_up_order_inner(
         )
     elif isinstance(node, bigframes.core.nodes.ReadTableNode):
         if node.source.ordering is not None:
-            return node.with_order_cols()
+            return node.pull_out_order()
         else:
             # No defined ordering
             return node, bigframes.core.ordering.RowOrdering()

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 25 additions & 12 deletions

@@ -15,6 +15,7 @@
 from __future__ import annotations
 
 import concurrent.futures
+import dataclasses
 import math
 import threading
 from typing import Literal, Mapping, Optional, Sequence, Tuple
@@ -28,8 +29,8 @@
 import bigframes.core
 import bigframes.core.events
 import bigframes.core.guid
-import bigframes.core.ordering
 import bigframes.core.nodes as nodes
+import bigframes.core.ordering
 import bigframes.core.schema as schemata
 import bigframes.core.tree_properties as tree_properties
 import bigframes.dtypes
@@ -43,14 +44,13 @@
 from bigframes.core.compile.sqlglot import sql as sg_sql
 from bigframes.core.compile.sqlglot import sqlglot_ir
 from bigframes.session import (
+    direct_gbq_execution,
     executor,
     loader,
     local_scan_executor,
     read_api_execution,
     semi_executor,
-    direct_gbq_execution,
 )
-import dataclasses
 
 # Max complexity that should be executed as a single query
 QUERY_COMPLEXITY_LIMIT = 1e7
@@ -189,9 +189,9 @@ def _execute_bigquery(
             self._export_result_gcs(results, dest_spec)
             return results
         elif isinstance(dest_spec, ex_spec.TableOutputSpec):
-            return self._execute_gbq_table_output(array_value, execution_spec)
+            return self._execute_gbq_table_export(array_value, execution_spec)
         # Force table creation if result might be large (and user explicitly allowed large results)
-        elif isinstance(dest_spec, ex_spec.EphemeralTableSpec) or dest_spec is None:
+        elif isinstance(dest_spec, ex_spec.EphemeralTableSpec) or (dest_spec is None):
             if not execution_spec.promise_under_10gb:
                 table = self.storage_manager.create_temp_table(
                     array_value.schema.to_bigquery()
@@ -202,12 +202,12 @@ def _execute_bigquery(
                         table=table, if_exists="append"
                     ),
                 )
-                # We don't use _execute_gbq_table_output, because we want to skip slower DML path.
+                # We don't use _execute_gbq_table_export, as this result is internal, not exported.
                 return self._execute_gbq_query_only(array_value, execution_spec)
         # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table that fits in <10gb
         return self._execute_gbq_query_only(array_value, execution_spec)
 
-    def _execute_gbq_table_output(
+    def _execute_gbq_table_export(
         self,
         array_value: bigframes.core.ArrayValue,
         execution_spec: ex_spec.ExecutionSpec,
@@ -224,9 +224,20 @@ def _execute_gbq_table_output(
             )
             results = self._execute_bigquery(array_value, execution_spec)
             self._export_gbq_with_dml(results, dest_spec)
-            return results
-        # If not compatible with DML path, just run query with destination unchanged
-        return self._execute_gbq_query_only(array_value, execution_spec)
+            result = results
+        else:
+            result = self._execute_gbq_query_only(array_value, execution_spec)
+
+        has_special_dtype_col = any(
+            t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
+            for t in array_value.schema.dtypes
+        )
+        if dest_spec.if_exists != "append" and has_special_dtype_col:
+            table = self.bqclient.get_table(dest_spec.table)
+            table.schema = array_value.schema.to_bigquery()
+            self.bqclient.update_table(table, ["schema"])
+
+        return result
 
     def _execute_gbq_query_only(
         self,
@@ -347,7 +358,9 @@ def _execute_to_cached_table(
             order_col_id = guid.generate_guid()
             plan = nodes.PromoteOffsetsNode(plan, identifiers.ColumnId(order_col_id))
             cluster_cols = [order_col_id]
-            ordering = bigframes.core.ordering.TotalOrdering.from_offset_col(order_col_id)
+            ordering = bigframes.core.ordering.TotalOrdering.from_offset_col(
+                order_col_id
+            )
         elif cache_spec.ordering == "order_key":
             plan, ordering = rewrite.pull_out_order(plan)
         destination_table = self.storage_manager.create_temp_table(
@@ -361,7 +374,7 @@ def _execute_to_cached_table(
                 if_exists="replace",
             )
         )
-        # We don't use _execute_gbq_table_output, because we want to skip slower DML path.
+        # We don't use _execute_gbq_table_export, as this result is internal, not exported.
         result = self._execute_gbq_query_only(arr_value, execution_spec)
         result._data = dataclasses.replace(result._data, ordering=ordering)
        return result
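The new tail of _execute_gbq_table_export patches the destination table after a non-append export so that columns with special logical types (timedelta, obj_ref) keep the annotated BigQuery schema rather than whatever the query result produced. A hedged sketch of that fix-up step against the public google-cloud-bigquery client API; patch_exported_schema is a hypothetical free-standing helper, not part of the executor:

from typing import List

from google.cloud import bigquery

def patch_exported_schema(
    client: bigquery.Client,
    table_id: str,
    bq_schema: List[bigquery.SchemaField],  # ArraySchema.to_bigquery() in the real code
) -> None:
    table = client.get_table(table_id)      # fetch current table metadata
    table.schema = bq_schema                # swap in the richer logical schema
    client.update_table(table, ["schema"])  # PATCH only the schema field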

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 20 additions & 41 deletions

@@ -15,25 +15,23 @@
 
 from typing import Callable, Literal, Mapping, Optional, Tuple
 
+import google.api_core.exceptions
 import google.cloud.bigquery.job as bq_job
 import google.cloud.bigquery.table as bq_table
+import google.cloud.bigquery_storage_v1
 from google.cloud import bigquery
 
+import bigframes.core.compile
 import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler
 import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler
 import bigframes.core.events
-import bigframes.session.metrics
+import bigframes.core.schema as schemata
 import bigframes.session._io.bigquery as bq_io
+import bigframes.session.metrics
+from bigframes import exceptions as bfe
 from bigframes.core import bq_data, compile, nodes
-import bigframes.core.compile
-from bigframes.session import executor, semi_executor, execution_spec
 from bigframes.core.compile.configs import CompileRequest, CompileResult
-from bigframes import exceptions as bfe
-import bigframes.core.schema as schemata
-import google.cloud.bigquery_storage_v1
-
-import google.api_core.exceptions
-
+from bigframes.session import execution_spec, executor, semi_executor
 
 _WRITE_DISPOSITIONS = {
     "fail": bigquery.WriteDisposition.WRITE_EMPTY,
@@ -66,20 +64,14 @@ def execute(
         spec: execution_spec.ExecutionSpec,
     ) -> executor.ExecuteResult:
         """Just execute whatever plan as is, without further caching or decomposition."""
-
-        og_schema = plan.schema
-        compile_request = CompileRequest(
-            plan,
-            sort_rows=spec.ordered,
-            peek_count=spec.peek,
-        )
-
         compiled = compile.compile_sql(
-            compile_request, compiler_name=self._compiler_name
+            CompileRequest(
+                plan,
+                sort_rows=spec.ordered,
+                peek_count=spec.peek,
+            ),
+            compiler_name=self._compiler_name,
         )
-        # might have more columns than og schema, for hidden ordering columns
-        compiled_schema = compiled.sql_schema
-
         job_config = bigquery.QueryJobConfig()
         dest_spec = spec.destination_spec
         cluster_cols = None
@@ -110,18 +102,16 @@ def execute(
         )
         result_bq_data = None
         if query_job and query_job.destination:
-            # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them
-            result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema))
             dst = query_job.destination
             result_bq_data = bq_data.BigqueryDataSource(
                 table=bq_data.GbqNativeTable.from_ref_and_schema(
                     dst,
-                    tuple(compiled_schema),
+                    tuple(compiled.sql_schema),
                     cluster_cols=cluster_cols or (),
                     location=iterator.location or self.bqclient.location,
                     table_type="TABLE",
                 ),
-                schema=result_bf_schema,
+                schema=plan.schema,
                 ordering=compiled.row_order,
                 n_rows=iterator.total_rows,
             )
@@ -143,26 +133,25 @@ def execute(
                 project_id=self.bqclient.project,
                 storage_client=self._bqstoragereadclient,
                 execution_metadata=execution_metadata,
-                selected_fields=tuple((col, col) for col in og_schema.names),
+                selected_fields=tuple((col, col) for col in plan.schema.names),
             )
         else:
             return executor.LocalExecuteResult(
-                data=iterator.to_arrow().select(og_schema.names),
+                data=iterator.to_arrow().select(plan.schema.names),
                 bf_schema=plan.schema,
                 execution_metadata=execution_metadata,
            )
 
     def _run_execute_query(
         self,
         sql: str,
-        job_config: Optional[bq_job.QueryJobConfig] = None,
-        query_with_job: bool = True,
-        session=None,
+        job_config: bq_job.QueryJobConfig,
+        query_with_job: bool,
+        session,
     ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
         """
-        job_config = bq_job.QueryJobConfig() if job_config is None else job_config
         if bigframes.options.compute.maximum_bytes_billed is not None:
             job_config.maximum_bytes_billed = (
                 bigframes.options.compute.maximum_bytes_billed
@@ -188,13 +177,3 @@ def _run_execute_query(
                 raise bfe.QueryComplexityError(new_message) from e
             else:
                 raise
-
-
-def _result_schema(
-    logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField]
-) -> schemata.ArraySchema:
-    inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema)
-    inferred_schema.update(logical_schema._mapping)
-    return schemata.ArraySchema(
-        tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items())
-    )
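With _result_schema removed, the executor no longer infers BigFrames dtypes for compiler-added columns; it trusts plan.schema for the logical columns and drops any hidden ordering columns when materializing locally, via pyarrow's Table.select. A minimal sketch of that selection step; the column names here are made up for illustration:

import pyarrow as pa

# A query result that carries an extra hidden ordering column.
raw = pa.table({"a": [1, 2], "b": ["x", "y"], "__order_key__": [1, 0]})

logical_names = ["a", "b"]        # plan.schema.names in the real code
data = raw.select(logical_names)  # hidden ordering column is dropped
assert data.column_names == logical_names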

packages/bigframes/tests/system/small/test_session.py

Lines changed: 6 additions & 3 deletions

@@ -114,10 +114,11 @@ def test_read_gbq_tokyo(
     df.sort_index(inplace=True)
     expected = scalars_pandas_df_index
 
-    # don't promise under 10gb, so table creation, and job creation are guaranteed
     exec_result = session_tokyo._executor.execute(
         df._block.expr,
-        bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False),
+        bigframes.session.execution_spec.ExecutionSpec(
+            destination_spec=bigframes.session.execution_spec.EphemeralTableSpec()
+        ),
     )
     assert exec_result.query_job is not None
     assert exec_result.query_job.location == tokyo_location
@@ -948,7 +949,9 @@ def test_read_pandas_tokyo(
 
     result = session_tokyo._executor.execute(
         df._block.expr,
-        bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False),
+        bigframes.session.execution_spec.ExecutionSpec(
+            destination_spec=bigframes.session.execution_spec.EphemeralTableSpec()
+        ),
     )
     assert result.query_job is not None
     assert result.query_job.location == tokyo_location
