Skip to content

Commit def0d17

Browse files
committed
WIP: add ingestion workflow logic
1 parent 95b0633 commit def0d17

26 files changed

Lines changed: 3565 additions & 4 deletions

pipeline/spanner/src/main/resources/spanner_schema.sql

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,38 @@
1+
-- Copyright 2026 Google LLC
2+
--
3+
-- Licensed under the Apache License, Version 2.0 (the "License");
4+
-- you may not use this file except in compliance with the License.
5+
-- You may obtain a copy of the License at
6+
--
7+
-- http://www.apache.org/licenses/LICENSE-2.0
8+
--
9+
-- Unless required by applicable law or agreed to in writing, software
10+
-- distributed under the License is distributed on an "AS IS" BASIS,
11+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
-- See the License for the specific language governing permissions and
13+
-- limitations under the License.
14+
115
CREATE PROTO BUNDLE (
216
`org.datacommons.Observations`
3-
)
17+
);
418

519
CREATE TABLE Node (
620
subject_id STRING(1024) NOT NULL,
721
value STRING(MAX),
822
bytes BYTES(MAX),
923
name STRING(MAX),
1024
types ARRAY<STRING(1024)>,
25+
last_update_timestamp TIMESTAMP OPTIONS (allow_commit_timestamp=true),
1126
name_tokenlist TOKENLIST AS (TOKENIZE_FULLTEXT(name)) HIDDEN,
12-
) PRIMARY KEY(subject_id)
27+
) PRIMARY KEY(subject_id);
1328

1429
CREATE TABLE Edge (
1530
subject_id STRING(1024) NOT NULL,
1631
predicate STRING(1024) NOT NULL,
1732
object_id STRING(1024) NOT NULL,
1833
provenance STRING(1024) NOT NULL,
1934
) PRIMARY KEY(subject_id, predicate, object_id, provenance),
20-
INTERLEAVE IN Node
35+
INTERLEAVE IN Node;
2136

2237
CREATE TABLE Observation (
2338
observation_about STRING(1024) NOT NULL,
@@ -31,4 +46,66 @@ CREATE TABLE Observation (
3146
import_name STRING(1024),
3247
provenance_url STRING(1024),
3348
is_dc_aggregate BOOL,
34-
) PRIMARY KEY(observation_about, variable_measured, facet_id)
49+
) PRIMARY KEY(observation_about, variable_measured, facet_id);
50+
51+
CREATE TABLE ImportStatus (
52+
ImportName STRING(MAX) NOT NULL,
53+
LatestVersion STRING(MAX),
54+
GraphPath STRING(MAX),
55+
State STRING(1024) NOT NULL,
56+
JobId STRING(1024),
57+
WorkflowId STRING(1024),
58+
ExecutionTime INT64,
59+
DataVolume INT64,
60+
DataImportTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
61+
StatusUpdateTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
62+
NextRefreshTimestamp TIMESTAMP,
63+
) PRIMARY KEY(ImportName);
64+
65+
CREATE TABLE IngestionHistory (
66+
CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ),
67+
IngestionFailure BOOL NOT NULL,
68+
WorkflowExecutionID STRING(1024) NOT NULL,
69+
DataflowJobID STRING(1024),
70+
IngestedImports ARRAY<STRING(MAX)>,
71+
ExecutionTime INT64,
72+
NodeCount INT64,
73+
EdgeCount INT64,
74+
ObservationCount INT64,
75+
) PRIMARY KEY(CompletionTimestamp DESC);
76+
77+
CREATE TABLE ImportVersionHistory (
78+
ImportName STRING(MAX) NOT NULL,
79+
Version STRING(MAX) NOT NULL,
80+
UpdateTimestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
81+
Comment STRING(MAX),
82+
) PRIMARY KEY (ImportName, UpdateTimestamp DESC);
83+
84+
CREATE TABLE IngestionLock (
85+
LockID STRING(1024) NOT NULL,
86+
LockOwner STRING(1024),
87+
AcquiredTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
88+
) PRIMARY KEY(LockID);
89+
90+
CREATE PROPERTY GRAPH DCGraph
91+
NODE TABLES(
92+
Node
93+
KEY(subject_id)
94+
LABEL Node PROPERTIES(
95+
bytes,
96+
name,
97+
subject_id,
98+
types,
99+
value)
100+
)
101+
EDGE TABLES(
102+
Edge
103+
KEY(subject_id, predicate, object_id, provenance)
104+
SOURCE KEY(subject_id) REFERENCES Node(subject_id)
105+
DESTINATION KEY(object_id) REFERENCES Node(subject_id)
106+
LABEL Edge PROPERTIES(
107+
object_id,
108+
predicate,
109+
provenance,
110+
subject_id)
111+
);

