Commit f149bd7

refactor(bigframes): Modularize compiler routing as proxy executor (googleapis#16907)
1 parent 2bed78e

6 files changed: 468 additions & 237 deletions


packages/bigframes/bigframes/session/__init__.py

Lines changed: 3 additions & 3 deletions
@@ -80,7 +80,7 @@
 from bigframes import version
 from bigframes.core import blocks, utils
 from bigframes.core.logging import log_adapter
-from bigframes.session import bigquery_session, bq_caching_executor, executor
+from bigframes.session import bigquery_session, executor, proxy_executor

 # Avoid circular imports.
 if typing.TYPE_CHECKING:
@@ -327,15 +327,15 @@ def __init__(
         if not self._strictly_ordered:
             labels["bigframes-mode"] = "unordered"

-        self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
+        self._executor: executor.Executor = proxy_executor.DualCompilerProxyExecutor(
             bqclient=self._clients_provider.bqclient,
             bqstoragereadclient=self._clients_provider.bqstoragereadclient,
             loader=self._loader,
             storage_manager=self._temp_storage_manager,
             metrics=self._metrics,
             enable_polars_execution=context.enable_polars_execution,
             publisher=self._publisher,
-            labels=labels,
+            labels=tuple(labels.items()),
         )

     def __del__(self):
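
The new proxy_executor module, presumably among the six changed files, is not reproduced on this page. Judging from the wiring above and the `_compile_with_fallback` routing deleted from bq_caching_executor.py below (legacy → ibis, experimental → sqlglot, stable → sqlglot with an ibis retry on BadRequest), a DualCompilerProxyExecutor plausibly wraps two BigQueryCachingExecutor instances, one per compiler, sharing one cache via the new `cache` parameter. The sketch below is a guess at that shape, not the actual file; the constructor signature and the execution_cache import path are assumptions:

# Hypothetical sketch -- the real proxy_executor.py is not shown in this diff.
import uuid
import warnings

import google.cloud.exceptions

import bigframes
from bigframes.session import bq_caching_executor, execution_cache  # paths assumed


class DualCompilerProxyExecutor:
    """Routes each execution to a sqlglot- or ibis-backed executor (assumed design)."""

    def __init__(self, **kwargs):
        # One shared cache, enabled by the new `cache` parameter, so results
        # cached by one compiler's executor are visible to the other.
        cache = execution_cache.ExecutionCache()
        self._sqlglot = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="sqlglot", cache=cache, **kwargs
        )
        self._ibis = bq_caching_executor.BigQueryCachingExecutor(
            compiler_name="ibis", cache=cache, **kwargs
        )

    def execute(self, array_value, execution_spec):
        option = bigframes.options.experiments.sql_compiler
        if option == "legacy":
            return self._ibis.execute(array_value, execution_spec)
        if option == "experimental":
            return self._sqlglot.execute(array_value, execution_spec)
        # "stable": try sqlglot first and fall back to ibis on BadRequest,
        # tagging both attempts with a shared compiler id, mirroring the
        # deleted _compile_with_fallback and its "bigframes-compiler" label.
        compiler_id = uuid.uuid1().hex[:12]
        try:
            return self._sqlglot.execute(
                array_value,
                execution_spec.add_labels(
                    {"bigframes-compiler": f"sqlglot-{compiler_id}"}
                ),
            )
        except google.cloud.exceptions.BadRequest:
            warnings.warn(f"Compiler ID {compiler_id}: falling back to ibis.")
            return self._ibis.execute(
                array_value,
                execution_spec.add_labels(
                    {"bigframes-compiler": f"ibis-{compiler_id}"}
                ),
            )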

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 88 additions & 122 deletions
@@ -17,15 +17,12 @@
 import concurrent.futures
 import math
 import threading
-import uuid
-import warnings
 from typing import Literal, Mapping, Optional, Sequence, Tuple

 import google.api_core.exceptions
 import google.cloud.bigquery.job as bq_job
 import google.cloud.bigquery.table as bq_table
 import google.cloud.bigquery_storage_v1
-import google.cloud.exceptions
 from google.cloud import bigquery

 import bigframes
@@ -83,11 +80,15 @@ def __init__(
         metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
         enable_polars_execution: bool = False,
         publisher: bigframes.core.events.Publisher,
-        labels: Mapping[str, str] = {},
+        labels: tuple[tuple[str, str], ...] = (),
+        compiler_name: Literal["ibis", "sqlglot"] = "sqlglot",
+        cache: Optional[execution_cache.ExecutionCache] = None,
     ):
         self.bqclient = bqclient
         self.storage_manager = storage_manager
-        self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache()
+        self.cache: execution_cache.ExecutionCache = (
+            cache or execution_cache.ExecutionCache()
+        )
         self.metrics = metrics
         self.loader = loader
         self.bqstoragereadclient = bqstoragereadclient
@@ -111,6 +112,7 @@ def __init__(
             polars_executor.PolarsExecutor(),
         )
         self._upload_lock = threading.Lock()
+        self._compiler_name = compiler_name

     def to_sql(
         self,
@@ -127,7 +129,10 @@ def to_sql(
             else array_value.node
         )
         node = self._substitute_large_local_sources(node)
-        compiled = self._compile(node, ordered=ordered)
+        compiled = compile.compile_sql(
+            compile.CompileRequest(node, sort_rows=ordered),
+            compiler_name=self._compiler_name,
+        )
         return compiled.sql

     def execute(
@@ -158,7 +163,11 @@ def execute(
                 "Ordering and peeking not supported for gbq export"
             )
             # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
-            result = self._export_gbq(array_value, execution_spec.destination_spec)
+            result = self._export_gbq(
+                array_value,
+                execution_spec.destination_spec,
+                extra_labels=execution_spec.labels,
+            )
             self._publisher.publish(
                 bigframes.core.events.ExecutionFinished(
                     result=result,
@@ -174,6 +183,7 @@ def execute(
             if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec)
             else None,
             must_create_table=not execution_spec.promise_under_10gb,
+            extra_labels=execution_spec.labels,
         )
         # post steps: export
         if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec):
@@ -233,7 +243,10 @@
         return None

     def _export_gbq(
-        self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec
+        self,
+        array_value: bigframes.core.ArrayValue,
+        spec: ex_spec.TableOutputSpec,
+        extra_labels: tuple[tuple[str, str], ...] = (),
     ) -> executor.ExecuteResult:
         """
         Export the ArrayValue to an existing BigQuery table.
@@ -243,55 +256,48 @@ def _export_gbq(
         # validate destination table
         existing_table = self._maybe_find_existing_table(spec)

-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
-            sql = compiled.sql
+        compiled = compile.compile_sql(
+            compile.CompileRequest(plan, sort_rows=False),
+            compiler_name=self._compiler_name,
+        )
+        sql = compiled.sql

-            if (existing_table is not None) and _is_schema_match(
-                existing_table.schema, array_value.schema
-            ):
-                # b/409086472: Uses DML for table appends and replacements to avoid
-                # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
-                # https://cloud.google.com/bigquery/quotas#standard_tables
-                job_config = bigquery.QueryJobConfig()
-
-                ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
-                if spec.if_exists == "append":
-                    sql = sg_sql.to_sql(
-                        sg_sql.insert(ir.expr.as_select_all(), spec.table)
-                    )
-                else:  # for "replace"
-                    assert spec.if_exists == "replace"
-                    sql = sg_sql.to_sql(
-                        sg_sql.replace(ir.expr.as_select_all(), spec.table)
-                    )
-            else:
-                dispositions = {
-                    "fail": bigquery.WriteDisposition.WRITE_EMPTY,
-                    "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
-                    "append": bigquery.WriteDisposition.WRITE_APPEND,
-                }
-                job_config = bigquery.QueryJobConfig(
-                    write_disposition=dispositions[spec.if_exists],
-                    destination=spec.table,
-                    clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
-                )
+        if (existing_table is not None) and _is_schema_match(
+            existing_table.schema, array_value.schema
+        ):
+            # b/409086472: Uses DML for table appends and replacements to avoid
+            # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
+            # https://cloud.google.com/bigquery/quotas#standard_tables
+            job_config = bigquery.QueryJobConfig()

-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            # TODO(swast): plumb through the api_name of the user-facing api that
-            # caused this query.
-            iterator, job = self._run_execute_query(
-                sql=sql,
-                job_config=job_config,
-                session=array_value.session,
+            ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
+            if spec.if_exists == "append":
+                sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
+            else:  # for "replace"
+                assert spec.if_exists == "replace"
+                sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
+        else:
+            dispositions = {
+                "fail": bigquery.WriteDisposition.WRITE_EMPTY,
+                "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
+                "append": bigquery.WriteDisposition.WRITE_APPEND,
+            }
+            job_config = bigquery.QueryJobConfig(
+                write_disposition=dispositions[spec.if_exists],
+                destination=spec.table,
+                clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
             )
-            return iterator, job

-        iterator, job = self._compile_with_fallback(run_with_compiler)
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
+        iterator, job = self._run_execute_query(
+            sql=sql,
+            job_config=job_config,
+            session=array_value.session,
+            extra_labels=extra_labels,
+        )

         has_special_dtype_col = any(
             t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
@@ -359,6 +365,7 @@ def _run_execute_query(
         job_config: Optional[bq_job.QueryJobConfig] = None,
         query_with_job: bool = True,
         session=None,
+        extra_labels: tuple[tuple[str, str], ...] = (),
     ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
         """
         Starts BigQuery query job and waits for results.
@@ -371,6 +378,8 @@

         if self._labels:
             job_config.labels.update(self._labels)
+        if extra_labels:
+            job_config.labels.update(extra_labels)

         try:
             # Trick the type checker into thinking we got a literal.
@@ -420,43 +429,6 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
             self.prepare_plan(array_value.node)
         )

-    def _compile(
-        self,
-        node: nodes.BigFrameNode,
-        *,
-        ordered: bool = False,
-        peek: Optional[int] = None,
-        materialize_all_order_keys: bool = False,
-        compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
-    ) -> compile.CompileResult:
-        return compile.compile_sql(
-            compile.CompileRequest(
-                node,
-                sort_rows=ordered,
-                peek_count=peek,
-                materialize_all_order_keys=materialize_all_order_keys,
-            ),
-            compiler_name=compiler_name,
-        )
-
-    def _compile_with_fallback(self, run_fn):
-        compiler_option = bigframes.options.experiments.sql_compiler
-        if compiler_option == "legacy":
-            return run_fn("ibis")
-        elif compiler_option == "experimental":
-            return run_fn("sqlglot")
-        else:  # stable
-            compiler_id = f"{uuid.uuid1().hex[:12]}"
-            try:
-                return run_fn("sqlglot", compiler_id=compiler_id)
-            except google.cloud.exceptions.BadRequest as e:
-                msg = bfe.format_message(
-                    f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
-                    f"Falling back to ibis. Details: {e.message}"
-                )
-                warnings.warn(msg, category=UserWarning)
-                return run_fn("ibis", compiler_id=compiler_id)
-
     def prepare_plan(
         self,
         plan: nodes.BigFrameNode,
@@ -622,6 +594,7 @@ def _execute_plan_gbq(
         peek: Optional[int] = None,
         cache_spec: Optional[ex_spec.CacheSpec] = None,
         must_create_table: bool = True,
+        extra_labels: tuple[tuple[str, str], ...] = (),
     ) -> executor.ExecuteResult:
         """Just execute whatever plan as is, without further caching or decomposition."""
         # TODO(swast): plumb through the api_name of the user-facing api that
@@ -652,43 +625,36 @@
         ]
         cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]

-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(
+        compiled = compile.compile_sql(
+            compile.CompileRequest(
                 plan,
-                ordered=ordered,
-                peek=peek,
+                sort_rows=ordered,
+                peek_count=peek,
                 materialize_all_order_keys=(cache_spec is not None),
-                compiler_name=compiler_name,
-            )
-            # might have more columns than og schema, for hidden ordering columns
-            compiled_schema = compiled.sql_schema
-
-            destination_table: Optional[bigquery.TableReference] = None
+            ),
+            compiler_name=self._compiler_name,
+        )
+        # might have more columns than og schema, for hidden ordering columns
+        compiled_schema = compiled.sql_schema

-            job_config = bigquery.QueryJobConfig()
-            if create_table:
-                destination_table = self.storage_manager.create_temp_table(
-                    compiled_schema, cluster_cols
-                )
-                job_config.destination = destination_table
+        destination_table: Optional[bigquery.TableReference] = None

-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            iterator, query_job = self._run_execute_query(
-                sql=compiled.sql,
-                job_config=job_config,
-                query_with_job=(destination_table is not None),
-                session=plan.session,
+        job_config = bigquery.QueryJobConfig()
+        if create_table:
+            destination_table = self.storage_manager.create_temp_table(
+                compiled_schema, cluster_cols
             )
-            return iterator, query_job, compiled
-
-        iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)
-
-        # might have more columns than og schema, for hidden ordering columns
-        compiled_schema = compiled.sql_schema
+            job_config.destination = destination_table
+
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        iterator, query_job = self._run_execute_query(
+            sql=compiled.sql,
+            job_config=job_config,
+            query_with_job=(destination_table is not None),
+            session=plan.session,
+            extra_labels=extra_labels,
+        )

         # we could actually cache even when caching is not explicitly requested, but being conservative for now
         result_bq_data = None
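
Two details of the labels plumbing in this file are worth noting. Changing the signature from `labels: Mapping[str, str] = {}` to a tuple of pairs removes a mutable default argument and makes the value hashable, and nothing downstream needs a dict back: `dict.update` accepts any iterable of key/value pairs, which is what `_run_execute_query` relies on. A minimal standalone illustration (the label values here are invented):

from google.cloud import bigquery

# Labels travel as a hashable tuple of pairs instead of a dict.
extra_labels = (("bigframes-api", "to_gbq"),)  # hypothetical label

job_config = bigquery.QueryJobConfig()
job_config.labels["bigframes-mode"] = "unordered"
if extra_labels:
    # dict.update accepts an iterable of key/value pairs directly,
    # so no conversion back to a dict is needed.
    job_config.labels.update(extra_labels)

assert job_config.labels == {
    "bigframes-mode": "unordered",
    "bigframes-api": "to_gbq",
}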

packages/bigframes/bigframes/session/execution_spec.py

Lines changed: 6 additions & 1 deletion
@@ -15,7 +15,7 @@
 from __future__ import annotations

 import dataclasses
-from typing import Literal, Optional, Union
+from typing import Literal, Mapping, Optional, Union

 from google.cloud import bigquery

@@ -30,6 +30,11 @@ class ExecutionSpec:
     # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur
     promise_under_10gb: bool = False

+    labels: tuple[tuple[str, str], ...] = ()
+
+    def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec:
+        return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))
+

 # This one is temporary, in future, caching will not be done through immediate execution, but will label nodes
 # that will be cached only when a super-tree is executed
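
`add_labels` returns a new spec via `dataclasses.replace` rather than mutating in place, and storing labels as a tuple of pairs keeps the value immutable and hashable. A trimmed, self-contained sketch of the pattern (the real class has more fields, and whether it is declared frozen is not visible in this diff):

from __future__ import annotations

import dataclasses
from typing import Mapping


@dataclasses.dataclass(frozen=True)  # frozen is an assumption here
class ExecutionSpec:
    promise_under_10gb: bool = False
    labels: tuple[tuple[str, str], ...] = ()

    def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec:
        # dataclasses.replace builds a new instance; the original is untouched.
        return dataclasses.replace(self, labels=self.labels + tuple(labels.items()))


base = ExecutionSpec()
tagged = base.add_labels({"bigframes-api": "to_gbq"})  # hypothetical label
assert base.labels == ()
assert tagged.labels == (("bigframes-api", "to_gbq"),)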
