Skip to content

Commit 3086725

Browse files
committed
feat(import): enhance helper script with range support and auto-parallel mode
1 parent c85ce52 commit 3086725

1 file changed

Lines changed: 107 additions & 36 deletions

File tree

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,115 @@
11
#!/bin/bash
#
# Runs a range of Dataflow snapshot import jobs sequentially.
# It should be executed from the 'bigtable-dataflow-parent/bigtable-beam-import' directory.
#
# Usage: ./run-snapshot-import.sh <start_shard> <end_shard>
#    Or: ./run-snapshot-import.sh --all
# Example: ./run-snapshot-import.sh 0 3
# Example: ./run-snapshot-import.sh --all
#
# You can override default configurations by setting environment variables in your terminal.
# Example: TABLE_NAME="my-table" SNAPSHOT_NAME="my-snap" ./run-snapshot-import.sh 0 3
#
# --- Manual Parallel Execution ---
# To run shards in parallel groups of 4 (assuming 20 shards total), you can run 5 instances of this script.
#
# IMPORTANT: Shard 0 performs the restore step. You MUST run the first group (including shard 0)
# first and let it complete the restore step before launching other groups in parallel,
# otherwise they will fail because the restored files won't exist yet!
#
# Example for manual parallel execution:
# ./run-snapshot-import.sh 0 3 &   # Run this first!
# # Wait for shard 0 to finish restore, then run the rest:
# ./run-snapshot-import.sh 4 7 &
# ./run-snapshot-import.sh 8 11 &
# ./run-snapshot-import.sh 12 15 &
# ./run-snapshot-import.sh 16 19 &
#
# --- Automated Parallel Execution ---
# Alternatively, use the --all flag to automatically handle the restore step and launch all groups:
# ./run-snapshot-import.sh --all

# Fail fast on references to unset variables; every optional input below
# deliberately uses the ${VAR:-default} form.
set -u

# Argument validation: either the single flag --all, or exactly two shard indices.
# Checking --all FIRST fixes the original ordering bug where "--all <extra-arg>"
# had $# == 2 and fell through into range mode with START_SHARD="--all".
if [[ "${1:-}" != "--all" ]] && [[ "$#" -ne 2 ]]; then
  echo "Usage: $0 <start_shard> <end_shard>" >&2
  echo "  Or: $0 --all" >&2
  exit 1
fi

# Under --all only $1 exists, so default $2 to empty rather than tripping set -u.
START_SHARD="${1:-}"
END_SHARD="${2:-}"

# In range mode both bounds must be non-negative integers, otherwise the
# shard loop below would silently do nothing (or misbehave).
if [[ "${1:-}" != "--all" ]] && ! [[ "$START_SHARD" =~ ^[0-9]+$ && "$END_SHARD" =~ ^[0-9]+$ ]]; then
  echo "Error: <start_shard> and <end_shard> must be non-negative integers." >&2
  exit 1
fi

# Configurations (uses environment variables if set, otherwise defaults).
export PROJECT_ID="${PROJECT_ID:-google.com:cloud-bigtable-dev}"
export INSTANCE_ID="${INSTANCE_ID:-tianlei-test-inst}"
export BUCKET="${BUCKET:-tianlei-beam-test-bucket}"
export REGION="${REGION:-us-central1}"

export TABLE_NAME="${TABLE_NAME:-validation_test}"
export SNAPSHOT_NAME="${SNAPSHOT_NAME:-validation_test_20200929}"
export SERVICE_ACCOUNT="${SERVICE_ACCOUNT:-295490517436-compute@developer.gserviceaccount.com}"

# Total shard count used by --all mode and passed to every job as --numShards.
export NUM_SHARDS="${NUM_SHARDS:-20}"

export NETWORK="${NETWORK:-tianlei-network}"
export SUBNETWORK="${SUBNETWORK:-regions/us-central1/subnetworks/tianlei-network}"

