Skip to content

Commit 1a3b3fa

Browse files
authored
Merge branch 'master' into implmented_golden_checks_annual_pop
2 parents 0be9119 + 5d07395 commit 1a3b3fa

27 files changed

Lines changed: 651 additions & 407 deletions

import-automation/executor/app/executor/import_executor.py

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
AUTO_IMPORT_JOB_STATUS = "auto-import-job-status"
7878
IMPORT_SUMMARY_FILE = "import_summary.json"
7979
STAGING_VERSION_FILE = "staging_version.txt"
80+
MAX_LOG_CHUNK_SIZE = 50000
8081

8182

8283
class ImportStatus(Enum):
@@ -480,7 +481,8 @@ def _invoke_import_tool(self, absolute_import_dir: str,
480481
"latency": timer.time(),
481482
"input_index": input_index,
482483
"import_input": import_prefix,
483-
})
484+
},
485+
skip_stream_logging=True)
484486
process.check_returncode()
485487
logging.info(
486488
f'Generated resolved mcf for {import_prefix} in {output_path}.')
@@ -809,7 +811,8 @@ def _invoke_import_job(self, absolute_import_dir: str,
809811
"latency_secs": timer.time(),
810812
"script_index": script_index,
811813
"script_path": path,
812-
})
814+
},
815+
skip_stream_logging=True)
813816
process.check_returncode()
814817

