 #!/bin/bash
 
-# This script runs the Dataflow snapshot import job.
+# This script runs a range of Dataflow snapshot import jobs sequentially.
 # It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
+#
+# Usage:   ./run-snapshot-import.sh <start_shard> <end_shard>
+#     Or:  ./run-snapshot-import.sh --all
+# Example: ./run-snapshot-import.sh 0 3
+# Example: ./run-snapshot-import.sh --all
+#
+# You can override the default configuration by setting environment variables in your terminal.
+# Example: TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
+#
+# NOTE: If you are running on a newer JDK (such as Java 21 or later) and hit ByteBuddy errors,
+# you can add '-Dnet.bytebuddy.experimental=true' to the java command lines below.
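+# A sketch of where that flag would go (it is a JVM system property, so it
+# must come before -jar):
+#   java -Dnet.bytebuddy.experimental=true -jar ${JAR_PATH} importsnapshot ...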
+#
+# --- Manual Parallel Execution ---
+# To run shards in parallel groups of 4 (assuming 20 shards total), you can run 5 instances of this script.
+#
+# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group (including shard 0)
+# first and let it complete the restore step before launching other groups in parallel,
+# otherwise they will fail because the restored files won't exist yet!
+#
+# Example for manual parallel execution:
+#   ./run-snapshot-import.sh 0 3 &    # Run this first!
+#   # Wait for shard 0 to finish the restore, then run the rest:
+#   ./run-snapshot-import.sh 4 7 &
+#   ./run-snapshot-import.sh 8 11 &
+#   ./run-snapshot-import.sh 12 15 &
+#   ./run-snapshot-import.sh 16 19 &
+#
+# --- Automated Parallel Execution ---
+# Alternatively, use the --all flag to automatically handle the restore step and launch all groups:
+#   ./run-snapshot-import.sh --all
 
-export PROJECT_ID=db-blackbelt-cndb
-export INSTANCE_ID=bench-workload
-export TABLE_NAME=validation_test
-export SNAPSHOT_NAME=validation_test_20200929
-export SERVICE_ACCOUNT=295490517436-compute@developer.gserviceaccount.com
+if [ "$#" -ne 2 ] && [ "$1" != "--all" ]; then
+  echo "Usage: $0 <start_shard> <end_shard>"
+  echo "   Or: $0 --all"
+  exit 1
+fi
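+
+# A sketch of extra validation (an addition, not in the original script):
+# reject non-numeric shard bounds before any jobs are submitted.
+if [ "$1" != "--all" ]; then
+  case "$1$2" in
+    *[!0-9]*) echo "Shard bounds must be non-negative integers" >&2; exit 1 ;;
+  esac
+fi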
 
-export BUCKET=jh-data-sandbox-backups
-export REGION=us-west1
+START_SHARD=$1
+END_SHARD=$2
+
+# Configuration (uses environment variables if set, otherwise these defaults)
+export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
+export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
+export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
+export REGION="${REGION:-us-central1}"
+
+export TABLE_NAME="${TABLE_NAME:-validation_test}"
+export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
+export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"
+
+export NUM_SHARDS="${NUM_SHARDS:-20}"
+
+export NETWORK="${NETWORK:-tianlei-network}"
+export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"
 
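+# A small addition (sketch): log the effective target so each run records
+# which project/instance/table it imports into.
+echo "Importing snapshot '${SNAPSHOT_NAME}' into ${PROJECT_ID}/${INSTANCE_ID}/${TABLE_NAME} (${NUM_SHARDS} shards)"
+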
-# Using version 2.17.0 as per the current project version
 JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"
 
-echo "Submitting Dataflow job for shardIndex: 0. The initial job will restore the snapshot to the first iteration and will skip this step for subsequent jobs"
-java -jar ${JAR_PATH} importsnapshot \
-  --runner=DataflowRunner \
-  --project=${PROJECT_ID} \
-  --bigtableInstanceId=${INSTANCE_ID} \
-  --bigtableTableId=${TABLE_NAME} \
-  --importConfigFilePath=import-config-test.json \
-  --stagingLocation=gs://${BUCKET}/dataflow/staging \
-  --tempLocation=gs://${BUCKET}/dataflow/temp \
-  --workerMachineType=n1-highmem-4 \
-  --diskSizeGb=500 \
-  --maxNumWorkers=10 \
-  --region=${REGION} \
-  --serviceAccount=${SERVICE_ACCOUNT} \
-  --usePublicIps=false \
-  --enableSnappy=true \
-  --skipRestoreStep=false \
-  --numShards=20 \
-  --shardIndex=0
+# --- AUTO-PARALLEL MODE ---
+if [ "$1" == "--all" ]; then
+  echo "🚀 Starting fully automated snapshot import..."
+
+  # Step 1: Perform ONLY the restore step
+  echo "Step 1/2: Performing snapshot restore (blocking)..."
+  java -jar ${JAR_PATH} importsnapshot \
+    --runner=DataflowRunner \
+    --project=${PROJECT_ID} \
+    --bigtableInstanceId=${INSTANCE_ID} \
+    --bigtableTableId=${TABLE_NAME} \
+    --importConfigFilePath=import-config-test.json \
+    --stagingLocation=gs://${BUCKET}/dataflow/staging \
+    --tempLocation=gs://${BUCKET}/dataflow/temp \
+    --region=${REGION} \
+    --performOnlyRestoreStep=true \
+    --jobName="restore-job" \
+    --network=${NETWORK} \
+    --subnetwork=${SUBNETWORK}
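+
+  # Guard (a sketch added here, assuming the tool exits non-zero on failure):
+  # abort before launching parallel groups if the restore did not complete.
+  if [ $? -ne 0 ]; then
+    echo "Restore step failed; aborting." >&2
+    exit 1
+  fi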
+
+  echo "Restore completed. Proceeding to data import."
+
+  # Let the child invocations below know the restore is done, so shard 0 does
+  # not redo it (exported variables are inherited by the child processes).
+  export RESTORE_ALREADY_DONE="true"
+
+  # Step 2: Launch parallel groups of 4 shards
+  echo "Step 2/2: Launching parallel groups of 4 shards..."
+  SHARDS_PER_GROUP=4
+
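+  # Example (with the defaults NUM_SHARDS=20, SHARDS_PER_GROUP=4): five groups
+  # covering shards 0-3, 4-7, 8-11, 12-15, and 16-19.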
+  for (( start=0; start<$NUM_SHARDS; start+=$SHARDS_PER_GROUP )); do
+    end=$(( start + SHARDS_PER_GROUP - 1 ))
+    [ $end -ge $NUM_SHARDS ] && end=$(( NUM_SHARDS - 1 ))
+
+    echo "Launching group: shards $start to $end in background"
+    # Call ourselves with the range!
+    "$0" $start $end &
+  done
+
+  echo "All groups launched. Waiting for all background jobs to finish..."
+  wait
+  echo "🎉 All import jobs completed!"
+  exit 0
+fi
+# ----------------------------------------
 
-# Loop from 1 to 19
-for i in {1..19}; do
+# Standard Range Mode
+for i in $(seq $START_SHARD $END_SHARD); do
   echo "Submitting Dataflow job for shardIndex: $i"
 
-  JOB="job${i}"
+  # Skip the restore for every shard by default. Shard 0 performs the restore
+  # only when this script is run manually with a range; under --all, Step 1
+  # already handled it and exported RESTORE_ALREADY_DONE.
+  SKIP_RESTORE="true"
+  if [ $i -eq 0 ] && [ "${RESTORE_ALREADY_DONE:-false}" != "true" ]; then
+    SKIP_RESTORE="false"
+  fi
+
+  JOB="job-${i}"
   java -jar ${JAR_PATH} importsnapshot \
     --runner=DataflowRunner \
     --project=${PROJECT_ID} \
@@ -55,11 +128,12 @@ for i in {1..19}; do
     --serviceAccount=${SERVICE_ACCOUNT} \
     --usePublicIps=false \
     --enableSnappy=true \
-    --skipRestoreStep=true \
-    --numShards=20 \
+    --skipRestoreStep=${SKIP_RESTORE} \
+    --numShards=${NUM_SHARDS} \
     --shardIndex=$i \
-    --jobName="${JOB}" &
+    --jobName="${JOB}" \
+    --network=${NETWORK} \
+    --subnetwork=${SUBNETWORK}
 
-  # Optional: Sleep briefly between submissions to avoid API rate limits
-  sleep 5
+  # Jobs run sequentially within this script instance; parallelism comes from
+  # running multiple instances (one per shard range).
 done