
Commit d3e0206

fix: big query ingestion job (#1611)
1 parent 52a4be8 commit d3e0206

4 files changed: 451 additions & 60 deletions

File tree

functions-python/big_query_ingestion/src/common/bq_data_transfer.py

Lines changed: 42 additions & 42 deletions
```diff
@@ -3,8 +3,16 @@
 from datetime import datetime
 
 from google.cloud import bigquery, storage
-from google.cloud.bigquery.job import LoadJobConfig, SourceFormat
 
+from shared.helpers.big_query_helpers import (
+    collect_blobs_and_uris,
+    make_staging_table_ref,
+    ensure_staging_table_like_target,
+    load_uris_into_staging,
+    publish_staging_to_target,
+    cleanup_success,
+    cleanup_failure,
+)
 from shared.helpers.bq_schema.schema import json_schema_to_bigquery, load_json_schema
 
 # Environment variables
@@ -60,50 +68,42 @@ def create_bigquery_table(self):
         )
 
     def load_data_to_bigquery(self):
-        """Loads data from Cloud Storage to BigQuery."""
+        """Loads data from Cloud Storage to BigQuery atomically via a staging table.
+
+        The process is:
+        1. Create a staging table with the same schema as the target.
+        2. Load data from GCS to the staging table.
+        3. If load is successful, copy data from staging to target (overwriting).
+        4. If all steps succeed, delete the blobs and staging table.
+        5. If any step fails, delete the staging table and do not touch blobs or target.
+        """
         dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
-        table_ref = dataset_ref.table(table_id)
-        source_uris = []
-        # Get the list of blobs in the bucket
-        blobs = list(
-            self.storage_client.list_blobs(bucket_name, prefix=self.nd_json_path_prefix)
+        target_table_ref = dataset_ref.table(table_id)
+
+        blobs, source_uris = collect_blobs_and_uris(
+            self.storage_client,
+            bucket_name=bucket_name,
+            prefix=self.nd_json_path_prefix,
         )
-        for blob in blobs:
-            uri = f"gs://{bucket_name}/{blob.name}"
-            source_uris.append(uri)
-        logging.info("Found %s files to load to BigQuery.", len(source_uris))
-
-        if len(source_uris) > 0:
-            # Load the data to BigQuery
-            job_config = LoadJobConfig()
-            job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON
-            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
-
-            load_job = self.bigquery_client.load_table_from_uri(
-                source_uris, table_ref, job_config=job_config
+
+        if not source_uris:
+            return
+
+        staging_table_ref = make_staging_table_ref(target_table_ref)
+
+        try:
+            ensure_staging_table_like_target(
+                self.bigquery_client, target_table_ref, staging_table_ref
+            )
+            load_uris_into_staging(self.bigquery_client, staging_table_ref, source_uris)
+            publish_staging_to_target(
+                self.bigquery_client, staging_table_ref, target_table_ref
             )
-            try:
-                load_job.result()  # Wait for the job to complete
-                logging.info(
-                    "Loaded %s files into %s.%s.%s.",
-                    len(source_uris),
-                    table_ref.project,
-                    table_ref.dataset_id,
-                    table_ref.table_id,
-                )
-                # If successful, delete the blobs
-                for blob in blobs:
-                    blob.delete()
-                    logging.debug("Deleted blob: %s", blob.name)
-                logging.info("Deleted blobs")
-            except Exception as e:
-                logging.error("An error occurred while loading data to BigQuery: %s", e)
-                for error in load_job.errors:
-                    logging.error("Error: %s", error["message"])
-                    if "location" in error:
-                        logging.error("Location: %s", error["location"])
-                    if "reason" in error:
-                        logging.error("Reason: %s", error["reason"])
+            cleanup_success(self.bigquery_client, staging_table_ref, blobs)
+
+        except Exception as e:
+            logging.error("An error occurred while loading data to BigQuery: %s", e)
+            cleanup_failure(self.bigquery_client, staging_table_ref)
+            raise
 
     def send_data_to_bigquery(self):
         """Full process to send data to BigQuery."""
```

functions-python/big_query_ingestion/tests/test_common.py

Lines changed: 53 additions & 18 deletions
```diff
@@ -62,37 +62,72 @@ def test_create_bigquery_table_exists(self, _):
         self.mock_bq_client().get_table.assert_called_once()
         self.mock_bq_client().create_table.assert_not_called()
 
+    @patch("common.bq_data_transfer.cleanup_success")
+    @patch("common.bq_data_transfer.publish_staging_to_target")
+    @patch("common.bq_data_transfer.load_uris_into_staging")
+    @patch("common.bq_data_transfer.ensure_staging_table_like_target")
+    @patch("common.bq_data_transfer.make_staging_table_ref")
+    @patch("common.bq_data_transfer.collect_blobs_and_uris")
     @patch("common.bq_data_transfer.bigquery.DatasetReference")
-    def test_load_data_to_bigquery(self, _):
+    def test_load_data_to_bigquery(
+        self,
+        mock_dataset_ref,
+        mock_collect_blobs,
+        mock_make_staging,
+        mock_ensure_staging,
+        mock_load_uris,
+        mock_publish,
+        mock_cleanup,
+    ):
         mock_blob = MagicMock()
         mock_blob.name = "file1.ndjson"
-        self.mock_storage_client().list_blobs.return_value = [mock_blob]
-
-        mock_load_job = MagicMock()
-        self.mock_bq_client().load_table_from_uri.return_value = mock_load_job
+        mock_collect_blobs.return_value = ([mock_blob], ["gs://bucket/file1.ndjson"])
+        mock_make_staging.return_value = MagicMock()
 
         self.transfer.load_data_to_bigquery()
 
-        self.mock_storage_client().list_blobs.assert_called_once()
-        self.mock_bq_client().load_table_from_uri.assert_called_once()
-        mock_load_job.result.assert_called_once()
+        mock_collect_blobs.assert_called_once()
+        mock_make_staging.assert_called_once()
+        mock_ensure_staging.assert_called_once()
+        mock_load_uris.assert_called_once()
+        mock_publish.assert_called_once()
+        mock_cleanup.assert_called_once()
 
     @patch("common.bq_data_transfer.bigquery.DatasetReference")
-    def test_load_data_to_bigquery_error(self, _):
+    @patch("common.bq_data_transfer.collect_blobs_and_uris")
+    @patch("common.bq_data_transfer.make_staging_table_ref")
+    @patch("common.bq_data_transfer.ensure_staging_table_like_target")
+    @patch(
+        "common.bq_data_transfer.load_uris_into_staging",
+        side_effect=Exception("Load job failed"),
+    )
+    @patch("common.bq_data_transfer.cleanup_failure")
+    def test_load_data_to_bigquery_error(
+        self,
+        mock_cleanup,
+        mock_load_uris,
+        mock_ensure_staging,
+        mock_make_staging,
+        mock_collect_blobs,
+        mock_dataset_ref,
+    ):
         mock_blob = MagicMock()
         mock_blob.name = "file1.ndjson"
-        self.mock_storage_client().list_blobs.return_value = [mock_blob]
-
-        mock_load_job = MagicMock()
-        mock_load_job.result.side_effect = Exception("Load job failed")
-        self.mock_bq_client().load_table_from_uri.return_value = mock_load_job
+        mock_collect_blobs.return_value = ([mock_blob], ["gs://bucket/file1.ndjson"])
+        mock_make_staging.return_value = MagicMock()
 
-        with self.assertLogs(level="ERROR") as log:
+        with self.assertLogs(level="ERROR") as log, self.assertRaises(Exception) as ctx:
             self.transfer.load_data_to_bigquery()
 
-        self.assertIn(
-            "An error occurred while loading data to BigQuery: Load job failed",
-            log.output[0],
+        mock_collect_blobs.assert_called_once()
+        mock_make_staging.assert_called_once()
+        mock_ensure_staging.assert_called_once()
+        mock_load_uris.assert_called_once()
+        mock_cleanup.assert_called_once()
+        assert "Load job failed" in str(ctx.exception)
+        assert any(
+            "An error occurred while loading data to BigQuery" in record
+            for record in log.output
         )
 
     @patch("common.bq_data_transfer.BigQueryDataTransfer.create_bigquery_dataset")
```
Lines changed: 159 additions & 0 deletions
```diff
@@ -0,0 +1,159 @@
+import logging
+import time
+from typing import List, Tuple
+
+from google.cloud import bigquery
+from google.cloud.bigquery import LoadJobConfig, CopyJobConfig, SourceFormat
+
+# 500 below the documented max of 10k URIs per load job to allow for any overhead and avoid hitting limits.
+MAX_URIS_PER_JOB = 9500
+
+
+def chunked(seq: List[str], size: int):
+    for i in range(0, len(seq), size):
+        yield seq[i : i + size]
+
+
+def collect_blobs_and_uris(
+    storage_client, bucket_name: str, prefix: str
+) -> Tuple[List[object], List[str]]:
+    """List blobs in GCS with given prefix and construct their URIs."""
+    try:
+        blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))
+        uris = [f"gs://{bucket_name}/{b.name}" for b in blobs]
+        logging.info("Found %s files to load to BigQuery.", len(uris))
+        return blobs, uris
+    except Exception as e:
+        logging.error("Failed to list blobs or construct URIs: %s", e)
+        raise
+
+
+def make_staging_table_ref(
+    target_table_ref: bigquery.TableReference,
+) -> bigquery.TableReference:
+    """Construct a staging table reference in the same dataset with a unique name."""
+    try:
+        staging_table_id = f"{target_table_ref.table_id}__staging_{int(time.time())}"
+        dataset_ref = bigquery.DatasetReference(
+            target_table_ref.project, target_table_ref.dataset_id
+        )
+        return dataset_ref.table(staging_table_id)
+    except Exception as e:
+        logging.error("Failed to construct staging table reference: %s", e)
+        raise
+
+
+def ensure_staging_table_like_target(
+    bigquery_client,
+    target_table_ref: bigquery.TableReference,
+    staging_table_ref: bigquery.TableReference,
+) -> None:
+    """Create staging table with same schema (and partitioning/clustering) as target."""
+    try:
+        target_tbl = bigquery_client.get_table(target_table_ref)
+
+        staging_tbl = bigquery.Table(staging_table_ref, schema=target_tbl.schema)
+        staging_tbl.time_partitioning = getattr(target_tbl, "time_partitioning", None)
+        staging_tbl.range_partitioning = getattr(target_tbl, "range_partitioning", None)
+        staging_tbl.clustering_fields = getattr(target_tbl, "clustering_fields", None)
+
+        bigquery_client.create_table(staging_tbl, exists_ok=True)
+        logging.info(
+            "Staging table ready: %s.%s.%s",
+            staging_table_ref.project,
+            staging_table_ref.dataset_id,
+            staging_table_ref.table_id,
+        )
+    except Exception as e:
+        logging.error("Failed to create staging table like target: %s", e)
+        raise
+
+
+def load_uris_into_staging(
+    bigquery_client,
+    staging_table_ref: bigquery.TableReference,
+    source_uris: List[str],
+) -> None:
+    """Load NDJSON files into staging in batches (10k URIs max/job)."""
+    try:
+        for batch_idx, uri_batch in enumerate(
+            chunked(source_uris, MAX_URIS_PER_JOB), start=1
+        ):
+            logging.info(
+                "Loading batch %s into staging (%s files)...", batch_idx, len(uri_batch)
+            )
+            job_cfg = LoadJobConfig(
+                source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
+                write_disposition=(
+                    bigquery.WriteDisposition.WRITE_TRUNCATE
+                    if batch_idx == 1
+                    else bigquery.WriteDisposition.WRITE_APPEND
+                ),
+            )
+            job = bigquery_client.load_table_from_uri(
+                uri_batch, staging_table_ref, job_config=job_cfg
+            )
+            job.result()  # fail fast
+
+        staging_loaded = bigquery_client.get_table(staging_table_ref)
+        logging.info(
+            "All batches loaded into staging. Reported rows: %s",
+            staging_loaded.num_rows,
+        )
+    except Exception as e:
+        logging.error("Failed to load URIs into staging: %s", e)
+        raise
+
+
+def publish_staging_to_target(
+    bigquery_client,
+    staging_table_ref: bigquery.TableReference,
+    target_table_ref: bigquery.TableReference,
+) -> None:
+    """Replace target contents with staging contents (publish moment)."""
+    try:
+        copy_cfg = CopyJobConfig(
+            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
+        )
+
+        logging.info("Publishing: copying staging to target with WRITE_TRUNCATE...")
+        copy_job = bigquery_client.copy_table(
+            sources=staging_table_ref,
+            destination=target_table_ref,
+            job_config=copy_cfg,
+        )
+        copy_job.result()
+        logging.info("Publish complete: target replaced successfully.")
+    except Exception as e:
+        logging.error("Failed to publish staging to target: %s", e)
+        raise
+
+
+def cleanup_success(
+    bigquery_client, staging_table_ref: bigquery.TableReference, blobs: List[object]
+) -> None:
+    """Delete staging table and source blobs after successful publish."""
+    try:
+        bigquery_client.delete_table(staging_table_ref, not_found_ok=True)
+        logging.info("Deleted staging table.")
+
+        for b in blobs:
+            b.delete()
+        logging.info("Deleted %s blobs.", len(blobs))
+    except Exception as e:
+        logging.error("Failed during cleanup after success: %s", e)
+        raise
+
+
+def cleanup_failure(
+    bigquery_client, staging_table_ref: bigquery.TableReference
+) -> None:
+    """Attempt to delete staging table after failure, but log and continue if it fails (to preserve for inspection)."""
+    try:
+        bigquery_client.delete_table(staging_table_ref, not_found_ok=True)
+        logging.info("Deleted staging table after failure.")
+    except Exception as e:
+        logging.warning(
+            "Failed to delete staging table after failure; leaving it for inspection. Exception: %s",
+            e,
+        )
```
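The scraped page omits this new file's header; the import in `bq_data_transfer.py` suggests it is the `shared.helpers.big_query_helpers` module. The helpers are meant to be composed exactly as `load_data_to_bigquery` does above; here is a usage sketch with placeholder project, dataset, table, and bucket names (the real values come from environment variables):

```python
from google.cloud import bigquery, storage

# Placeholder wiring for illustration only.
bq_client = bigquery.Client()
gcs_client = storage.Client()
target = bigquery.DatasetReference("my-project", "my_dataset").table("my_table")

blobs, uris = collect_blobs_and_uris(gcs_client, "my-bucket", prefix="ndjson/")
if uris:
    staging = make_staging_table_ref(target)
    try:
        ensure_staging_table_like_target(bq_client, target, staging)
        load_uris_into_staging(bq_client, staging, uris)  # batches of MAX_URIS_PER_JOB
        publish_staging_to_target(bq_client, staging, target)
        cleanup_success(bq_client, staging, blobs)
    except Exception:
        cleanup_failure(bq_client, staging)
        raise
```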
