Skip to content

Commit a4bc576

Browse files
refactor: Unify gbq execution as semi-executor (#16770)
1 parent a10a813 commit a4bc576

31 files changed

Lines changed: 569 additions & 570 deletions

packages/bigframes/bigframes/blob/_functions.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,14 @@ def _create_udf(self):
103103
\"\"\"
104104
"""
105105

106-
bf_io_bigquery.start_query_with_client(
106+
bf_io_bigquery.start_query_with_job(
107107
self._session.bqclient,
108108
sql,
109109
job_config=bigquery.QueryJobConfig(),
110110
metrics=self._session._metrics,
111111
location=None,
112112
project=None,
113113
timeout=None,
114-
query_with_job=True,
115114
publisher=self._session._publisher,
116115
)
117116

packages/bigframes/bigframes/core/bq_data.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,9 @@ def __post_init__(self):
253253
# Optimization field, must be correct if set, don't put maybe-stale number here
254254
n_rows: Optional[int] = None
255255

256+
def with_ordering(self, ordering: orderings.RowOrdering) -> BigqueryDataSource:
257+
return dataclasses.replace(self, ordering=ordering)
258+
256259

257260
_WORKER_TIME_INCREMENT = 0.05
258261

packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def from_table(
177177
project_id: str,
178178
dataset_id: str,
179179
table_id: str,
180-
uid_gen: guid.SequentialUIDGenerator,
180+
uid_gen: guid.SequentialUIDGenerator | None = None,
181181
columns: typing.Sequence[str] = (),
182182
sql_predicate: typing.Optional[str] = None,
183183
system_time: typing.Optional[datetime.datetime] = None,
@@ -202,6 +202,8 @@ def from_table(
202202
if system_time
203203
else None
204204
)
205+
if uid_gen is None:
206+
uid_gen = guid.SequentialUIDGenerator()
205207
table_alias = next(uid_gen.get_uid_stream("bft_"))
206208
table_expr = sge.Table(
207209
this=sql.identifier(table_id),

packages/bigframes/bigframes/core/nodes.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -846,10 +846,10 @@ def remap_refs(
846846
) -> ReadTableNode:
847847
return self
848848

849-
def with_order_cols(self):
849+
def pull_out_order(self):
850850
# Maybe the ordering should be required to always be in the scan list, and then we won't need this?
851851
if self.source.ordering is None:
852-
return self, orderings.RowOrdering()
852+
return self, RowOrdering()
853853

854854
order_cols = {col.sql for col in self.source.ordering.referenced_columns}
855855
scan_cols = {col.source_id for col in self.scan_list.items}
@@ -863,10 +863,18 @@ def with_order_cols(self):
863863
]
864864
new_scan_list = ScanList(items=(*self.scan_list.items, *new_scan_cols))
865865
new_order = self.source.ordering.remap_column_refs(
866-
{identifiers.ColumnId(item.source_id): item.id for item in new_scan_cols},
866+
{
867+
identifiers.ColumnId(item.source_id): item.id
868+
for item in new_scan_list.items
869+
},
867870
allow_partial_bindings=True,
868871
)
869-
return dataclasses.replace(self, scan_list=new_scan_list), new_order
872+
new_node = dataclasses.replace(
873+
self,
874+
scan_list=new_scan_list,
875+
source=self.source.with_ordering(RowOrdering()),
876+
)
877+
return new_node, new_order
870878

871879

872880
@dataclasses.dataclass(frozen=True, eq=False)

packages/bigframes/bigframes/core/rewrite/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from bigframes.core.rewrite.implicit_align import try_row_join
2020
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
2121
from bigframes.core.rewrite.nullity import simplify_join
22-
from bigframes.core.rewrite.order import bake_order, defer_order
22+
from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order
2323
from bigframes.core.rewrite.pruning import column_pruning
2424
from bigframes.core.rewrite.scan_reduction import (
2525
try_reduce_to_local_scan,
@@ -50,6 +50,7 @@
5050
"rewrite_range_rolling",
5151
"try_reduce_to_table_scan",
5252
"bake_order",
53+
"pull_out_order",
5354
"try_reduce_to_local_scan",
5455
"fold_row_counts",
5556
"pull_out_window_order",

packages/bigframes/bigframes/core/rewrite/order.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ def bake_order(
4747
return node
4848

4949

50+
def pull_out_order(
51+
node: bigframes.core.nodes.BigFrameNode,
52+
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
53+
import bigframes.core.rewrite.slices
54+
55+
node = node.bottom_up(bigframes.core.rewrite.slices.rewrite_slice)
56+
return _pull_up_order(node, order_root=True)
57+
58+
5059
# Makes ordering explicit in window definitions
5160
def _pull_up_order(
5261
root: bigframes.core.nodes.BigFrameNode,
@@ -153,7 +162,7 @@ def pull_up_order_inner(
153162
)
154163
elif isinstance(node, bigframes.core.nodes.ReadTableNode):
155164
if node.source.ordering is not None:
156-
return node.with_order_cols()
165+
return node.pull_out_order()
157166
else:
158167
# No defined ordering
159168
return node, bigframes.core.ordering.RowOrdering()
@@ -272,7 +281,7 @@ def pull_up_order_inner(
272281
offsets_id
273282
)
274283
return new_explode, child_order.join(inner_order)
275-
raise ValueError(f"Unexpected node: {node}")
284+
raise ValueError(f"Unexpected node type {type(node).__name__}")
276285

277286
def pull_order_concat(
278287
node: bigframes.core.nodes.ConcatNode,

packages/bigframes/bigframes/functions/_function_client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,14 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
142142
# TODO(swast): plumb through the original, user-facing api_name.
143143
import bigframes.session._io.bigquery
144144

145-
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
145+
_, query_job = bigframes.session._io.bigquery.start_query_with_job(
146146
cast(bigquery.Client, self._session.bqclient),
147147
create_function_ddl,
148148
job_config=bigquery.QueryJobConfig(),
149149
location=None,
150150
project=None,
151151
timeout=None,
152152
metrics=None,
153-
query_with_job=True,
154153
publisher=self._session._publisher,
155154
)
156155
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

packages/bigframes/bigframes/functions/function.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,9 @@ def __call__(self, *args, **kwargs):
196196

197197
args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
198198
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
199-
iter, job = bf_io_bigquery.start_query_with_client(
199+
iter, job = bf_io_bigquery.start_query_with_job(
200200
self._session.bqclient,
201201
sql=sql,
202-
query_with_job=True,
203202
job_config=bigquery.QueryJobConfig(),
204203
publisher=self._session._publisher,
205204
) # type: ignore

packages/bigframes/bigframes/session/__init__.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2310,15 +2310,14 @@ def _start_query_ml_ddl(
23102310
# so we must reset any encryption set in the job config
23112311
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
23122312
job_config.destination_encryption_configuration = None
2313-
iterator, query_job = bf_io_bigquery.start_query_with_client(
2313+
iterator, query_job = bf_io_bigquery.start_query_with_job(
23142314
self.bqclient,
23152315
sql,
23162316
job_config=job_config,
23172317
metrics=self._metrics,
23182318
location=None,
23192319
project=None,
23202320
timeout=None,
2321-
query_with_job=True,
23222321
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
23232322
publisher=self._publisher,
23242323
session=self,
@@ -2340,15 +2339,14 @@ def _create_object_table(self, path: str, connection: str) -> str:
23402339
uris = ['{path}']);
23412340
"""
23422341
)
2343-
bf_io_bigquery.start_query_with_client(
2342+
bf_io_bigquery.start_query_with_job(
23442343
self.bqclient,
23452344
sql,
23462345
job_config=bigquery.QueryJobConfig(),
23472346
metrics=self._metrics,
23482347
location=None,
23492348
project=None,
23502349
timeout=None,
2351-
query_with_job=True,
23522350
publisher=self._publisher,
23532351
session=self,
23542352
)

0 commit comments

Comments
 (0)