Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit e6bae77

Browse files
committed
chore: copy create external table patterns
1 parent 36fd5bb commit e6bae77

File tree

5 files changed

+157
-183
lines changed

5 files changed

+157
-183
lines changed

bigframes/bigquery/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
from bigframes.bigquery._operations.search import create_vector_index, vector_search
6262
from bigframes.bigquery._operations.sql import sql_scalar
6363
from bigframes.bigquery._operations.struct import struct
64-
from bigframes.bigquery.table import create_external_table
64+
from bigframes.bigquery._operations.table import create_external_table
6565
from bigframes.core.logging import log_adapter
6666

6767
_functions = [
Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import Mapping, Optional, Union
17+
from typing import Any, Mapping, Optional, Sequence, Union
1818

1919
import bigframes_vendored.constants
2020
import google.cloud.bigquery
@@ -101,3 +101,85 @@ def create_external_table(
101101
session.read_gbq_query(sql)
102102

103103
return _get_table_metadata(bqclient=session.bqclient, table_name=table_name)
104+
105+
106+
@log_adapter.method_logger(custom_base_name="bigquery_table")
def load_data(
    uris: str | Sequence[str],
    format: str,
    destination_table: str,
    *,
    schema: Optional[Mapping[str, str]] = None,
    cluster_by: Optional[Sequence[str]] = None,
    partition_by: Optional[str] = None,
    options: Optional[dict[str, Any]] = None,
    load_options: Optional[dict[str, Any]] = None,
    connection: Optional[str] = None,
    hive_partition_columns: Optional[Mapping[str, str]] = None,
    overwrite: bool = False,
    session: Optional[bigframes.session.Session] = None,
) -> bigframes.dataframe.DataFrame:
    """
    Loads data from external files into a BigQuery table using the `LOAD DATA` statement.

    Args:
        uris (str | Sequence[str]):
            One fully qualified URI, or a sequence of URIs, for the external
            data locations (e.g., 'gs://bucket/path/file.csv').
        format (str):
            The format of the external data (e.g., 'CSV', 'PARQUET', 'AVRO', 'JSON').
        destination_table (str):
            The name of the destination table to load into.
        schema (Mapping[str, str], optional):
            The schema of the destination table, as a mapping of column name
            to BigQuery type. If not provided, schema auto-detection will be used.
        cluster_by (Sequence[str], optional):
            A list of columns to cluster the table by.
        partition_by (str, optional):
            The partition expression for the table.
        options (dict[str, Any], optional):
            Table options (e.g., {'description': 'my table'}).
        load_options (dict[str, Any], optional):
            Options for loading data (e.g., {'skip_leading_rows': 1}).
        connection (str, optional):
            The connection name to use for reading external data.
        hive_partition_columns (Mapping[str, str], optional):
            The external partitioning columns, as a mapping of column name to
            BigQuery type. If set to an empty mapping, partitioning is inferred.
        overwrite (bool, default False):
            If True, overwrites the destination table. If False, appends to it.
        session (bigframes.session.Session, optional):
            The session to use. If not provided, the default session is used.

    Returns:
        bigframes.dataframe.DataFrame:
            A DataFrame bound to the destination table after the load completes.
    """
    import bigframes.pandas as bpd

    if session is None:
        session = bpd.get_global_session()

    if isinstance(uris, str):
        uris = [uris]

    # load_data_ddl renders column lists from BigQuery SchemaField objects
    # via schema_field_to_sql, so convert the user-facing "name -> type"
    # mappings before delegating.
    schema_fields = None
    if schema is not None:
        schema_fields = [
            google.cloud.bigquery.SchemaField(name, field_type)
            for name, field_type in schema.items()
        ]

    partition_fields = None
    if hive_partition_columns is not None:
        # An empty mapping becomes an empty list, which the DDL builder turns
        # into a bare WITH PARTITION COLUMNS clause (inferred partitioning).
        partition_fields = [
            google.cloud.bigquery.SchemaField(name, field_type)
            for name, field_type in hive_partition_columns.items()
        ]

    sql = bigframes.core.sql.table.load_data_ddl(
        destination_table=destination_table,
        uris=list(uris),
        format=format,
        schema_fields=schema_fields,
        cluster_by=list(cluster_by) if cluster_by is not None else None,
        partition_by=partition_by,
        table_options=options,
        load_options=load_options,
        connection=connection,
        hive_partition_columns=partition_fields,
        overwrite=overwrite,
    )

    # Execute the LOAD DATA statement.
    session.read_gbq_query(sql)

    # Return a DataFrame pointing to the destination table.
    # We use session.read_gbq to ensure it uses the same session.
    return session.read_gbq(destination_table)

bigframes/bigquery/_operations/io.py

Lines changed: 0 additions & 116 deletions
This file was deleted.

bigframes/core/sql/__init__.py

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import decimal
2222
import json
2323
import math
24-
from typing import Any, cast, Collection, Iterable, Mapping, Optional, TYPE_CHECKING, Union
24+
from typing import cast, Collection, Iterable, Mapping, Optional, TYPE_CHECKING, Union
2525

2626
import shapely.geometry.base # type: ignore
2727

@@ -172,7 +172,7 @@ def create_vector_index_ddl(
172172
table_name: str,
173173
column_name: str,
174174
stored_column_names: Collection[str],
175-
options: Mapping[str, Union[str | int | bool | float]] = {},
175+
options: Mapping[str, Union[str, int, bool, float]] = {},
176176
) -> str:
177177
"""Encode the VECTOR INDEX statement for BigQuery Vector Search."""
178178

@@ -275,65 +275,3 @@ def schema_field_to_sql(field: bigquery.SchemaField) -> str:
275275
if field.description:
276276
sql += f" OPTIONS(description={simple_literal(field.description)})"
277277
return sql
278-
279-
280-
def load_data_ddl(
    destination_table: str,
    uris: list[str],
    format: str,
    *,
    schema_fields: list[bigquery.SchemaField] | None = None,
    cluster_by: list[str] | None = None,
    partition_by: str | None = None,
    table_options: dict[str, Any] | None = None,
    load_options: dict[str, Any] | None = None,
    connection: str | None = None,
    hive_partition_columns: list[bigquery.SchemaField] | None = None,
    overwrite: bool = False,
) -> str:
    """Construct a LOAD DATA DDL statement.

    Clauses are emitted in the order the statement grammar requires:
    optional column list, PARTITION BY, CLUSTER BY, OPTIONS, FROM FILES,
    WITH PARTITION COLUMNS, and WITH CONNECTION.
    """
    verb = "OVERWRITE" if overwrite else "INTO"
    parts = [f"LOAD DATA {verb} {googlesql.identifier(destination_table)}\n"]

    if schema_fields:
        columns = ",\n".join(
            schema_field_to_sql(field) for field in schema_fields
        )
        parts.append(f"(\n{columns}\n)\n")

    if partition_by:
        parts.append(f"PARTITION BY {partition_by}\n")

    if cluster_by:
        parts.append(f"CLUSTER BY {', '.join(cluster_by)}\n")

    if table_options:
        rendered = ", ".join(
            f"{name}={simple_literal(value)}"
            for name, value in table_options.items()
        )
        parts.append(f"OPTIONS({rendered})\n")

    # FROM FILES options: caller-supplied load options first, then the
    # mandatory uris/format entries.
    file_options: dict = {}
    if load_options:
        file_options.update(load_options)
    file_options["uris"] = uris
    file_options["format"] = format

    rendered = ", ".join(
        f"{name}={simple_literal(value)}"
        for name, value in file_options.items()
    )
    parts.append(f"FROM FILES({rendered})\n")

    if hive_partition_columns:
        columns = ",\n".join(
            schema_field_to_sql(field) for field in hive_partition_columns
        )
        parts.append(f"WITH PARTITION COLUMNS (\n{columns}\n)\n")
    elif hive_partition_columns is not None:
        # An empty (non-None) list asks BigQuery to infer partition columns.
        parts.append("WITH PARTITION COLUMNS\n")

    if connection:
        parts.append(f"WITH CONNECTION {connection}\n")

    return "".join(parts)
Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import Mapping, Optional, Union
17+
from typing import Any, Mapping, Optional, Union
18+
19+
from google.cloud import bigquery
20+
21+
import bigframes.core.compile.googlesql as googlesql
22+
import bigframes.core.sql
1823

1924

2025
def create_external_table_ddl(
@@ -66,3 +71,68 @@ def create_external_table_ddl(
6671
statement.append(f"OPTIONS ({options_str})")
6772

6873
return " ".join(statement)
74+
75+
76+
def load_data_ddl(
    destination_table: str,
    uris: list[str],
    format: str,
    *,
    schema_fields: list[bigquery.SchemaField] | None = None,
    cluster_by: list[str] | None = None,
    partition_by: str | None = None,
    table_options: dict[str, Any] | None = None,
    load_options: dict[str, Any] | None = None,
    connection: str | None = None,
    hive_partition_columns: list[bigquery.SchemaField] | None = None,
    overwrite: bool = False,
) -> str:
    """Construct a LOAD DATA DDL statement.

    Clauses are appended in the order the statement grammar requires:
    optional column list, PARTITION BY, CLUSTER BY, OPTIONS, FROM FILES,
    WITH PARTITION COLUMNS, and WITH CONNECTION.
    """

    def _render_options(opts: dict[str, Any]) -> str:
        # key=value pairs with values encoded as SQL literals.
        return ", ".join(
            f"{key}={bigframes.core.sql.simple_literal(value)}"
            for key, value in opts.items()
        )

    def _render_columns(fields: list[bigquery.SchemaField]) -> str:
        # One column definition per line.
        return ",\n".join(
            bigframes.core.sql.schema_field_to_sql(field) for field in fields
        )

    verb = "OVERWRITE" if overwrite else "INTO"
    ddl = f"LOAD DATA {verb} {googlesql.identifier(destination_table)}\n"

    if schema_fields:
        ddl += f"(\n{_render_columns(schema_fields)}\n)\n"

    if partition_by:
        ddl += f"PARTITION BY {partition_by}\n"

    if cluster_by:
        ddl += f"CLUSTER BY {', '.join(cluster_by)}\n"

    if table_options:
        ddl += f"OPTIONS({_render_options(table_options)})\n"

    # FROM FILES options: caller-supplied load options first, then the
    # mandatory uris/format entries.
    file_options: dict[str, Any] = {}
    if load_options:
        file_options.update(load_options)
    file_options["uris"] = uris
    file_options["format"] = format
    ddl += f"FROM FILES({_render_options(file_options)})\n"

    if hive_partition_columns:
        ddl += (
            f"WITH PARTITION COLUMNS (\n"
            f"{_render_columns(hive_partition_columns)}\n)\n"
        )
    elif hive_partition_columns is not None:
        # An empty (non-None) list asks BigQuery to infer partition columns.
        ddl += "WITH PARTITION COLUMNS\n"

    if connection:
        ddl += f"WITH CONNECTION {connection}\n"

    return ddl

0 commit comments

Comments
 (0)