815818
import_summary.import_stats['script_execution_time'] = start_timer.time(
@@ -1118,6 +1121,33 @@ def run_and_handle_exception(
11181121
return ExecutionResult(ImportStatus.FAILURE, [], message)
11191122

11201123

1124+
def _stream_payload_in_chunks(label: str, payload) -> None:
1125+
"""Helper function to split text and log in safe chunks to avoid
1126+
1127+
"Log entry too large" (InvalidArgument) errors.
1128+
"""
1129+
if not payload:
1130+
return
1131+
1132+
if isinstance(payload, bytes):
1133+
payload_str = payload.decode('utf-8', errors='replace')
1134+
else:
1135+
payload_str = str(payload)
1136+
1137+
chunk_size = MAX_LOG_CHUNK_SIZE
1138+
total_len = len(payload_str)
1139+
1140+
if total_len <= chunk_size:
1141+
logging.info(f'{label}:\n{payload_str}')
1142+
return
1143+
1144+
logging.info(f'--- Start of {label} (Total length: {total_len} chars) ---')
1145+
for i in range(0, total_len, chunk_size):
1146+
chunk = payload_str[i:i + chunk_size]
1147+
logging.info(f'[{label} Part {i//chunk_size + 1}]:\n{chunk}')
1148+
logging.info(f'--- End of {label} ---')
1149+
1150+
11211151
@log_function_call
11221152
def _run_with_timeout_async(args: List[str],
11231153
timeout: float,
@@ -1217,8 +1247,8 @@ def _run_with_timeout(args: List[str],
12171247
cwd=cwd,
12181248
env=env)
12191249
logging.info(
1220-
f'Completed command: {args}, retcode: {process.returncode}, stdout:'
1221-
f' {process.stdout}, stderr: {process.stderr}')
1250+
f'Completed command: {args}, retcode: {process.returncode}')
1251+
12221252
return process
12231253
except Exception as e:
12241254
message = traceback.format_exc()
@@ -1378,28 +1408,37 @@ def _construct_process_message(message: str,
13781408
message = (f'{message}\n'
13791409
f'[Subprocess command]: {command}\n'
13801410
f'[Subprocess return code]: {process.returncode}')
1381-
if process.stdout:
1382-
message += f'\n[Subprocess stdout]:\n{process.stdout}'
1383-
if process.stderr:
1384-
message += f'\n[Subprocess stderr]:\n{process.stderr}'
13851411
return message
13861412

13871413

13881414
def _log_process(process: subprocess.CompletedProcess,
13891415
import_name: str = '',
1390-
metrics: dict = {}) -> None:
1416+
metrics: dict = None,
1417+
skip_stream_logging: bool = False) -> None:
13911418
"""Logs the result of a subprocess.
13921419
13931420
Args:
13941421
process: subprocess.CompletedProcess object whose arguments, return code,
13951422
stdout, and stderr are to be logged.
1423+
import_name: Name of the import for labeling logs.
1424+
metrics: Dictionary to store execution metrics.
1425+
skip_stream_logging: Whether to skip chunked logging of stdout and stderr.
13961426
"""
1427+
if metrics is None:
1428+
metrics = {}
13971429
process_message = 'Subprocess succeeded'
13981430
if process.returncode:
13991431
process_message = 'Subprocess failed'
1432+
14001433
message = _construct_process_message(process_message, process)
14011434
logging.info(message)
14021435

1436+
if not skip_stream_logging:
1437+
_stream_payload_in_chunks(f'[{import_name}] Subprocess stdout',
1438+
process.stdout)
1439+
_stream_payload_in_chunks(f'[{import_name}] Subprocess stderr',
1440+
process.stderr)
1441+
14031442
status = ImportStatus.FAILURE if process.returncode else ImportStatus.SUCCESS
14041443
if import_name:
14051444
metrics["import_name"] = import_name

import-automation/executor/test/import_executor_test.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,7 @@ def test_construct_process_message(self):
8383
expected = (
8484
'message\n'
8585
'[Subprocess command]: printf "out" & >&2 printf "err" & exit 1\n'
86-
'[Subprocess return code]: 1\n'
87-
'[Subprocess stdout]:\n'
88-
'out\n'
89-
'[Subprocess stderr]:\n'
90-
'err')
86+
'[Subprocess return code]: 1')
9187
self.assertEqual(expected, message)
9288

9389
def test_construct_process_message_no_output(self):

import-automation/workflow/cloudbuild.yaml

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,40 +12,88 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
#
15-
# Child Cloud Build configuration for deploying to a specific environment.
16-
# Defaults are set to PRODUCTION. Staging builds must override these values.
17-
#
15+
# Parent Cloud Build configuration that orchestrates Staging and Production deployments.
16+
# Usage: gcloud builds submit . --config=cloudbuild.yaml --project=datcom-ci
17+
1818
substitutions:
19-
# Production config.
20-
_PROJECT_ID: 'datcom-import-automation-prod'
21-
_SPANNER_PROJECT_ID: 'datcom-store'
22-
_SPANNER_INSTANCE_ID: 'dc-kg-test'
23-
_SPANNER_DATABASE_ID: 'dc_graph_import'
24-
_SPANNER_GRAPH_DATABASE_ID: 'dc_graph_2025_11_07'
25-
_GCS_BUCKET_ID: 'datcom-prod-imports'
26-
_LOCATION: 'us-central1'
27-
_GCS_MOUNT_BUCKET: 'datcom-volume-mount'
19+
# Staging Configuration (Overrides defaults in child build)
20+
_PROJECT_ID: 'datcom-ci'
21+
_SPANNER_PROJECT_ID: 'datcom-ci'
22+
_SPANNER_INSTANCE_ID: 'datcom-spanner-test'
23+
_SPANNER_DATABASE_ID: 'dc-test-db'
24+
_SPANNER_GRAPH_DATABASE_ID: 'dc-test-db'
25+
_GCS_BUCKET_ID: 'datcom-ci-test'
26+
_GCS_MOUNT_BUCKET: 'datcom-ci-test'
2827
_BQ_DATASET_ID: 'datacommons'
29-
_PROJECT_NUMBER: '965988403328'
28+
_LOCATION: 'us-central1'
29+
_PROJECT_NUMBER: '879489846695'
3030
_AR_REPO_URL: 'us-docker.pkg.dev/datcom-ci/gcr.io'
31-
_BQ_SPANNER_CONN_ID: 'projects/datcom-import-automation-prod/locations/us/connections/bq_spanner_conn'
31+
_BQ_SPANNER_CONN_ID: 'projects/datcom-ci/locations/us-central1/connections/bq_spanner_conn_test'
32+
_VERSION: '${SHORT_SHA}'
33+
_DATAFLOW_TEMPLATE_PATH: 'gs://datcom-templates/templates/flex/ingestion.json'
34+
_PROD_TAG: 'latest' # TODO: rename to prod
3235

3336
steps:
34-
- id: 'ingestion-helper-service'
37+
38+
# 1. Build and push helper images
39+
- id: 'build-ingestion-helper'
40+
name: 'gcr.io/cloud-builders/gcloud'
41+
args: ['builds', 'submit', 'ingestion-helper', '--config', 'ingestion-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}']
42+
dir: 'import-automation/workflow'
43+
44+
- id: 'build-import-helper'
3545
name: 'gcr.io/cloud-builders/gcloud'
36-
args: ['run', 'deploy', 'ingestion-helper-service', '--image', '${_AR_REPO_URL}/datacommons-ingestion-helper:latest', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--timeout', '60m', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},BQ_DATASET_ID=${_BQ_DATASET_ID},BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID}']
46+
args: ['builds', 'submit', 'import-helper', '--config', 'import-helper/cloudbuild.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_VERSION=${_VERSION}']
47+
dir: 'import-automation/workflow'
3748

38-
- id: 'import-helper-service'
49+
# 2. Trigger Staging Build (Child)
50+
# Overrides default (Production) values with Staging values.
51+
- id: 'deploy-staging'
3952
name: 'gcr.io/cloud-builders/gcloud'
40-
args: ['run', 'deploy', 'import-helper-service', '--image', '${_AR_REPO_URL}/datacommons-import-helper:latest', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},PROJECT_NUMBER=${_PROJECT_NUMBER},GCS_BUCKET_ID=${_GCS_BUCKET_ID}']
53+
args:
54+
- 'builds'
55+
- 'submit'
56+
- '.'
57+
- '--config=deploy-services.yaml'
58+
- '--project=${_PROJECT_ID}'
59+
- '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID},_PROJECT_NUMBER=${_PROJECT_NUMBER},_BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID},_VERSION=${_VERSION},_DATAFLOW_TEMPLATE_PATH=${_DATAFLOW_TEMPLATE_PATH}'
60+
dir: 'import-automation/workflow'
61+
62+
# 2. Run E2E Tests on Staging
63+
- id: 'e2e-test-staging'
64+
name: 'python:3.11'
65+
entrypoint: 'bash'
66+
args:
67+
- '-c'
68+
- |
69+
pip install google-cloud-spanner google-cloud-workflows absl-py
70+
python spanner_ingestion_test.py
71+
env:
72+
- 'PROJECT_ID=${_PROJECT_ID}'
73+
- 'LOCATION=${_LOCATION}'
74+
- 'SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID}'
75+
- 'SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID}'
76+
- 'SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}'
77+
- 'GCS_BUCKET_ID=${_GCS_BUCKET_ID}'
78+
dir: 'import-automation/workflow'
4179

42-
- id: 'import-automation-workflow'
80+
- id: 'tag-prod'
4381
name: 'gcr.io/cloud-builders/gcloud'
44-
args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},PROJECT_NUMBER=${_PROJECT_NUMBER}']
82+
args: ['builds', 'submit', '.', '--config', 'tag-prod.yaml', '--substitutions', '_AR_REPO_URL=${_AR_REPO_URL},_PROD_TAG=${_PROD_TAG},_VERSION=${_VERSION}']
83+
dir: 'import-automation/workflow'
4584