# Shaded jar built by this module; the version must match the current project version.
JAR_PATH="target/bigtable-beam-import-2.17.0-shaded.jar"
# --- AUTO-PARALLEL MODE ---
# Performs the restore step once (blocking), then fans out all shard groups
# in parallel by re-invoking this script in range mode.
if [[ "${1:-}" == "--all" ]]; then
  echo "🚀 Starting fully automated snapshot import..."

  # Step 1: Perform ONLY the restore step. This must finish before any import
  # shard runs, because the imports read the restored files.
  echo "Step 1/2: Performing snapshot restore (blocking)..."
  if ! java -jar "${JAR_PATH}" importsnapshot \
      --runner=DataflowRunner \
      --project="${PROJECT_ID}" \
      --bigtableInstanceId="${INSTANCE_ID}" \
      --bigtableTableId="${TABLE_NAME}" \
      --importConfigFilePath=import-config-test.json \
      --stagingLocation="gs://${BUCKET}/dataflow/staging" \
      --tempLocation="gs://${BUCKET}/dataflow/temp" \
      --region="${REGION}" \
      --performOnlyRestoreStep=true \
      --jobName="restore-job" \
      --network="${NETWORK}" \
      --subnetwork="${SUBNETWORK}"; then
    # Originally a restore failure was ignored and all groups launched anyway,
    # guaranteeing 20 failed import jobs. Abort instead.
    echo "Restore step failed; aborting before launching import groups." >&2
    exit 1
  fi

  echo "Restore completed. Proceeding to data import."

  # Step 2: Launch parallel groups of 4 shards each.
  echo "Step 2/2: Launching parallel groups of 4 shards..."
  SHARDS_PER_GROUP=4
  group_pids=()

  for (( start=0; start<NUM_SHARDS; start+=SHARDS_PER_GROUP )); do
    end=$(( start + SHARDS_PER_GROUP - 1 ))
    # Clamp the last group to the final shard index.
    if (( end >= NUM_SHARDS )); then
      end=$(( NUM_SHARDS - 1 ))
    fi

    echo "Launching group: shards ${start} to ${end} in background"
    # Re-invoke ourselves in range mode. RESTORE_ALREADY_DONE signals that
    # Step 1 already restored the snapshot, so shard 0 must not repeat it
    # (harmless if a child ignores the variable).
    RESTORE_ALREADY_DONE=true "$0" "$start" "$end" &
    group_pids+=("$!")
  done

  echo "All groups launched. Waiting for all background jobs to finish..."
  # 'wait' alone discards child exit codes; wait per-PID so failures surface.
  failed_groups=0
  for pid in "${group_pids[@]}"; do
    wait "$pid" || failed_groups=$(( failed_groups + 1 ))
  done
  if (( failed_groups > 0 )); then
    echo "${failed_groups} import group(s) failed." >&2
    exit 1
  fi
  echo "🎉 All import jobs completed!"
  exit 0
fi
# ----------------------------------------
# Standard Range Mode: submit one import job per shard, sequentially,
# for shard indices START_SHARD..END_SHARD inclusive.
for (( i = START_SHARD; i <= END_SHARD; i++ )); do
  echo "Submitting Dataflow job for shardIndex: $i"

  # Shard 0 normally performs the snapshot restore. When a caller (the --all
  # mode above) has already performed the restore, it sets RESTORE_ALREADY_DONE
  # so shard 0 does not repeat it. The original code always re-ran the restore
  # for shard 0, contradicting its own "--all handled it" comment.
  SKIP_RESTORE="true"
  if [[ "$i" -eq 0 && "${RESTORE_ALREADY_DONE:-false}" != "true" ]]; then
    SKIP_RESTORE="false"
  fi

  JOB="job-${i}"
  # NOTE(review): the flags from --bigtableInstanceId through --maxNumWorkers
  # sit in an elided diff-context region; reconstructed from the identical
  # shard-0 job in the previous revision — confirm against the full file.
  java -jar "${JAR_PATH}" importsnapshot \
    --runner=DataflowRunner \
    --project="${PROJECT_ID}" \
    --bigtableInstanceId="${INSTANCE_ID}" \
    --bigtableTableId="${TABLE_NAME}" \
    --importConfigFilePath=import-config-test.json \
    --stagingLocation="gs://${BUCKET}/dataflow/staging" \
    --tempLocation="gs://${BUCKET}/dataflow/temp" \
    --workerMachineType=n1-highmem-4 \
    --diskSizeGb=500 \
    --maxNumWorkers=10 \
    --region="${REGION}" \
    --serviceAccount="${SERVICE_ACCOUNT}" \
    --usePublicIps=false \
    --enableSnappy=true \
    --skipRestoreStep="${SKIP_RESTORE}" \
    --numShards="${NUM_SHARDS}" \
    --shardIndex="$i" \
    --jobName="${JOB}" \
    --network="${NETWORK}" \
    --subnetwork="${SUBNETWORK}"

  # Shards within one script instance run sequentially (no '&'); parallelism
  # comes from running multiple instances over disjoint ranges.
done

0 commit comments

Comments
 (0)