Skip to content

Commit 076cb98

Browse files
refactor: Unify gbq execution as semi-executor
1 parent c422cbb commit 076cb98

9 files changed

Lines changed: 326 additions & 356 deletions

File tree

packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def from_table(
177177
project_id: str,
178178
dataset_id: str,
179179
table_id: str,
180-
uid_gen: guid.SequentialUIDGenerator,
180+
uid_gen: guid.SequentialUIDGenerator | None = None,
181181
columns: typing.Sequence[str] = (),
182182
sql_predicate: typing.Optional[str] = None,
183183
system_time: typing.Optional[datetime.datetime] = None,
@@ -202,6 +202,8 @@ def from_table(
202202
if system_time
203203
else None
204204
)
205+
if uid_gen is None:
206+
uid_gen = guid.SequentialUIDGenerator()
205207
table_alias = next(uid_gen.get_uid_stream("bft_"))
206208
table_expr = sge.Table(
207209
this=sql.identifier(table_id),

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 130 additions & 289 deletions
Large diffs are not rendered by default.

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 142 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from typing import Literal, Optional, Tuple
16+
from typing import Callable, Literal, Optional, Tuple
1717

1818
import google.cloud.bigquery.job as bq_job
1919
import google.cloud.bigquery.table as bq_table
@@ -23,76 +23,180 @@
2323
import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler
2424
import bigframes.core.events
2525
import bigframes.session._io.bigquery as bq_io
26-
from bigframes.core import compile, nodes
27-
from bigframes.session import executor, semi_executor
26+
from bigframes.core import bq_data, compile, nodes
27+
from bigframes.session import executor, semi_executor, execution_spec
28+
from bigframes import exceptions as bfe
29+
import bigframes.core.schema as schemata
30+
31+
import google.api_core.exceptions
32+
33+
34+
_WRITE_DISPOSITIONS = {
35+
"fail": bigquery.WriteDisposition.WRITE_EMPTY,
36+
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
37+
"append": bigquery.WriteDisposition.WRITE_APPEND,
38+
}
2839

2940

30-
# used only in testing right now, BigQueryCachingExecutor is the fully featured engine
31-
# simplified: does not do large (>10 GB) result queries, error handling, respect global config
32-
# or record metrics. Also avoids caching, and most pre-compile rewrites, to better serve as a
33-
# reference for validating more complex executors.
3441
class DirectGbqExecutor(semi_executor.SemiExecutor):
3542
def __init__(
3643
self,
3744
bqclient: bigquery.Client,
38-
compiler: Literal["ibis", "sqlglot"] = "ibis",
45+
compiler: Literal["ibis", "sqlglot"]
46+
| Callable[[compile.CompileRequest], executor.CompileResult] = "ibis",
3947
*,
4048
publisher: bigframes.core.events.Publisher,
4149
):
4250
self.bqclient = bqclient
43-
self._compile_fn = (
44-
ibis_compiler.compile_sql
45-
if compiler == "ibis"
46-
else sqlglot_compiler.compile_sql
47-
)
51+
if isinstance(compiler, str):
52+
self._compile_fn = (
53+
ibis_compiler.compile_sql
54+
if compiler == "ibis"
55+
else sqlglot_compiler.compile_sql
56+
)
57+
else:
58+
self._compile_fn = compiler
4859
self._publisher = publisher
4960

5061
def execute(
5162
self,
5263
plan: nodes.BigFrameNode,
53-
ordered: bool,
54-
peek: Optional[int] = None,
64+
spec: execution_spec.ExecutionSpec,
5565
) -> executor.ExecuteResult:
5666
"""Just execute whatever plan as is, without further caching or decomposition."""
57-
# TODO(swast): plumb through the api_name of the user-facing api that
58-
# caused this query.
5967

60-
compiled = self._compile_fn(
61-
compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek)
68+
og_schema = plan.schema
69+
70+
compiled = compile.compiler().compile_sql(
71+
compile.CompileRequest(
72+
plan,
73+
sort_rows=spec.ordered,
74+
peek_count=spec.peek,
75+
)
6276
)
77+
# might have more columns than og schema, for hidden ordering columns
78+
compiled_schema = compiled.sql_schema
79+
80+
job_config = bigquery.QueryJobConfig()
81+
if isinstance(spec.destination_spec, execution_spec.TableOutputSpec):
82+
job_config.destination = spec.destination_spec.table
83+
job_config.write_disposition = _WRITE_DISPOSITIONS[spec.destination_spec.if_exists]
84+
job_config.clustering_fields = spec.destination_spec.cluster_cols
85+
elif isinstance(spec.destination_spec, execution_spec.TempTableSpec) and spec.destination_spec.lifetime == "ephemeral":
86+
pass
87+
elif spec.destination_spec is not None:
88+
raise ValueError(f"Direct GBQ Executor does not support destination: {spec.destination_spec}")
6389

90+
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
91+
can_skip_job = spec.destination_spec is None and spec.promise_under_10gb
6492
iterator, query_job = self._run_execute_query(
6593
sql=compiled.sql,
94+
job_config=job_config,
95+
query_with_job=(not can_skip_job),
6696
session=plan.session,
6797
)
6898

69-
# just immediately download everything for simplicity
70-
return executor.LocalExecuteResult(
71-
data=iterator.to_arrow(),
72-
bf_schema=plan.schema,
73-
execution_metadata=executor.ExecutionMetadata.from_iterator_and_job(
74-
iterator, query_job
75-
),
99+
result_bq_data = None
100+
if query_job and query_job.destination:
101+
# we might add extra SQL columns in compilation (especially if caching with ordering); infer a BigFrames type for them
102+
result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema))
103+
dst = query_job.destination
104+
result_bq_data = bq_data.BigqueryDataSource(
105+
table=bq_data.GbqNativeTable.from_ref_and_schema(
106+
dst,
107+
tuple(compiled_schema),
108+
cluster_cols=spec.destination_spec.cluster_cols,
109+
location=iterator.location or self.storage_manager.location,
110+
table_type="TABLE",
111+
),
112+
schema=result_bf_schema,
113+
ordering=compiled.row_order,
114+
n_rows=iterator.total_rows,
115+
)
116+
117+
execution_metadata = executor.ExecutionMetadata.from_iterator_and_job(
118+
iterator, query_job
76119
)
120+
result_mostly_cached = (
121+
hasattr(iterator, "_is_almost_completely_cached")
122+
and iterator._is_almost_completely_cached()
123+
)
124+
if result_bq_data is not None and not result_mostly_cached:
125+
return executor.BQTableExecuteResult(
126+
data=result_bq_data,
127+
project_id=self.bqclient.project,
128+
storage_client=self.bqstoragereadclient,
129+
execution_metadata=execution_metadata,
130+
selected_fields=tuple((col, col) for col in og_schema.names),
131+
)
132+
else:
133+
return executor.LocalExecuteResult(
134+
data=iterator.to_arrow().select(og_schema.names),
135+
bf_schema=plan.schema,
136+
execution_metadata=execution_metadata,
137+
)
77138

78139
def _run_execute_query(
79140
self,
80141
sql: str,
81142
job_config: Optional[bq_job.QueryJobConfig] = None,
143+
query_with_job: bool = True,
82144
session=None,
83145
) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
84146
"""
85147
Starts BigQuery query job and waits for results.
86148
"""
87-
return bq_io.start_query_with_client(
88-
self.bqclient,
89-
sql,
90-
job_config=job_config or bq_job.QueryJobConfig(),
91-
project=None,
92-
location=None,
93-
timeout=None,
94-
metrics=None,
95-
query_with_job=False,
96-
publisher=self._publisher,
97-
session=session,
98-
)
149+
job_config = bq_job.QueryJobConfig() if job_config is None else job_config
150+
if bigframes.options.compute.maximum_bytes_billed is not None:
151+
job_config.maximum_bytes_billed = (
152+
bigframes.options.compute.maximum_bytes_billed
153+
)
154+
155+
if self._labels:
156+
job_config.labels.update(self._labels)
157+
158+
try:
159+
# Trick the type checker into thinking we got a literal.
160+
if query_with_job:
161+
return bq_io.start_query_with_client(
162+
self.bqclient,
163+
sql,
164+
job_config=job_config,
165+
metrics=self.metrics,
166+
project=None,
167+
location=None,
168+
timeout=None,
169+
query_with_job=True,
170+
publisher=self._publisher,
171+
session=session,
172+
)
173+
else:
174+
return bq_io.start_query_with_client(
175+
self.bqclient,
176+
sql,
177+
job_config=job_config,
178+
metrics=self.metrics,
179+
project=None,
180+
location=None,
181+
timeout=None,
182+
query_with_job=False,
183+
publisher=self._publisher,
184+
session=session,
185+
)
186+
187+
except google.api_core.exceptions.BadRequest as e:
188+
# Unfortunately, this error type does not have a separate error code or exception type
189+
if "Resources exceeded during query execution" in e.message:
190+
new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution."
191+
raise bfe.QueryComplexityError(new_message) from e
192+
else:
193+
raise
194+
195+
def _result_schema(
196+
logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField]
197+
) -> schemata.ArraySchema:
198+
inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema)
199+
inferred_schema.update(logical_schema._mapping)
200+
return schemata.ArraySchema(
201+
tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items())
202+
)

