This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathpyformat.py
More file actions
199 lines (156 loc) · 6.37 KB
/
pyformat.py
File metadata and controls
199 lines (156 loc) · 6.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for the pyformat feature."""
# TODO(tswast): consolidate with pandas-gbq and bigquery-magics. See:
# https://github.com/googleapis/python-bigquery-magics/blob/main/bigquery_magics/pyformat.py
from __future__ import annotations
import string
import typing
from typing import Any, Optional, Tuple, Union
import google.cloud.bigquery
import pandas
from bigframes.core import utils
import bigframes.core.local_data
from bigframes.core.tools import bigquery_schema
import bigframes.session
_BQ_TABLE_TYPES = Union[
google.cloud.bigquery.Table,
google.cloud.bigquery.TableReference,
google.cloud.bigquery.table.TableListItem,
]
def _table_to_sql(table: _BQ_TABLE_TYPES) -> str:
# BiglakeIcebergTable IDs have 4 parts. BigFrames packs catalog.namespace
# into the dataset_id.
dataset_parts = table.dataset_id.split(".")
dataset_sql = ".".join(f"`{part}`" for part in dataset_parts)
return f"`{table.project}`.{dataset_sql}.`{table.table_id}`"
def _pandas_df_to_sql_dry_run(pd_df: pandas.DataFrame) -> str:
# Ensure there are no duplicate column labels.
#
# Please make sure this stays in sync with the logic used to_gbq(). See
# bigframes.dataframe.DataFrame._prepare_export().
new_col_labels, new_idx_labels = utils.get_standardized_ids(
pd_df.columns, pd_df.index.names
)
pd_copy = pd_df.copy()
pd_copy.columns = pandas.Index(new_col_labels)
pd_copy.index.names = new_idx_labels
managed_table = bigframes.core.local_data.ManagedArrowTable.from_pandas(pd_copy)
bqschema = managed_table.schema.to_bigquery()
return bigquery_schema.to_sql_dry_run(bqschema)
def _pandas_df_to_sql(
df_pd: pandas.DataFrame,
*,
name: str,
session: Optional[bigframes.session.Session] = None,
dry_run: bool = False,
) -> str:
if session is None:
if not dry_run:
message = (
f"Can't embed pandas DataFrame {name} in a SQL "
"string without a bigframes session except if for a dry run."
)
raise ValueError(message)
return _pandas_df_to_sql_dry_run(df_pd)
# Use the _deferred engine to avoid loading data too often during dry run.
df = session.read_pandas(df_pd, write_engine="_deferred")
return _table_to_sql(df._to_placeholder_table(dry_run=dry_run))
def _field_to_template_value(
name: str,
value: Any,
*,
session: Optional[bigframes.session.Session] = None,
dry_run: bool = False,
) -> str:
"""Convert value to something embeddable in a SQL string."""
import bigframes.core.compile.sqlglot.sql as sql # Avoid circular imports
import bigframes.dataframe # Avoid circular imports
_validate_type(name, value)
table_types = typing.get_args(_BQ_TABLE_TYPES)
if isinstance(value, table_types):
return _table_to_sql(value)
if isinstance(value, pandas.DataFrame):
return _pandas_df_to_sql(value, session=session, dry_run=dry_run, name=name)
if isinstance(value, bigframes.dataframe.DataFrame):
import bigframes.core.bq_data as bq_data
import bigframes.core.nodes as nodes
# TODO(b/493608478): Remove this workaround for BigLake/Iceberg tables,
# which cannot currently be used in views, once a fix rolls out.
def is_biglake(
node: nodes.BigFrameNode, child_results: Tuple[bool, ...]
) -> bool:
if isinstance(node, nodes.ReadTableNode):
return isinstance(node.source.table, bq_data.BiglakeIcebergTable)
return any(child_results)
contains_biglake = value._block.expr.node.reduce_up(is_biglake)
if contains_biglake:
sql_query, _, _ = value._to_sql_query(include_index=False)
return f"({sql_query})"
return _table_to_sql(value._to_placeholder_table(dry_run=dry_run))
if isinstance(value, str):
return value
return sql.to_sql(sql.literal(value))
def _validate_type(name: str, value: Any):
"""Raises TypeError if value is unsupported."""
import bigframes.dataframe # Avoid circular imports
import bigframes.dtypes # Avoid circular imports
if value is None:
return # None can't be used in isinstance, but is a valid literal.
supported_types = (
typing.get_args(_BQ_TABLE_TYPES)
+ bigframes.dtypes.SUPPORTED_LITERAL_TYPES
+ (bigframes.dataframe.DataFrame,)
+ (pandas.DataFrame,)
)
if not isinstance(value, supported_types):
raise TypeError(
f"{name} has unsupported type: {type(value)}. "
f"Only {supported_types} are supported."
)
def _parse_fields(sql_template: str) -> list[str]:
return [
field_name
for _, field_name, _, _ in string.Formatter().parse(sql_template)
if field_name is not None
]
def pyformat(
sql_template: str,
*,
pyformat_args: dict,
session: Optional[bigframes.session.Session] = None,
dry_run: bool = False,
) -> str:
"""Unsafe Python-style string formatting of SQL string.
Only some data types supported.
Warning: strings are **not** escaped. This allows them to be used in
contexts such as table identifiers, where normal query parameters are not
supported.
Args:
sql_template (str):
SQL string with 0+ {var_name}-style format options.
pyformat_args (dict):
Variable namespace to use for formatting.
Raises:
TypeError: if a referenced variable is not of a supported type.
KeyError: if a referenced variable is not found.
"""
fields = _parse_fields(sql_template)
format_kwargs = {}
for name in fields:
value = pyformat_args[name]
format_kwargs[name] = _field_to_template_value(
name, value, session=session, dry_run=dry_run
)
return sql_template.format(**format_kwargs)