workflow/cloudbuild.yaml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# Child Cloud Build configuration for deploying to a specific environment.
16+
# Defaults are set to PRODUCTION. Staging builds must override these values.
17+
#
18+
substitutions:
19+
# Production config.
20+
_PROJECT_ID: 'datcom-import-automation-prod'
21+
_SPANNER_PROJECT_ID: 'datcom-store'
22+
_SPANNER_INSTANCE_ID: 'dc-kg-test'
23+
_SPANNER_DATABASE_ID: 'dc_graph_import'
24+
_SPANNER_GRAPH_DATABASE_ID: 'dc_graph_2025_11_07'
25+
_GCS_BUCKET_ID: 'datcom-prod-imports'
26+
_LOCATION: 'us-central1'
27+
_GCS_MOUNT_BUCKET: 'datcom-volume-mount'
28+
_BQ_DATASET_ID: 'datacommons'
29+
30+
steps:
31+
- id: 'import-automation-workflow'
32+
name: 'gcr.io/cloud-builders/gcloud'
33+
args: ['workflows', 'deploy', 'import-automation-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'import-automation-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},GCS_BUCKET_ID=${_GCS_BUCKET_ID},GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET}']
34+
35+
- id: 'spanner-ingestion-workflow'
36+
name: 'gcr.io/cloud-builders/gcloud'
37+
args: ['workflows', 'deploy', 'spanner-ingestion-workflow', '--project', '${_PROJECT_ID}', '--location', '${_LOCATION}', '--source', 'spanner-ingestion-workflow.yaml', '--set-env-vars', 'LOCATION=${_LOCATION},PROJECT_ID=${_PROJECT_ID},SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},SPANNER_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID}']
38+
39+
- id: 'deploy-ingestion-helper'
40+
name: 'gcr.io/cloud-builders/gcloud'
41+
args: ['builds', 'submit', 'import-automation/workflow/ingestion-helper', '--config', 'import-automation/workflow/ingestion-helper/cloudbuild.yaml', '--substitutions', '_PROJECT_ID=${_PROJECT_ID},_LOCATION=${_LOCATION}']
42+
43+
- id: 'deploy-import-helper'
44+
name: 'gcr.io/cloud-builders/gcloud'
45+
args: ['builds', 'submit', 'import-automation/workflow/import-helper', '--config', 'import-automation/workflow/import-helper/cloudbuild.yaml', '--substitutions', '_PROJECT_ID=${_PROJECT_ID},_LOCATION=${_LOCATION}']
46+
47+
options:
48+
logging: CLOUD_LOGGING_ONLY