packages/bigframes/bigframes/session/execution_spec.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,43 @@
2222

2323
@dataclasses.dataclass(frozen=True)
2424
class ExecutionSpec:
25-
destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None
25+
# destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes.
26+
destination_spec: Union[TableOutputSpec, GcsOutputSpec, TempTableSpec, None] = None
27+
# If set, the result will be truncated to the given number of rows. Which N rows is
28+
# implementation dependent and not stable.
2629
peek: Optional[int] = None
27-
ordered: bool = (
28-
False # ordered and promise_under_10gb must both be together for bq execution
29-
)
30+
# Controls whether output iterator is ordered. Cannot be true if destination is not
31+
# guaranteed to be ordered.
32+
ordered: bool = False
3033
# This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur
3134
promise_under_10gb: bool = False
3235

3336

34-
# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes
35-
# that will be cached only when a super-tree is executed
37+
# Used internally by execution
3638
@dataclasses.dataclass(frozen=True)
37-
class CacheSpec:
38-
cluster_cols: tuple[str, ...]
39+
class TempTableSpec:
40+
"""
41+
Specifies that the result of an operation should be a session temp table.
42+
The table will be automatically deleted after the session ends.
43+
"""
44+
cluster_cols: tuple[str, ...]  # if empty, will cluster using the order key when `ordering` is set
45+
lifetime: Literal["session", "ephemeral"] = "session"
46+
# Controls ordering and whether extra columns are materialized to preserve ordering
47+
# Any extra columns will be appended to the end of the schema.
48+
# None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns)
49+
# order_rows: the result iterator itself will be ordered. For gbq execution, result cannot exceed 10GB.
50+
# order_key: the result set ordered by a key, may materialize extra columns.
51+
# offsets_col: order the result set by an offsets column, materializes one extra column.
52+
ordering: Literal["order_rows", "offsets_col", "order_key"] | None = None
3953