46-
- id: 'spanner-ingestion-workflow'
85+
# 3. Trigger Production Build (Child)
86+
# Uses default (Production) values defined in cloudbuild.yaml.
87+
- id: 'deploy-prod'
4788
name: 'gcr.io/cloud-builders/gcloud'
48-
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},PROJECT_NUMBER=${_PROJECT_NUMBER}']
89+
args:
90+
- 'builds'
91+
- 'submit'
92+
- '.'
93+
- '--config=deploy-services.yaml'
94+
- '--project=${_PROJECT_ID}' # Build runs in CI project, deploys to Prod
95+
- '--substitutions=_VERSION=${_PROD_TAG}'
96+
dir: 'import-automation/workflow'
4997

5098
options:
5199
logging: CLOUD_LOGGING_ONLY

import-automation/workflow/cloudbuild_main.yaml

Lines changed: 0 additions & 90 deletions
This file was deleted.
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# Child Cloud Build configuration for deploying to a specific environment.
16+
# Defaults are set to PRODUCTION. Staging builds must override these values.
17+
#
18+
substitutions:
19+
# Production config.
20+
_PROJECT_ID: 'datcom-import-automation-prod'
21+
_SPANNER_PROJECT_ID: 'datcom-store'
22+
_SPANNER_INSTANCE_ID: 'dc-kg-test'
23+
_SPANNER_DATABASE_ID: 'dc_graph_import'
24+
_SPANNER_GRAPH_DATABASE_ID: 'dc_graph_2025_11_07'
25+
_GCS_BUCKET_ID: 'datcom-prod-imports'
26+
_LOCATION: 'us-central1'
27+
_GCS_MOUNT_BUCKET: 'datcom-volume-mount'
28+
_BQ_DATASET_ID: 'datacommons'
29+
_PROJECT_NUMBER: '965988403328'
30+
_AR_REPO_URL: 'us-docker.pkg.dev/datcom-ci/gcr.io'
31+
_BQ_SPANNER_CONN_ID: 'projects/datcom-import-automation-prod/locations/us/connections/bq_spanner_conn'
32+
_VERSION: '${SHORT_SHA}'
33+
_DATAFLOW_TEMPLATE_PATH: 'gs://datcom-templates/templates/flex/ingestion-${_VERSION}.json'
34+
35+
steps:
36+
- id: 'ingestion-helper-service'
37+
name: 'gcr.io/cloud-builders/gcloud'
38+
args: ['run', 'deploy', 'ingestion-helper-service', '--image', '${_AR_REPO_URL}/datacommons-ingestion-helper:${_VERSION}', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--timeout', '60m', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},GCS_BUCKET_ID=${_GCS_BUCKET_ID},BQ_DATASET_ID=${_BQ_DATASET_ID},BQ_SPANNER_CONN_ID=${_BQ_SPANNER_CONN_ID}']
39+
40+
- id: 'import-helper-service'
41+
name: 'gcr.io/cloud-builders/gcloud'
42+
args: ['run', 'deploy', 'import-helper-service', '--image', '${_AR_REPO_URL}/datacommons-import-helper:${_VERSION}', '--region', '${_LOCATION}', '--project', '${_PROJECT_ID}', '--no-allow-unauthenticated', '--set-env-vars', 'PROJECT_ID=${_PROJECT_ID},LOCATION=${_LOCATION},PROJECT_NUMBER=${_PROJECT_NUMBER},GCS_BUCKET_ID=${_GCS_BUCKET_ID}']
43+
44+
- id: 'import-automation-workflow'
45+
name: 'gcr.io/cloud-builders/gcloud'
46+
args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},PROJECT_NUMBER=${_PROJECT_NUMBER}']
47+
48+
- id: 'spanner-ingestion-workflow'
49+
name: 'gcr.io/cloud-builders/gcloud'
50+
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},PROJECT_NUMBER=${_PROJECT_NUMBER},DATAFLOW_TEMPLATE_PATH=${_DATAFLOW_TEMPLATE_PATH}']
51+
52+
options:
53+
logging: CLOUD_LOGGING_ONLY

0 commit comments

Comments
 (0)