workflow/cloudbuild_main.yaml

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# Parent Cloud Build configuration that orchestrates Staging and Production deployments.
16+
# Usage: gcloud builds submit . --config=cloudbuild_main.yaml --project=datcom-ci
17+
18+
substitutions:
19+
# Staging Configuration (Overrides defaults in child build)
20+
_PROJECT_ID: 'datcom-ci'
21+
_SPANNER_PROJECT_ID: 'datcom-ci'
22+
_SPANNER_INSTANCE_ID: 'datcom-spanner-test'
23+
_SPANNER_DATABASE_ID: 'dc-test-db'
24+
_SPANNER_GRAPH_DATABASE_ID: 'dc-test-db'
25+
_GCS_BUCKET_ID: 'datcom-ci-test'
26+
_GCS_MOUNT_BUCKET: 'datcom-ci-test'
27+
_BQ_DATASET_ID: 'datacommons'
28+
_LOCATION: 'us-central1'
29+
30+
steps:
31+
32+
# 1. Trigger Staging Build (Child)
33+
# Overrides default (Production) values with Staging values.
34+
- id: 'deploy-staging'
35+
name: 'gcr.io/cloud-builders/gcloud'
36+
args:
37+
- 'builds'
38+
- 'submit'
39+
- '.'
40+
- '--config=cloudbuild.yaml'
41+
- '--project=${_PROJECT_ID}'
42+
- '--service-account=projects/${_PROJECT_ID}/serviceAccounts/import-automation-sa@${_PROJECT_ID}.iam.gserviceaccount.com'
43+
- '--default-buckets-behavior=REGIONAL_USER_OWNED_BUCKET'
44+
# Forward every staging value the child build consumes. _SPANNER_GRAPH_DATABASE_ID
# is used by the child's spanner-ingestion-workflow deploy step; if it is not
# passed here the child falls back to its production default.
- '--substitutions=_PROJECT_ID=${_PROJECT_ID},_SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID},_SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID},_SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID},_SPANNER_GRAPH_DATABASE_ID=${_SPANNER_GRAPH_DATABASE_ID},_GCS_BUCKET_ID=${_GCS_BUCKET_ID},_LOCATION=${_LOCATION},_GCS_MOUNT_BUCKET=${_GCS_MOUNT_BUCKET},_BQ_DATASET_ID=${_BQ_DATASET_ID}'
45+
dir: 'import-automation/workflow'
46+
47+
# 2. Run E2E Tests on Staging
48+
- id: 'e2e-test-staging'
49+
name: 'python:3.11'
50+
entrypoint: 'bash'
51+
args:
52+
- '-c'
53+
- |
54+
pip install google-cloud-spanner google-cloud-workflows absl-py
55+
python spanner_ingestion_test.py
56+
env:
57+
- 'PROJECT_ID=${_PROJECT_ID}'
58+
- 'LOCATION=${_LOCATION}'
59+
- 'SPANNER_PROJECT_ID=${_SPANNER_PROJECT_ID}'
60+
- 'SPANNER_INSTANCE_ID=${_SPANNER_INSTANCE_ID}'
61+
- 'SPANNER_DATABASE_ID=${_SPANNER_DATABASE_ID}'
62+
- 'GCS_BUCKET_ID=${_GCS_BUCKET_ID}'
63+
dir: 'import-automation/workflow'
64+
65+
# 3. Trigger Production Build (Child)
66+
# Uses default (Production) values defined in cloudbuild.yaml.
67+
- id: 'deploy-prod'
68+
name: 'gcr.io/cloud-builders/gcloud'
69+
args:
70+
- 'builds'
71+
- 'submit'
72+
- '.'
73+
- '--config=cloudbuild.yaml'
74+
- '--project=${_PROJECT_ID}' # Build runs in CI project, deploys to Prod
75+
- '--service-account=projects/${_PROJECT_ID}/serviceAccounts/import-automation-sa@${_PROJECT_ID}.iam.gserviceaccount.com'
76+
- '--default-buckets-behavior=REGIONAL_USER_OWNED_BUCKET'
77+
dir: 'import-automation/workflow'
78+
79+
options:
80+
logging: CLOUD_LOGGING_ONLY
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
main:
2+
params: [args]
3+
steps:
4+
- init:
5+
assign:
6+
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
7+
- region: ${sys.get_env("LOCATION")}
8+
- imageUri: ${default(map.get(args, "imageUri"), "us-docker.pkg.dev/datcom-ci/gcr.io/dc-import-executor:stable")}
9+
- jobId: ${text.replace_all(text.to_lower(text.substring(text.split(args.importName, ":")[1], 0, 50) + "-" + string(int(sys.now()))), "_", "-")}
10+
- importName: ${args.importName}
11+
- importConfig: ${default(map.get(args, "importConfig"), "{}")}
12+
- gcsMountBucket: ${sys.get_env("GCS_MOUNT_BUCKET")}
13+
- gcsImportBucket: ${sys.get_env("GCS_BUCKET_ID")}
14+
- gcsMountPath: "/tmp/gcs"
15+
- ingestionHelper: "spanner-ingestion-helper"
16+
- functionUrl: ${sys.get_env("INGESTION_HELPER_URL")}
17+
- startTime: ${sys.now()}
18+
- defaultResources:
19+
machine: "n2-standard-8"
20+
cpu: 8000
21+
memory: 32768
22+
disk: 100
23+
- resources: ${default(map.get(args, "resources"), defaultResources)}
24+
- runIngestion: ${default(map.get(args, "runIngestion"), false)}
25+
- ingestionArgs:
26+
importList:
27+
- ${text.split(importName, ":")[1]}
28+
- runImportJob:
29+
try:
30+
call: googleapis.batch.v1.projects.locations.jobs.create
31+
args:
32+
parent: ${"projects/" + projectId + "/locations/" + region}
33+
jobId: ${jobId}
34+
body:
35+
allocationPolicy:
36+
instances:
37+
- policy:
38+
machineType: ${resources.machine}
39+
provisioningModel: "STANDARD"
40+
bootDisk:
41+
image: "projects/debian-cloud/global/images/family/debian-12"
42+
size_gb: ${resources.disk}
43+
installOpsAgent: true
44+
taskGroups:
45+
taskSpec:
46+
volumes:
47+
- gcs:
48+
remotePath: ${gcsMountBucket}
49+
mountPath: ${gcsMountPath}
50+
computeResource:
51+
cpuMilli: ${resources.cpu}
52+
memoryMib: ${resources.memory}
53+
runnables:
54+
- container:
55+
imageUri: ${imageUri}
56+
commands:
57+
- ${"--import_name=" + importName}
58+
- ${"--import_config=" + importConfig}
59+
environment:
60+
variables:
61+
IMPORT_NAME: ${importName}
62+
BATCH_JOB_NAME: ${jobId}
63+
taskCount: 1
64+
parallelism: 1
65+
logsPolicy:
66+
destination: CLOUD_LOGGING
67+
connector_params:
68+
timeout: 604800 #7 days
69+
polling_policy:
70+
initial_delay: 60
71+
multiplier: 2
72+
max_delay: 600
73+
result: importJobResponse
74+
except:
75+
as: e
76+
steps:
77+
- updateImportStatus:
78+
call: http.post
79+
args:
80+
url: ${functionUrl}
81+
auth:
82+
type: OIDC
83+
body:
84+
actionType: 'update_import_status'
85+
jobId: ${jobId}
86+
importName: ${importName}
87+
status: 'FAILURE'
88+
executionTime: ${int(sys.now() - startTime)}
89+
latestVersion: ${"gs://" + gcsImportBucket + "/" + text.replace_all(importName, ":", "/")}
90+
result: functionResponse
91+
- failWorkflow:
92+
raise: ${e}
93+
- updateImportVersion:
94+
call: http.post
95+
args:
96+
url: ${functionUrl}
97+
auth:
98+
type: OIDC
99+
body:
100+
actionType: 'update_import_version'
101+
importName: ${importName}
102+
version: 'STAGING'
103+
override: false
104+
comment: '${"import-workflow:" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}'
105+
result: functionResponse
106+
- runIngestion:
107+
switch:
108+
- condition: ${runIngestion}
109+
steps:
110+
- runSpannerIngestion:
111+
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.create
112+
args:
113+
parent: ${"projects/" + projectId + "/locations/" + region + "/workflows/spanner-ingestion-workflow"}
114+
body:
115+
argument: ${json.encode_to_string(ingestionArgs)}
116+
connector_params:
117+
skip_polling: true
118+
- returnResult:
119+
return:
120+
jobId: ${jobId}
121+
importName: ${importName}

workflow/import-helper/Dockerfile

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
FROM python:3.12-slim
16+
17+
# Copy uv binary
18+
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
19+
20+
# Allow statements and log messages to immediately appear in the logs
21+
ENV PYTHONUNBUFFERED=True
22+
23+
WORKDIR /app
24+
25+
# Copy local code to the container image.
26+
COPY . .
27+
28+
# Install production dependencies using uv.
29+
RUN uv pip install --system --no-cache -r requirements.txt
30+
31+
# Run the functions framework
32+
CMD ["functions-framework", "--target", "handle_feed_event"]

0 commit comments

Comments
 (0)