This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathbigquery_options.py
More file actions
450 lines (369 loc) · 17.9 KB
/
bigquery_options.py
File metadata and controls
450 lines (369 loc) · 17.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Options for BigQuery DataFrames."""
from __future__ import annotations

from typing import Literal, Optional, Sequence, Tuple
import warnings

import google.auth.credentials
import requests.adapters

import bigframes._importing
import bigframes.constants
import bigframes.enums
import bigframes.exceptions as bfe
# Error text raised when a setter is used after a session has started.
# '{attribute}' is filled in via str.format(attribute=...) by each setter.
SESSION_STARTED_MESSAGE = (
    "Cannot change '{attribute}' once a session has started. "
    "Call bigframes.pandas.close_session() first, if you are using the bigframes.pandas API."
)
# Warning text for a location that matches no known BigQuery region;
# '{possibility}' is the closest known location by edit distance.
UNKNOWN_LOCATION_MESSAGE = "The location '{location}' is set to an unknown value. Did you mean '{possibility}'?"
def _get_validated_location(value: Optional[str]) -> Optional[str]:
    """Normalize a BigQuery location string, warning when it is unrecognized.

    ``None`` and exact matches pass through unchanged. Otherwise the value is
    case-normalized (regions are lowercase, multi-regions uppercase). If it
    still matches nothing known, an ``UnknownLocationWarning`` suggesting the
    closest known location is emitted and the original value is returned.
    """
    # Imported lazily so module import stays light and avoids cycles.
    import bigframes._tools.strings

    if value is None or value in bigframes.constants.ALL_BIGQUERY_LOCATIONS:
        return value

    candidate = str(value)

    lowered = candidate.lower()
    if lowered in bigframes.constants.BIGQUERY_REGIONS:
        return lowered

    raised = candidate.upper()
    if raised in bigframes.constants.BIGQUERY_MULTIREGIONS:
        return raised

    # Unknown location: find the closest known spelling to suggest.
    closest = min(
        bigframes.constants.ALL_BIGQUERY_LOCATIONS,
        key=lambda known: bigframes._tools.strings.levenshtein_distance(
            candidate, known
        ),
    )
    msg = bfe.format_message(
        UNKNOWN_LOCATION_MESSAGE.format(location=candidate, possibility=closest)
    )
    # stacklevel=3 attributes the warning to the user's assignment, skipping
    # the property setter and this helper:
    #   bpd.options.bigquery.location = "us-central-1"
    #   -> location.setter -> _get_validated_location
    warnings.warn(msg, stacklevel=3, category=bfe.UnknownLocationWarning)
    # Return the value exactly as supplied so the user sees what they set.
    return value
def _validate_ordering_mode(value: str) -> bigframes.enums.OrderingMode:
    """Convert a user-supplied string into an ``OrderingMode`` enum member.

    Matching is case-insensitive ("Strict", "PARTIAL", ... all work).

    Raises:
        ValueError: if *value* is neither 'strict' nor 'partial'.
    """
    folded = value.casefold()
    for mode in (
        bigframes.enums.OrderingMode.STRICT,
        bigframes.enums.OrderingMode.PARTIAL,
    ):
        if folded == mode.value.casefold():
            return mode
    raise ValueError("Ordering mode must be one of 'strict' or 'partial'.")
class BigQueryOptions:
    """Encapsulates configuration for working with a session.

    Most attributes become immutable once a session has started: their
    setters raise ``ValueError`` (see ``SESSION_STARTED_MESSAGE``) when the
    new value differs from the current one.
    """

    def __init__(
        self,
        credentials: Optional[google.auth.credentials.Credentials] = None,
        project: Optional[str] = None,
        location: Optional[str] = None,
        bq_connection: Optional[str] = None,
        use_regional_endpoints: bool = False,
        application_name: Optional[str] = None,
        kms_key_name: Optional[str] = None,
        skip_bq_connection_check: bool = False,
        *,
        allow_large_results: bool = False,
        ordering_mode: Literal["strict", "partial"] = "strict",
        client_endpoints_override: Optional[dict] = None,
        requests_transport_adapters: Sequence[
            Tuple[str, requests.adapters.BaseAdapter]
        ] = (),
        enable_polars_execution: bool = False,
    ):
        self._credentials = credentials
        self._project = project
        # Validate (and possibly case-normalize) the location up front.
        self._location = _get_validated_location(location)
        self._bq_connection = bq_connection
        self._use_regional_endpoints = use_regional_endpoints
        self._application_name = application_name
        self._kms_key_name = kms_key_name
        self._skip_bq_connection_check = skip_bq_connection_check
        self._allow_large_results = allow_large_results
        self._requests_transport_adapters = requests_transport_adapters
        # Flipped to True elsewhere once a session starts; setters below
        # check it to refuse changes that would no longer take effect.
        self._session_started = False
        # Determines the ordering strictness for the session.
        self._ordering_mode = _validate_ordering_mode(ordering_mode)
        if client_endpoints_override is None:
            client_endpoints_override = {}
        self._client_endpoints_override = client_endpoints_override
        if enable_polars_execution:
            # Fail fast at construction if polars is not installed.
            bigframes._importing.import_polars()
        self._enable_polars_execution = enable_polars_execution

    @property
    def application_name(self) -> Optional[str]:
        """The application name to amend to the user-agent sent to Google APIs.

        The recommended format is ``"application-name/major.minor.patch_version"``
        or ``"(gpn:PartnerName;)"`` for official Google partners.

        Returns:
            None or str:
                Application name as a string if exists; otherwise None.
        """
        return self._application_name

    @application_name.setter
    def application_name(self, value: Optional[str]):
        if self._session_started and self._application_name != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="application_name")
            )
        self._application_name = value

    @property
    def credentials(self) -> Optional[google.auth.credentials.Credentials]:
        """The OAuth2 credentials to use for this client.

        Returns:
            None or google.auth.credentials.Credentials:
                google.auth.credentials.Credentials if exists; otherwise None.
        """
        return self._credentials

    @credentials.setter
    def credentials(self, value: Optional[google.auth.credentials.Credentials]):
        # Identity (not equality) comparison — presumably because credential
        # objects don't implement value equality; re-assigning the same object
        # after session start is allowed.
        if self._session_started and self._credentials is not value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="credentials"))
        self._credentials = value

    @property
    def location(self) -> Optional[str]:
        """Default location for job, datasets, and tables.

        For more information, see https://cloud.google.com/bigquery/docs/locations BigQuery locations.

        Returns:
            None or str:
                Default location as a string; otherwise None.
        """
        return self._location

    @location.setter
    def location(self, value: Optional[str]):
        # Compare against the validated form so that assigning an equivalent
        # (differently cased) spelling of the current location is not an error.
        if self._session_started and self._location != _get_validated_location(value):
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="location"))
        self._location = _get_validated_location(value)

    @property
    def project(self) -> Optional[str]:
        """Google Cloud project ID to use for billing and as the default project.

        Returns:
            None or str:
                Google Cloud project ID as a string; otherwise None.
        """
        return self._project

    @project.setter
    def project(self, value: Optional[str]):
        if self._session_started and self._project != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="project"))
        self._project = value

    @property
    def bq_connection(self) -> Optional[str]:
        """Name of the BigQuery connection to use in the form
        <PROJECT_NUMBER/PROJECT_ID>.<LOCATION>.<CONNECTION_ID>.

        You either need to create the connection in a location of your choice, or
        you need the Project Admin IAM role to enable the service to create the
        connection for you.

        If this option isn't provided, or project or location aren't provided,
        session will use its default project/location/connection_id as default connection.

        Returns:
            None or str:
                Name of the BigQuery connection as a string; otherwise None.
        """
        return self._bq_connection

    @bq_connection.setter
    def bq_connection(self, value: Optional[str]):
        if self._session_started and self._bq_connection != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="bq_connection"))
        self._bq_connection = value

    @property
    def skip_bq_connection_check(self) -> bool:
        """Forcibly use the BigQuery connection.

        Setting this flag to True would avoid creating the BigQuery connection
        and checking or setting IAM permissions on it. So if the BigQuery
        connection (default or user-provided) does not exist, or it does not have
        necessary permissions set up to support BigQuery DataFrames operations,
        then a runtime error will be reported.

        Returns:
            bool:
                A boolean value, where True indicates a BigQuery connection is
                not created or the connection does not have necessary
                permissions set up; otherwise False.
        """
        return self._skip_bq_connection_check

    @skip_bq_connection_check.setter
    def skip_bq_connection_check(self, value: bool):
        if self._session_started and self._skip_bq_connection_check != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="skip_bq_connection_check")
            )
        self._skip_bq_connection_check = value

    @property
    def allow_large_results(self) -> bool:
        """
        DEPRECATED: Checks the legacy global setting for allowing large results.
        Use ``bpd.options.compute.allow_large_results`` instead.

        Warning: Accessing ``bpd.options.bigquery.allow_large_results`` is deprecated
        and this property will be removed in a future version. The configuration for
        handling large results has moved.

        Returns:
            bool: The value of the deprecated setting.
        """
        return self._allow_large_results

    @allow_large_results.setter
    def allow_large_results(self, value: bool):
        # The deprecation warning fires unconditionally — even when the value
        # is unchanged — before the session-started guard.
        warnings.warn(
            "Setting `bpd.options.bigquery.allow_large_results` is deprecated, "
            "and will be removed in the future. "
            "Please use `bpd.options.compute.allow_large_results = <value>` instead. "
            "The `bpd.options.bigquery.allow_large_results` option is ignored if "
            "`bpd.options.compute.allow_large_results` is set.",
            FutureWarning,
            stacklevel=2,
        )
        if self._session_started and self._allow_large_results != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="allow_large_results")
            )
        self._allow_large_results = value

    @property
    def use_regional_endpoints(self) -> bool:
        """Flag to connect to regional API endpoints for BigQuery API and
        BigQuery Storage API.

        .. note::
            Use of regional endpoints is a feature in Preview and available only
            in regions "europe-west3", "europe-west8", "europe-west9",
            "me-central2", "us-central1", "us-central2", "us-east1", "us-east4",
            "us-east5", "us-east7", "us-south1", "us-west1", "us-west2", "us-west3"
            and "us-west4".

        Requires that ``location`` is set. For [supported regions](https://cloud.google.com/bigquery/docs/regional-endpoints),
        for example ``europe-west3``, you need to specify
        ``location='europe-west3'`` and ``use_regional_endpoints=True``, and
        then BigQuery DataFrames would connect to the BigQuery endpoint
        ``bigquery.europe-west3.rep.googleapis.com``. For not supported regions,
        for example ``asia-northeast1``, when you specify
        ``location='asia-northeast1'`` and ``use_regional_endpoints=True``,
        the global endpoint ``bigquery.googleapis.com`` would be used, which
        does not promise any guarantee on the request remaining within the
        location during transit.

        Returns:
            bool:
                A boolean value, where True indicates that regional endpoints
                would be used for BigQuery and BigQuery storage APIs; otherwise
                global endpoints would be used.
        """
        return self._use_regional_endpoints

    @use_regional_endpoints.setter
    def use_regional_endpoints(self, value: bool):
        if self._session_started and self._use_regional_endpoints != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="use_regional_endpoints")
            )
        # Only warn when turning the preview feature on.
        if value:
            msg = bfe.format_message(
                "Use of regional endpoints is a feature in preview and "
                "available only in selected regions and projects. "
            )
            warnings.warn(msg, category=bfe.PreviewWarning, stacklevel=2)
        self._use_regional_endpoints = value

    @property
    def kms_key_name(self) -> Optional[str]:
        """
        Customer managed encryption key used to control encryption of the
        data-at-rest in BigQuery. This is of the format
        projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY.

        For more information, see https://cloud.google.com/bigquery/docs/customer-managed-encryption
        Customer-managed Cloud KMS keys

        Make sure the project used for Bigquery DataFrames has the
        Cloud KMS CryptoKey Encrypter/Decrypter IAM role in the key's project.
        For more information, see https://cloud.google.com/bigquery/docs/customer-managed-encryption#assign_role
        Assign the Encrypter/Decrypter.

        Returns:
            None or str:
                Name of the customer managed encryption key as a string; otherwise None.
        """
        return self._kms_key_name

    @kms_key_name.setter
    def kms_key_name(self, value: Optional[str]):
        # Annotation widened to Optional[str] to match the property's return
        # type; None is a valid value (no customer-managed key).
        if self._session_started and self._kms_key_name != value:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="kms_key_name"))
        self._kms_key_name = value

    @property
    def ordering_mode(self) -> Literal["strict", "partial"]:
        """Controls whether total row order is always maintained for DataFrame/Series.

        Returns:
            Literal:
                A literal string value of either strict or partial ordering mode.
        """
        # Stored internally as an OrderingMode enum; expose the string value.
        return self._ordering_mode.value

    @ordering_mode.setter
    def ordering_mode(self, value: Literal["strict", "partial"]) -> None:
        ordering_mode = _validate_ordering_mode(value)
        if self._session_started and self._ordering_mode != ordering_mode:
            raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="ordering_mode"))
        self._ordering_mode = ordering_mode

    @property
    def client_endpoints_override(self) -> dict:
        """Option that sets the BQ client endpoints addresses directly as a dict. Possible keys are "bqclient", "bqconnectionclient", "bqstoragereadclient"."""
        return self._client_endpoints_override

    @client_endpoints_override.setter
    def client_endpoints_override(self, value: dict):
        msg = bfe.format_message(
            "This is an advanced configuration option for directly setting endpoints. "
            "Incorrect use may lead to unexpected behavior or system instability. "
            "Proceed only if you fully understand its implications."
        )
        # NOTE(review): unlike the other setters, this warn() call passes no
        # category or stacklevel (defaults: UserWarning, this frame) — confirm
        # whether that inconsistency is intentional.
        warnings.warn(msg)
        if self._session_started and self._client_endpoints_override != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="client_endpoints_override")
            )
        self._client_endpoints_override = value

    @property
    def requests_transport_adapters(
        self,
    ) -> Sequence[Tuple[str, requests.adapters.BaseAdapter]]:
        """Transport adapters for requests-based REST clients such as the
        google-cloud-bigquery package.

        For more details, see the explanation in `requests guide to transport
        adapters
        <https://requests.readthedocs.io/en/latest/user/advanced/#transport-adapters>`_.

        **Examples:**

        Increase the connection pool size using the requests `HTTPAdapter
        <https://requests.readthedocs.io/en/latest/api/#requests.adapters.HTTPAdapter>`_.

        >>> import bigframes.pandas as bpd
        >>> bpd.options.bigquery.requests_transport_adapters = (
        ...     ("http://", requests.adapters.HTTPAdapter(pool_maxsize=100)),
        ...     ("https://", requests.adapters.HTTPAdapter(pool_maxsize=100)),
        ... )  # doctest: +SKIP

        Returns:
            Sequence[Tuple[str, requests.adapters.BaseAdapter]]:
                Prefixes and corresponding transport adapters to `mount
                <https://requests.readthedocs.io/en/latest/api/#requests.Session.mount>`_
                in requests-based REST clients.
        """
        return self._requests_transport_adapters

    @requests_transport_adapters.setter
    def requests_transport_adapters(
        self, value: Sequence[Tuple[str, requests.adapters.BaseAdapter]]
    ) -> None:
        if self._session_started and self._requests_transport_adapters != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="requests_transport_adapters")
            )
        self._requests_transport_adapters = value

    @property
    def enable_polars_execution(self) -> bool:
        """If True, will use polars to execute some simple query plans locally."""
        return self._enable_polars_execution

    @enable_polars_execution.setter
    def enable_polars_execution(self, value: bool):
        if self._session_started and self._enable_polars_execution != value:
            raise ValueError(
                SESSION_STARTED_MESSAGE.format(attribute="enable_polars_execution")
            )
        if value is True:
            msg = bfe.format_message(
                "Polars execution is an experimental feature, and may not be stable. Must have polars installed."
            )
            warnings.warn(msg, category=bfe.PreviewWarning)
            # Import eagerly so a missing polars install fails here, at
            # configuration time, rather than mid-query.
            bigframes._importing.import_polars()
        self._enable_polars_execution = value