Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 647a61f

Browse files
authored
Merge branch 'main' into main_chelsealin_bbqjson2
2 parents dc8783b + a63fc02 commit 647a61f

File tree

12 files changed

+260
-12
lines changed

12 files changed

+260
-12
lines changed

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,28 @@ def compile_join(
244244
joins_nulls=node.joins_nulls,
245245
)
246246

247+
@_compile_node.register
248+
def compile_isin_join(
249+
self, node: nodes.InNode, left: ir.SQLGlotIR, right: ir.SQLGlotIR
250+
) -> ir.SQLGlotIR:
251+
conditions = (
252+
typed_expr.TypedExpr(
253+
scalar_compiler.compile_scalar_expression(node.left_col),
254+
node.left_col.output_type,
255+
),
256+
typed_expr.TypedExpr(
257+
scalar_compiler.compile_scalar_expression(node.right_col),
258+
node.right_col.output_type,
259+
),
260+
)
261+
262+
return left.isin_join(
263+
right,
264+
indicator_col=node.indicator_col.sql,
265+
conditions=conditions,
266+
joins_nulls=node.joins_nulls,
267+
)
268+
247269
@_compile_node.register
248270
def compile_concat(
249271
self, node: nodes.ConcatNode, *children: ir.SQLGlotIR

bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,68 @@ def join(
336336

337337
return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
338338

339+
def isin_join(
340+
self,
341+
right: SQLGlotIR,
342+
indicator_col: str,
343+
conditions: tuple[typed_expr.TypedExpr, typed_expr.TypedExpr],
344+
joins_nulls: bool = True,
345+
) -> SQLGlotIR:
346+
"""Joins the current query with another SQLGlotIR instance."""
347+
left_cte_name = sge.to_identifier(
348+
next(self.uid_gen.get_uid_stream("bfcte_")), quoted=self.quoted
349+
)
350+
351+
left_select = _select_to_cte(self.expr, left_cte_name)
352+
# Prefer subquery over CTE for the IN clause's right side to improve SQL readability.
353+
right_select = right.expr
354+
355+
left_ctes = left_select.args.pop("with", [])
356+
right_ctes = right_select.args.pop("with", [])
357+
merged_ctes = [*left_ctes, *right_ctes]
358+
359+
left_condition = typed_expr.TypedExpr(
360+
sge.Column(this=conditions[0].expr, table=left_cte_name),
361+
conditions[0].dtype,
362+
)
363+
364+
new_column: sge.Expression
365+
if joins_nulls:
366+
right_table_name = sge.to_identifier(
367+
next(self.uid_gen.get_uid_stream("bft_")), quoted=self.quoted
368+
)
369+
right_condition = typed_expr.TypedExpr(
370+
sge.Column(this=conditions[1].expr, table=right_table_name),
371+
conditions[1].dtype,
372+
)
373+
new_column = sge.Exists(
374+
this=sge.Select()
375+
.select(sge.convert(1))
376+
.from_(sge.Alias(this=right_select.subquery(), alias=right_table_name))
377+
.where(
378+
_join_condition(left_condition, right_condition, joins_nulls=True)
379+
)
380+
)
381+
else:
382+
new_column = sge.In(
383+
this=left_condition.expr,
384+
expressions=[right_select.subquery()],
385+
)
386+
387+
new_column = sge.Alias(
388+
this=new_column,
389+
alias=sge.to_identifier(indicator_col, quoted=self.quoted),
390+
)
391+
392+
new_expr = (
393+
sge.Select()
394+
.select(sge.Column(this=sge.Star(), table=left_cte_name), new_column)
395+
.from_(sge.Table(this=left_cte_name))
396+
)
397+
new_expr.set("with", sge.With(expressions=merged_ctes))
398+
399+
return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
400+
339401
def explode(
340402
self,
341403
column_names: tuple[str, ...],

bigframes/functions/_function_client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import textwrap
2626
import types
2727
from typing import Any, cast, Optional, Sequence, Tuple, TYPE_CHECKING
28+
import warnings
2829

2930
import requests
3031

32+
import bigframes.exceptions as bfe
3133
import bigframes.formatting_helpers as bf_formatting
3234
import bigframes.functions.function_template as bff_template
3335

@@ -482,10 +484,16 @@ def create_cloud_function(
482484
function.service_config.max_instance_count = max_instance_count
483485
if vpc_connector is not None:
484486
function.service_config.vpc_connector = vpc_connector
487+
if vpc_connector_egress_settings is None:
488+
msg = bfe.format_message(
489+
"The 'vpc_connector_egress_settings' was not specified. Defaulting to 'private-ranges-only'.",
490+
)
491+
warnings.warn(msg, category=UserWarning)
492+
vpc_connector_egress_settings = "private-ranges-only"
485493
if vpc_connector_egress_settings not in _VPC_EGRESS_SETTINGS_MAP:
486494
raise bf_formatting.create_exception_with_feedback_link(
487495
ValueError,
488-
f"'{vpc_connector_egress_settings}' not one of the supported vpc egress settings values: {list(_VPC_EGRESS_SETTINGS_MAP)}",
496+
f"'{vpc_connector_egress_settings}' is not one of the supported vpc egress settings values: {list(_VPC_EGRESS_SETTINGS_MAP)}",
489497
)
490498
function.service_config.vpc_connector_egress_settings = cast(
491499
functions_v2.ServiceConfig.VpcConnectorEgressSettings,

bigframes/functions/_function_session.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,9 @@ def remote_function(
245245
cloud_function_timeout: Optional[int] = 600,
246246
cloud_function_max_instances: Optional[int] = None,
247247
cloud_function_vpc_connector: Optional[str] = None,
248-
cloud_function_vpc_connector_egress_settings: Literal[
249-
"all", "private-ranges-only", "unspecified"
250-
] = "private-ranges-only",
248+
cloud_function_vpc_connector_egress_settings: Optional[
249+
Literal["all", "private-ranges-only", "unspecified"]
250+
] = None,
251251
cloud_function_memory_mib: Optional[int] = 1024,
252252
cloud_function_ingress_settings: Literal[
253253
"all", "internal-only", "internal-and-gclb"
@@ -514,6 +514,16 @@ def remote_function(
514514
" For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin.",
515515
)
516516

517+
# A VPC connector is required to specify VPC egress settings.
518+
if (
519+
cloud_function_vpc_connector_egress_settings is not None
520+
and cloud_function_vpc_connector is None
521+
):
522+
raise bf_formatting.create_exception_with_feedback_link(
523+
ValueError,
524+
"cloud_function_vpc_connector must be specified before cloud_function_vpc_connector_egress_settings.",
525+
)
526+
517527
if cloud_function_ingress_settings is None:
518528
cloud_function_ingress_settings = "internal-only"
519529
msg = bfe.format_message(

bigframes/pandas/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ def remote_function(
8787
cloud_function_timeout: Optional[int] = 600,
8888
cloud_function_max_instances: Optional[int] = None,
8989
cloud_function_vpc_connector: Optional[str] = None,
90-
cloud_function_vpc_connector_egress_settings: Literal[
91-
"all", "private-ranges-only", "unspecified"
92-
] = "private-ranges-only",
90+
cloud_function_vpc_connector_egress_settings: Optional[
91+
Literal["all", "private-ranges-only", "unspecified"]
92+
] = None,
9393
cloud_function_memory_mib: Optional[int] = 1024,
9494
cloud_function_ingress_settings: Literal[
9595
"all", "internal-only", "internal-and-gclb"

bigframes/session/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,9 +1509,9 @@ def remote_function(
15091509
cloud_function_timeout: Optional[int] = 600,
15101510
cloud_function_max_instances: Optional[int] = None,
15111511
cloud_function_vpc_connector: Optional[str] = None,
1512-
cloud_function_vpc_connector_egress_settings: Literal[
1513-
"all", "private-ranges-only", "unspecified"
1514-
] = "private-ranges-only",
1512+
cloud_function_vpc_connector_egress_settings: Optional[
1513+
Literal["all", "private-ranges-only", "unspecified"]
1514+
] = None,
15151515
cloud_function_memory_mib: Optional[int] = 1024,
15161516
cloud_function_ingress_settings: Literal[
15171517
"all", "internal-only", "internal-and-gclb"

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ def prerelease(session: nox.sessions.Session, tests_path, extra_pytest_options=(
665665
session.install(
666666
"--upgrade",
667667
"-e",
668-
"git+https://github.com/googleapis/python-bigquery-storage.git#egg=google-cloud-bigquery-storage",
668+
"git+https://github.com/googleapis/google-cloud-python.git#egg=google-cloud-bigquery-storage&subdirectory=packages/google-cloud-bigquery-storage",
669669
)
670670
already_installed.add("google-cloud-bigquery-storage")
671671
session.install(

tests/system/large/functions/test_remote_function.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,46 @@ def square_num(x):
15121512
)
15131513

15141514

1515+
@pytest.mark.flaky(retries=2, delay=120)
1516+
def test_remote_function_no_vpc_connector(session):
1517+
def foo(x):
1518+
return x
1519+
1520+
with pytest.raises(
1521+
ValueError,
1522+
match="^cloud_function_vpc_connector must be specified before cloud_function_vpc_connector_egress_settings",
1523+
):
1524+
session.remote_function(
1525+
input_types=[int],
1526+
output_type=int,
1527+
reuse=False,
1528+
cloud_function_service_account="default",
1529+
cloud_function_vpc_connector=None,
1530+
cloud_function_vpc_connector_egress_settings="all",
1531+
cloud_function_ingress_settings="all",
1532+
)(foo)
1533+
1534+
1535+
@pytest.mark.flaky(retries=2, delay=120)
1536+
def test_remote_function_wrong_vpc_egress_value(session):
1537+
def foo(x):
1538+
return x
1539+
1540+
with pytest.raises(
1541+
ValueError,
1542+
match="^'wrong-egress-value' is not one of the supported vpc egress settings values:",
1543+
):
1544+
session.remote_function(
1545+
input_types=[int],
1546+
output_type=int,
1547+
reuse=False,
1548+
cloud_function_service_account="default",
1549+
cloud_function_vpc_connector="dummy-value",
1550+
cloud_function_vpc_connector_egress_settings="wrong-egress-value",
1551+
cloud_function_ingress_settings="all",
1552+
)(foo)
1553+
1554+
15151555
@pytest.mark.parametrize(
15161556
("max_batching_rows"),
15171557
[

tests/unit/core/compile/sqlglot/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def scalar_types_table_schema() -> typing.Sequence[bigquery.SchemaField]:
8585
bigquery.SchemaField("numeric_col", "NUMERIC"),
8686
bigquery.SchemaField("float64_col", "FLOAT"),
8787
bigquery.SchemaField("rowindex", "INTEGER"),
88-
bigquery.SchemaField("rowindex_2", "INTEGER"),
88+
bigquery.SchemaField("rowindex_2", "INTEGER", mode="REQUIRED"),
8989
bigquery.SchemaField("string_col", "STRING"),
9090
bigquery.SchemaField("time_col", "TIME"),
9191
bigquery.SchemaField("timestamp_col", "TIMESTAMP"),
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
WITH `bfcte_1` AS (
2+
SELECT
3+
`int64_col` AS `bfcol_0`,
4+
`rowindex` AS `bfcol_1`
5+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
6+
), `bfcte_2` AS (
7+
SELECT
8+
`bfcol_1` AS `bfcol_2`,
9+
`bfcol_0` AS `bfcol_3`
10+
FROM `bfcte_1`
11+
), `bfcte_0` AS (
12+
SELECT
13+
`int64_too` AS `bfcol_4`
14+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
15+
), `bfcte_3` AS (
16+
SELECT
17+
`bfcte_2`.*,
18+
EXISTS(
19+
SELECT
20+
1
21+
FROM (
22+
SELECT
23+
`bfcol_4`
24+
FROM `bfcte_0`
25+
GROUP BY
26+
`bfcol_4`
27+
) AS `bft_0`
28+
WHERE
29+
COALESCE(`bfcte_2`.`bfcol_3`, 0) = COALESCE(`bft_0`.`bfcol_4`, 0)
30+
AND COALESCE(`bfcte_2`.`bfcol_3`, 1) = COALESCE(`bft_0`.`bfcol_4`, 1)
31+
) AS `bfcol_5`
32+
FROM `bfcte_2`
33+
)
34+
SELECT
35+
`bfcol_2` AS `rowindex`,
36+
`bfcol_5` AS `int64_col`
37+
FROM `bfcte_3`

0 commit comments

Comments
 (0)