Skip to content
This repository was archived by the owner on Mar 6, 2026. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 73 additions & 68 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2490,6 +2490,51 @@ def api_request(*args, **kwargs):
page_size=page_size,
)

def _prepare_load_config(
    self, job_config: Optional[LoadJobConfig] = None
) -> LoadJobConfig:
    """Resolve the configuration that a load job should run with.

    A caller-provided configuration is type-checked with
    ``_verify_job_config_type``; when none is provided, a new empty
    ``LoadJobConfig`` is created instead. Either way, the configuration
    is then combined with the client's ``_default_load_job_config`` via
    ``_fill_from_default``.

    Args:
        job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
            Configuration supplied by the caller, or ``None`` to start
            from an empty configuration.

    Returns:
        google.cloud.bigquery.job.LoadJobConfig:
            The result of ``_fill_from_default``, i.e. the configuration
            the load job should use.
    """
    if job_config is None:
        # Nothing supplied by the caller: start from a blank configuration.
        job_config = job.LoadJobConfig()
    else:
        # Reject configurations that are not load-job configurations.
        _verify_job_config_type(job_config, LoadJobConfig)

    merged_config = job_config._fill_from_default(self._default_load_job_config)
    return merged_config

def _prepare_load_job(
    self,
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig,
    job_id: Optional[str] = None,
    job_id_prefix: Optional[str] = None,
    location: Optional[str] = None,
    project: Optional[str] = None,
    source_uris: Optional[Sequence[str]] = None,
) -> job.LoadJob:
    """Build the ``LoadJob`` object shared by the ``load_table_from_`` methods.

    The job is only constructed here; this helper does not start it.

    Args:
        destination (Union[Table, TableReference, str]):
            Table to load into. Converted with ``_table_arg_to_table_ref``,
            using the client's project as the default project.
        job_config (LoadJobConfig):
            Configuration to attach to the job.
        job_id (Optional[str]):
            Passed to ``_make_job_id`` together with ``job_id_prefix``.
        job_id_prefix (Optional[str]):
            Passed to ``_make_job_id`` together with ``job_id``.
        location (Optional[str]):
            Job location. Falls back to the client's location when ``None``.
        project (Optional[str]):
            Project of the job. Falls back to the client's project when
            ``None``.
        source_uris (Optional[Sequence[str]]):
            Source URIs handed to the ``LoadJob`` constructor unchanged.

    Returns:
        google.cloud.bigquery.job.LoadJob:
            A new load job bound to this client.
    """
    resolved_job_id = _make_job_id(job_id, job_id_prefix)

    # Explicit arguments win; otherwise use the client's own settings.
    job_project = self.project if project is None else project
    job_location = self.location if location is None else location

    reference = job._JobReference(
        resolved_job_id, project=job_project, location=job_location
    )

    # The destination's default project is always the client's project,
    # independent of the ``project`` argument used for the job itself.
    table_ref = _table_arg_to_table_ref(destination, default_project=self.project)

    return job.LoadJob(reference, source_uris, table_ref, self, job_config)

def load_table_from_uri(
self,
source_uris: Union[str, Sequence[str]],
Expand Down Expand Up @@ -2547,31 +2592,21 @@ def load_table_from_uri(
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if project is None:
project = self.project

if location is None:
location = self.location

job_ref = job._JobReference(job_id, project=project, location=location)

if isinstance(source_uris, str):
source_uris = [source_uris]

destination = _table_arg_to_table_ref(destination, default_project=self.project)

if job_config is not None:
_verify_job_config_type(job_config, LoadJobConfig)
else:
job_config = job.LoadJobConfig()

new_job_config = job_config._fill_from_default(self._default_load_job_config)
new_job_config = self._prepare_load_config(job_config)
load_job = self._prepare_load_job(
destination,
new_job_config,
job_id=job_id,
job_id_prefix=job_id_prefix,
location=location,
project=project,
source_uris=source_uris,
)

load_job = job.LoadJob(job_ref, source_uris, destination, self, new_job_config)
load_job._begin(retry=retry, timeout=timeout)

return load_job

def load_table_from_file(
Expand Down Expand Up @@ -2647,25 +2682,15 @@ def load_table_from_file(
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if project is None:
project = self.project

if location is None:
location = self.location

destination = _table_arg_to_table_ref(destination, default_project=self.project)
job_ref = job._JobReference(job_id, project=project, location=location)

if job_config is not None:
_verify_job_config_type(job_config, LoadJobConfig)
else:
job_config = job.LoadJobConfig()

new_job_config = job_config._fill_from_default(self._default_load_job_config)

load_job = job.LoadJob(job_ref, None, destination, self, new_job_config)
new_job_config = self._prepare_load_config(job_config)
load_job = self._prepare_load_job(
destination,
new_job_config,
job_id=job_id,
job_id_prefix=job_id_prefix,
location=location,
project=project,
)
job_resource = load_job.to_api_repr()

if rewind:
Expand Down Expand Up @@ -2796,14 +2821,7 @@ def load_table_from_dataframe(
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if job_config is not None:
_verify_job_config_type(job_config, LoadJobConfig)
else:
job_config = job.LoadJobConfig()

new_job_config = job_config._fill_from_default(self._default_load_job_config)
new_job_config = self._prepare_load_config(job_config)

supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
if new_job_config.source_format is None:
Expand All @@ -2830,9 +2848,6 @@ def load_table_from_dataframe(
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

if location is None:
location = self.location

# If table schema is not provided, we try to fetch the existing table
# schema, and check if dataframe schema is compatible with it - except
# for WRITE_TRUNCATE jobs, the existing schema does not matter then.
Expand Down Expand Up @@ -2877,8 +2892,14 @@ def load_table_from_dataframe(
stacklevel=2,
)

# We need a unique suffix for every load_table_from_dataframe call to
# avoid collisions.
# See: https://github.com/googleapis/python-bigquery/issues/1363
session_suffix = uuid.uuid4().hex
tmpfd, tmppath = tempfile.mkstemp(
suffix="_job_{}.{}".format(job_id[:8], new_job_config.source_format.lower())
suffix="_job_{}.{}".format(
session_suffix, new_job_config.source_format.lower()
)
)
os.close(tmpfd)

Expand Down Expand Up @@ -3012,15 +3033,7 @@ def load_table_from_json(
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.LoadJobConfig` class.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if job_config is not None:
_verify_job_config_type(job_config, LoadJobConfig)
else:
job_config = job.LoadJobConfig()

new_job_config = job_config._fill_from_default(self._default_load_job_config)

new_job_config = self._prepare_load_config(job_config)
new_job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON

# In specific conditions, we check if the table already exists, and/or
Expand All @@ -3040,14 +3053,6 @@ def load_table_from_json(
else:
new_job_config.autodetect = False

if project is None:
project = self.project

if location is None:
location = self.location

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = "\n".join(json.dumps(item, ensure_ascii=False) for item in json_rows)
encoded_str = data_str.encode()
data_file = io.BytesIO(encoded_str)
Expand Down
Loading