#!/bin/bash
#
# Runs a range of Dataflow snapshot import jobs sequentially.
# It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
#
# Usage:   ./run-snapshot-import.sh <start_shard> <end_shard>
#    Or:   ./run-snapshot-import.sh --all
# Example: ./run-snapshot-import.sh 0 3
# Example: ./run-snapshot-import.sh --all
#
# You can override default configurations by setting environment variables:
#   TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
#
# --- Manual Parallel Execution ---
# To run shards in parallel groups of 4 (assuming 20 shards total), you can
# run 5 instances of this script.
#
# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group
# (including shard 0) first and let it complete the restore step before
# launching other groups in parallel, otherwise they will fail because the
# restored files won't exist yet!
#
#   ./run-snapshot-import.sh 0 3 &     # Run this first!
#   # Wait for shard 0 to finish restore, then run the rest:
#   ./run-snapshot-import.sh 4 7 &
#   ./run-snapshot-import.sh 8 11 &
#   ./run-snapshot-import.sh 12 15 &
#   ./run-snapshot-import.sh 16 19 &
#
# --- Automated Parallel Execution ---
# Alternatively, use the --all flag to automatically handle the restore step
# and launch all groups:
#   ./run-snapshot-import.sh --all

# Fail fast: abort on unhandled command failures, unset variables, and
# failures anywhere in a pipeline.
set -euo pipefail

# Validate arguments: either an explicit <start> <end> shard range, or --all.
if [ "$#" -ne 2 ] && [ "${1:-}" != "--all" ]; then
  echo "Usage: $0 <start_shard> <end_shard>" >&2
  echo "   Or: $0 --all" >&2
  exit 1
fi

# Shard range for range mode; unused (empty) when running with --all, which
# exits before the range loop is reached.
START_SHARD=${1:-}
END_SHARD=${2:-}
# --- Configuration (environment variables override these defaults) ---

# Google Cloud project, Bigtable instance, and Dataflow staging settings.
export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
export REGION="${REGION:-us-central1}"

# Destination table and source snapshot to import.
export TABLE_NAME="${TABLE_NAME:-validation_test}"
export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"

# Total number of shards the import is split into.
export NUM_SHARDS="${NUM_SHARDS:-20}"

# VPC network settings for the Dataflow workers.
export NETWORK="${NETWORK:-tianlei-network}"
export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"

# Shaded jar built from this directory (version 2.17.0, the current project version).
JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"
18- echo " Submitting Dataflow job for shardIndex: 0. The initial job will restore the snapshot to the first iteration and will skip this step for subsequent jobs"
19- java -jar ${JAR_PATH} importsnapshot \
20- --runner=DataflowRunner \
21- --project=${PROJECT_ID} \
22- --bigtableInstanceId=${INSTANCE_ID} \
23- --bigtableTableId=${TABLE_NAME} \
24- --importConfigFilePath=import-config-test.json \
25- --stagingLocation=gs://${BUCKET} /dataflow/staging \
26- --tempLocation=gs://${BUCKET} /dataflow/temp \
27- --workerMachineType=n1-highmem-4 \
28- --diskSizeGb=500 \
29- --maxNumWorkers=10 \
30- --region=${REGION} \
31- --serviceAccount=${SERVICE_ACCOUNT} \
32- --usePublicIps=false \
33- --enableSnappy=true \
34- --skipRestoreStep=false \
35- --numShards=20 \
36- --shardIndex=0
# --- AUTO-PARALLEL MODE ---
# With --all, first run the (blocking) restore step on its own, then fan out
# every shard group in the background and wait for all of them to finish.
if [ "$1" == "--all" ]; then
  echo "🚀 Starting fully automated snapshot import..."

  # Step 1: Perform ONLY the restore step. This must complete before any
  # import shard starts, otherwise the restored files won't exist yet.
  echo "Step 1/2: Performing snapshot restore (blocking)..."
  if ! /usr/lib/jvm/java-21-openjdk-amd64/bin/java -Dnet.bytebuddy.experimental=true -jar "${JAR_PATH}" importsnapshot \
    --runner=DataflowRunner \
    --project="${PROJECT_ID}" \
    --bigtableInstanceId="${INSTANCE_ID}" \
    --bigtableTableId="${TABLE_NAME}" \
    --importConfigFilePath=import-config-test.json \
    --stagingLocation="gs://${BUCKET}/dataflow/staging" \
    --tempLocation="gs://${BUCKET}/dataflow/temp" \
    --region="${REGION}" \
    --performOnlyRestoreStep=true \
    --jobName="restore-job" \
    --network="${NETWORK}" \
    --subnetwork="${SUBNETWORK}"; then
    # Without a successful restore every shard group would fail — stop here.
    echo "Restore step failed; aborting before launching shard groups." >&2
    exit 1
  fi

  echo "Restore completed. Proceeding to data import."

  # Step 2: Launch shards in parallel groups by re-invoking this script in
  # range mode, one background instance per group.
  echo "Step 2/2: Launching parallel groups of 4 shards..."
  SHARDS_PER_GROUP=4

  for (( start = 0; start < NUM_SHARDS; start += SHARDS_PER_GROUP )); do
    end=$(( start + SHARDS_PER_GROUP - 1 ))
    # Clamp the final group to the last valid shard index.
    if (( end >= NUM_SHARDS )); then
      end=$(( NUM_SHARDS - 1 ))
    fi

    echo "Launching group: shards $start to $end in background"
    # Call ourselves with the range!
    "$0" "$start" "$end" &
  done

  echo "All groups launched. Waiting for all background jobs to finish..."
  wait
  echo "🎉 All import jobs completed!"
  exit 0
fi
# ----------------------------------------
# --- Standard Range Mode ---
# Submit one Dataflow import job per shard in [START_SHARD, END_SHARD],
# sequentially within this script instance.
for (( i = START_SHARD; i <= END_SHARD; i++ )); do
  echo "Submitting Dataflow job for shardIndex: $i"

  # Skip the restore step for every shard except shard 0: when running via
  # --all the restore was already handled in Step 1, and when running manual
  # ranges shard 0 is the one that performs it.
  SKIP_RESTORE="true"
  if [ "$i" -eq 0 ]; then
    SKIP_RESTORE="false"
  fi

  JOB="job-${i}"
  if ! /usr/lib/jvm/java-21-openjdk-amd64/bin/java -Dnet.bytebuddy.experimental=true -jar "${JAR_PATH}" importsnapshot \
    --runner=DataflowRunner \
    --project="${PROJECT_ID}" \
    --bigtableInstanceId="${INSTANCE_ID}" \
    --bigtableTableId="${TABLE_NAME}" \
    --importConfigFilePath=import-config-test.json \
    --stagingLocation="gs://${BUCKET}/dataflow/staging" \
    --tempLocation="gs://${BUCKET}/dataflow/temp" \
    --workerMachineType=n1-highmem-4 \
    --diskSizeGb=500 \
    --maxNumWorkers=10 \
    --region="${REGION}" \
    --serviceAccount="${SERVICE_ACCOUNT}" \
    --usePublicIps=false \
    --enableSnappy=true \
    --skipRestoreStep="${SKIP_RESTORE}" \
    --numShards="${NUM_SHARDS}" \
    --shardIndex="$i" \
    --jobName="${JOB}" \
    --network="${NETWORK}" \
    --subnetwork="${SUBNETWORK}"; then
    # Later shards depend on earlier ones having been submitted correctly;
    # don't silently continue past a failed submission.
    echo "Import job for shardIndex $i failed; aborting remaining shards." >&2
    exit 1
  fi

  # Sequential within this script instance.
done