4054

4155
@dataclasses.dataclass(frozen=True)
4256
class TableOutputSpec:
57+
"""
58+
Specifies that the result of an operation should be exported to a specific named table.
59+
60+
The executor is not responsible for managing lifecycle of the table.
61+
"""
4362
table: bigquery.TableReference
4463
cluster_cols: tuple[str, ...]
4564
if_exists: Literal["fail", "replace", "append"] = "fail"

packages/bigframes/bigframes/session/local_scan_executor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from typing import Optional
1717

1818
from bigframes.core import bigframe_node, rewrite
19-
from bigframes.session import executor, semi_executor
19+
from bigframes.session import executor, semi_executor, execution_spec
2020

2121

2222
class LocalScanExecutor(semi_executor.SemiExecutor):
@@ -27,15 +27,17 @@ class LocalScanExecutor(semi_executor.SemiExecutor):
2727
def execute(
2828
self,
2929
plan: bigframe_node.BigFrameNode,
30-
ordered: bool,
31-
peek: Optional[int] = None,
30+
execution_spec: execution_spec.ExecutionSpec,
3231
) -> Optional[executor.ExecuteResult]:
32+
if execution_spec.destination_spec is not None:
33+
return None
34+
3335
reduced_result = rewrite.try_reduce_to_local_scan(plan)
3436
if not reduced_result:
3537
return None
3638

