Skip to content

Commit 2aa0de4

Browse files
fixes
1 parent 2a945be commit 2aa0de4

4 files changed

Lines changed: 21 additions & 10 deletions

File tree

packages/bigframes/bigframes/core/rewrite/order.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def pull_out_order(
5151
node: bigframes.core.nodes.BigFrameNode,
5252
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
5353
import bigframes.core.rewrite.slices
54+
5455
node = node.bottom_up(bigframes.core.rewrite.slices.rewrite_slice)
5556
return _pull_up_order(node, order_root=True)
5657

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -347,19 +347,19 @@ def _execute_to_cached_table(
347347
order_col_id = guid.generate_guid()
348348
plan = nodes.PromoteOffsetsNode(plan, identifiers.ColumnId(order_col_id))
349349
cluster_cols = [order_col_id]
350+
ordering = bigframes.core.ordering.TotalOrdering.from_offset_col(order_col_id)
350351
elif cache_spec.ordering == "order_key":
351352
plan, ordering = rewrite.pull_out_order(plan)
352353
destination_table = self.storage_manager.create_temp_table(
353354
plan.schema.to_bigquery(), cluster_cols
354355
)
355356
arr_value = bigframes.core.ArrayValue(plan)
356-
execution_spec = dataclasses.replace(
357-
execution_spec,
357+
execution_spec = ex_spec.ExecutionSpec(
358358
destination_spec=ex_spec.TableOutputSpec(
359359
table=destination_table,
360-
cluster_cols=cache_spec.cluster_cols,
360+
cluster_cols=cluster_cols,
361361
if_exists="replace",
362-
),
362+
)
363363
)
364364
# We don't use _execute_gbq_table_output, because we want to skip slower DML path.
365365
result = self._execute_gbq_query_only(arr_value, execution_spec)
@@ -427,7 +427,7 @@ def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue):
427427
"""Executes the query and uses the resulting table to rewrite future executions."""
428428
result = self._execute_to_cached_table(
429429
array_value.node,
430-
ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols), ordering="order_key"),
430+
ex_spec.CacheSpec(ordering="offsets_col"),
431431
)
432432
assert isinstance(result, executor.BQTableExecuteResult)
433433
assert result._data.ordering is not None

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16-
from typing import Callable, Literal, Optional, Tuple
16+
from typing import Callable, Literal, Mapping, Optional, Tuple
1717

1818
import google.cloud.bigquery.job as bq_job
1919
import google.cloud.bigquery.table as bq_table
@@ -135,8 +135,9 @@ def execute(
135135
)
136136

137137
if (isinstance(dest_spec, execution_spec.EphemeralTableSpec)) or (
138-
result_bq_data is not None and not result_mostly_cached
138+
(result_bq_data is not None) and not result_mostly_cached
139139
):
140+
assert result_bq_data is not None, "expected result table but none exists"
140141
return executor.BQTableExecuteResult(
141142
data=result_bq_data,
142143
project_id=self.bqclient.project,

packages/bigframes/tests/system/small/engines/conftest.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414
import pathlib
1515
from typing import Generator
1616

17+
import google.cloud.bigquery_storage_v1
1718
import pandas as pd
1819
import pytest
1920
from google.cloud import bigquery
2021

2122
import bigframes
22-
import google.cloud.bigquery_storage_v1
2323
from bigframes.core import ArrayValue, events, local_data
2424
from bigframes.session import (
2525
direct_gbq_execution,
@@ -44,23 +44,32 @@ def fake_session() -> Generator[bigframes.Session, None, None]:
4444
with bigframes.core.global_session._GlobalSessionContext(session):
4545
yield session
4646

47+
4748
@pytest.fixture(scope="session")
48-
def sqlglot_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient):
49+
def sqlglot_engine(
50+
bigquery_client: bigquery.Client,
51+
bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient,
52+
):
4953
return direct_gbq_execution.DirectGbqExecutor(
5054
bigquery_client,
5155
bqstoragereadclient=bigquery_storage_read_client,
5256
compiler="sqlglot",
5357
publisher=events.Publisher(),
5458
)
5559

60+
5661
@pytest.fixture(scope="session")
57-
def bq_engine(bigquery_client: bigquery.Client, bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient):
62+
def bq_engine(
63+
bigquery_client: bigquery.Client,
64+
bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient,
65+
):
5866
return direct_gbq_execution.DirectGbqExecutor(
5967
bigquery_client,
6068
bqstoragereadclient=bigquery_storage_read_client,
6169
publisher=events.Publisher(),
6270
)
6371

72+
6473
@pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"])
6574
def engine(
6675
request,

0 commit comments

Comments (0)