3739
node, limit = reduced_result
38-
40+
peek = execution_spec.peek
3941
if limit is not None:
4042
if peek is None or limit < peek:
4143
peek = limit

packages/bigframes/bigframes/session/polars_executor.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
numeric_ops,
3535
string_ops,
3636
)
37-
from bigframes.session import executor, semi_executor
37+
from bigframes.session import executor, semi_executor, execution_spec
3838

3939
if TYPE_CHECKING:
4040
import polars as pl
@@ -140,20 +140,20 @@ def __init__(self):
140140
def execute(
141141
self,
142142
plan: bigframe_node.BigFrameNode,
143-
ordered: bool,
144-
peek: Optional[int] = None,
143+
execution_spec: execution_spec.ExecutionSpec,
145144
) -> Optional[executor.ExecuteResult]:
146145
if not self._can_execute(plan):
147146
return None
148-
# Note: Ignoring ordered flag, as just executing totally ordered is fine.
147+
if execution_spec.destination_spec is not None:
148+
return None
149149
try:
150150
lazy_frame: pl.LazyFrame = self._compiler.compile(
151151
array_value.ArrayValue(plan).node
152152
)
153153
except Exception:
154154
return None
155-
if peek is not None:
156-
lazy_frame = lazy_frame.limit(peek)
155+
if execution_spec.peek is not None:
156+
lazy_frame = lazy_frame.limit(execution_spec.peek)
157157
pa_table = lazy_frame.collect().to_arrow()
158158
return executor.LocalExecuteResult(
159159
data=pa_table,

packages/bigframes/bigframes/session/read_api_execution.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from google.cloud import bigquery_storage_v1
1919

2020
from bigframes.core import bigframe_node, bq_data, nodes, rewrite
21-
from bigframes.session import executor, semi_executor
21+
from bigframes.session import executor, semi_executor, execution_spec
2222

2323

2424
class ReadApiSemiExecutor(semi_executor.SemiExecutor):
@@ -37,14 +37,16 @@ def __init__(
3737
def execute(
3838
self,
3939
plan: bigframe_node.BigFrameNode,
40-
ordered: bool,
41-
peek: Optional[int] = None,
40+
execution_spec: execution_spec.ExecutionSpec,
4241
) -> Optional[executor.ExecuteResult]:
43-
adapt_result = self._try_adapt_plan(plan, ordered)
42+
if execution_spec.destination_spec is not None:
43+
return None
44+
45+
adapt_result = self._try_adapt_plan(plan, execution_spec.ordered)
4446
if not adapt_result:
4547
return None
4648
node, limit = adapt_result
47-
if node.explicitly_ordered and ordered:
49+
if node.explicitly_ordered and execution_spec.ordered:
4850
return None
4951

5052
if not isinstance(node.source.table, bq_data.GbqNativeTable):
@@ -53,6 +55,7 @@ def execute(
5355
if not node.source.table.is_physically_stored:
5456
return None
5557

58+
peek = execution_spec.peek
5659
if limit is not None:
5760
if peek is None or limit < peek:
5861
peek = limit

0 commit comments

Comments
 (0)