-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Expand file tree
/
Copy pathrun_rc_validation_python_yaml.yml
More file actions
286 lines (258 loc) · 12.7 KB
/
run_rc_validation_python_yaml.yml
File metadata and controls
286 lines (258 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
name: Run Python YAML RC Validation

# Manual trigger only: the release version and RC number are supplied by
# whoever dispatches the workflow.
on:
  workflow_dispatch:
    inputs:
      # Version of the release candidate under validation.
      RELEASE_VER:
        description: "Beam Release Version (e.g., 2.69.0)"
        required: true
        default: "2.69.0"
      # Ordinal of the release candidate for that version.
      RC_NUM:
        description: "Release Candidate number (e.g., 1)"
        required: true
        default: "1"
      # APACHE_CONTENTS_REPO is not needed for Python-only YAML test
      # CLEANUP_BQ_RESOURCES is not needed as we use GCS
# Runs are grouped by workflow name plus release version and RC number; a
# newly queued run in the same group cancels the one still in progress.
concurrency:
  group: "${{ github.workflow }} @ ${{ github.event.inputs.RELEASE_VER }}-${{ github.event.inputs.RC_NUM }}"
  cancel-in-progress: true
# Explicit token permissions for this workflow, grouped by access level.
permissions:
  # Write scopes
  actions: write
  checks: write
  id-token: write  # Required for GCP Workload Identity Federation
  issues: write
  pull-requests: write  # Needed for setup-action potentially
  # Read-only scopes
  contents: read  # Needs read to checkout the code
  deployments: read
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read
# Workflow-level environment variables.
env:
  GCP_PROJECT_ID: "apache-beam-testing"
jobs:
  # Single job that validates a Beam release candidate's Python YAML support:
  #   1. checks out the RC tag and downloads + checksum-verifies the RC artifacts,
  #   2. installs the Python SDK tarball with the [gcp,yaml] extras in a venv,
  #   3. submits a streaming YAML pipeline (Pub/Sub -> JSON files on GCS) to
  #      Dataflow and records its job ID,
  #   4. lets the job run, cancels it, and checks that output shards exist,
  #   5. removes the per-run GCS folder and local scratch files.
  run_python_yaml_rc_validation:
    name: Run Python YAML RC Validation (${{ github.event.inputs.RELEASE_VER }} RC${{ github.event.inputs.RC_NUM }})
    runs-on: [self-hosted, ubuntu-20.04, main]
    timeout-minutes: 300
    env:  # Job-level env vars
      DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
      GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
      GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
      RUN_ID_SUFFIX: ${{ github.run_id }}_${{ github.run_attempt }}
      GCE_REGION: 'us-central1'
      RELEASE_VERSION: ${{ github.event.inputs.RELEASE_VER }}
      RC_NUM: ${{ github.event.inputs.RC_NUM }}
      # Define the base bucket and unique folder prefix directly here
      GCS_UNIQUE_FOLDER_PREFIX: gs://rc-validation-migration-tests/yaml_rc_validation/${{ github.event.inputs.RELEASE_VER }}_RC${{ github.event.inputs.RC_NUM }}_${{ github.run_id }}_${{ github.run_attempt }}
      # Temp, Staging, and Output locations will be constructed in the steps using the prefix above
      RC_TAG: "v${{github.event.inputs.RELEASE_VER}}-RC${{github.event.inputs.RC_NUM}}"
      PYTHON_VERSION: '3.12'  # Or adjust if needed
      BEAM_PYTHON_SDK_TAR_GZ: apache_beam-${{ github.event.inputs.RELEASE_VER }}.tar.gz
      BEAM_SOURCE_ZIP: apache-beam-${{ github.event.inputs.RELEASE_VER }}-source-release.zip
      APACHE_DIST_URL_BASE: https://dist.apache.org/repos/dist/dev/beam/${{ github.event.inputs.RELEASE_VER }}
      YAML_PIPELINE_FILE: t1_2.yaml
      SUBMISSION_TIMEOUT_SECONDS: 120  # Timeout for the python submission script itself
    steps:
      - name: Checkout code at RC tag
        uses: actions/checkout@v4
        with:
          ref: ${{ env.RC_TAG }}

      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
        with:
          java-version: 11  # Keep Java setup for now, might be needed by gcloud/Dataflow

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install Dependencies
        run: |
          sudo apt-get update --yes
          sudo apt-get install -y wget unzip coreutils procps grep sed
        shell: bash

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@aa5489c8933f4cc7a4f7d45035b3b1440c9c10db

      - name: Download RC Artifacts
        run: |
          echo "Downloading from ${{ env.APACHE_DIST_URL_BASE }}"
          wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}
          wget ${{ env.APACHE_DIST_URL_BASE }}/python/${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512
          # Source zip not strictly needed if installing from tar.gz, but keeping for consistency/potential future use
          wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }}
          wget ${{ env.APACHE_DIST_URL_BASE }}/${{ env.BEAM_SOURCE_ZIP }}.sha512
        shell: bash

      - name: Verify Hashes
        run: |
          echo "Verifying sha512 checksums..."
          sha512sum -c ${{ env.BEAM_PYTHON_SDK_TAR_GZ }}.sha512
          sha512sum -c ${{ env.BEAM_SOURCE_ZIP }}.sha512
        shell: bash

      - name: Setup Python Virtual Environment
        run: |
          echo "Setting up Python virtual environment..."
          python -m venv beam_env
          source beam_env/bin/activate
          pip install --upgrade pip setuptools wheel
          echo "Virtual environment ready."
        shell: bash

      - name: Install Python SDK with [gcp, yaml] extras
        run: |
          echo "Installing Python SDK: ${{ env.BEAM_PYTHON_SDK_TAR_GZ }} with [gcp,yaml] extras"
          source beam_env/bin/activate
          # Install from the downloaded tar.gz
          pip install "${{ env.BEAM_PYTHON_SDK_TAR_GZ }}[gcp,yaml]"
          echo "SDK installed."
          pip freeze  # Log installed packages
        shell: bash

      # Writes the pipeline definition via a heredoc; the relative indentation
      # of the heredoc body is the structure of the generated YAML file.
      - name: Create YAML Pipeline File
        run: |
          echo "Creating YAML pipeline file: ${{ env.YAML_PIPELINE_FILE }}"
          cat <<EOF > ${{ env.YAML_PIPELINE_FILE }}
          pipeline:
            type: chain
            transforms:
              - type: ReadFromPubSub
                config:
                  topic: projects/pubsub-public-data/topics/taxirides-realtime
                  format: json
                  schema:
                    type: object
                    properties:
                      ride_id: {type: string}
              - type: WriteToJson
                config:
                  # Construct the output path directly here
                  path: "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json"
                  num_shards: 100
            windowing:
              type: fixed
              size: 30s
          options:
            streaming: true
          EOF
          echo "YAML file created:"
          cat ${{ env.YAML_PIPELINE_FILE }}
        shell: bash

      # Launches the submitter in the background, waits a fixed time, then
      # scrapes the Dataflow job ID from its log. The ID is written to
      # yaml_dataflow_jobid.txt, which the later wait/cancel/validate steps read.
      - name: Run YAML Pipeline (Dataflow Runner), Wait, Extract ID, Cleanup Submitter
        id: submit_yaml_df
        run: |
          echo "Running YAML Pipeline with DataflowRunner in Background..."
          source beam_env/bin/activate
          python -m apache_beam.yaml.main \
            --yaml_pipeline_file=${{ env.YAML_PIPELINE_FILE }} \
            --runner DataflowRunner \
            --region=${{ env.GCE_REGION }} \
            --project=${{ env.GCP_PROJECT_ID }} \
            --temp_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/temp \
            --staging_location ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/staging \
            > yaml_dataflow_submit.log 2>&1 &
          YAML_DF_PID=$!
          echo "YAML Pipeline (Dataflow Runner) submission process started in background with PID: ${YAML_DF_PID}"
          echo ${YAML_DF_PID} > yaml_dataflow_submit.pid
          echo "Waiting up to ${{ env.SUBMISSION_TIMEOUT_SECONDS }} seconds for Dataflow job submission process (PID: ${YAML_DF_PID}) to potentially complete..."
          sleep ${{ env.SUBMISSION_TIMEOUT_SECONDS }}
          echo "Proceeding with Job ID extraction..."
          # Try extracting Job ID using common patterns from Dataflow submission logs.
          # The braces make `head -n 1` apply to whichever pattern matched (not
          # only the last one), so JOB_ID is always a single line.
          # `|| true` is required because `shell: bash` runs with -e and
          # pipefail: without it a no-match would abort the script right here
          # and skip the diagnostics below.
          JOB_ID=$(
            {
              grep -oP 'Dataflow Job ID: \K\S+' yaml_dataflow_submit.log ||
                grep -oP "job_id='?\K[^' >]+" yaml_dataflow_submit.log ||
                grep -oP "id: '?\"?\K[^'\" >]+" yaml_dataflow_submit.log
            } | head -n 1
          ) || true
          # Stop the submitter BEFORE deciding success/failure, so a failed
          # extraction never leaves it running (it could still create a job
          # whose ID was never recorded and therefore never gets cancelled).
          if [ -f yaml_dataflow_submit.pid ] && ps -p $YAML_DF_PID > /dev/null; then
            echo "Submission process (PID: $YAML_DF_PID) is still running after ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s. Attempting to kill it."
            kill -9 $YAML_DF_PID || echo "Failed to kill process $YAML_DF_PID."
          else
            echo "Submission process (PID: $YAML_DF_PID) has already finished or PID file is missing."
          fi
          # Clean up PID file regardless
          rm -f yaml_dataflow_submit.pid
          if [[ -n "$JOB_ID" ]]; then
            echo "Extracted YAML Dataflow Job ID: $JOB_ID"
            echo "$JOB_ID" > yaml_dataflow_jobid.txt
          else
            echo "ERROR: Could not extract YAML Dataflow Job ID after ${{ env.SUBMISSION_TIMEOUT_SECONDS }}s wait. Log content:"
            echo "--- YAML Dataflow submission log START ---"
            cat yaml_dataflow_submit.log || echo "Log file not found."
            echo "--- YAML Dataflow submission log END ---"
            # Exit the step with failure if job ID is crucial and not found
            exit 1
          fi
          echo "YAML Pipeline (Dataflow Runner) submission step finished processing."
        shell: bash

      # Gives the streaming job time to produce output before it is cancelled.
      - name: Wait for Job to Run
        run: |
          if [ ! -f yaml_dataflow_jobid.txt ]; then
            echo "Skipping wait as Job ID was not extracted."
            exit 0  # Allow cleanup to proceed
          fi
          JOB_ID=$(cat yaml_dataflow_jobid.txt)
          echo "Waiting for 40 minutes for Dataflow job $JOB_ID to run..."
          sleep 2400  # 40 minutes = 2400 seconds
          echo "Wait finished."
        shell: bash

      - name: Cancel YAML Dataflow Job
        if: always()  # Run even if wait failed or previous steps failed, to attempt cleanup
        run: |
          if [ -f yaml_dataflow_jobid.txt ]; then
            JOB_ID=$(cat yaml_dataflow_jobid.txt)
            if [[ -n "$JOB_ID" ]]; then
              echo "Attempting to cancel YAML Dataflow job: $JOB_ID in region ${{ env.GCE_REGION }}"
              gcloud dataflow jobs cancel "$JOB_ID" --region=${{ env.GCE_REGION }} --project=${{ env.GCP_PROJECT_ID }} || echo "Failed to cancel YAML Dataflow job $JOB_ID (maybe it finished or was already cancelled)."
            else
              echo "YAML Dataflow Job ID file exists but is empty."
            fi
            # Keep jobid file for validation step, remove in final cleanup
          else
            echo "yaml_dataflow_jobid.txt not found, cannot cancel job (it might have failed before ID extraction)."
          fi
        shell: bash

      # Passes when at least one sharded output object exists under the
      # per-run output prefix; fails the step otherwise.
      - name: Validate GCS Output
        run: |
          if [ ! -f yaml_dataflow_jobid.txt ]; then
            echo "Skipping GCS validation as Job ID was not extracted (job likely failed early)."
            exit 0  # Allow cleanup to proceed
          fi
          # Construct the output path pattern directly here
          OUTPUT_PATTERN="${{ env.GCS_UNIQUE_FOLDER_PREFIX }}/output/out.json-*-of-*"
          echo "Validating GCS output files exist matching pattern: ${OUTPUT_PATTERN}"
          # Wait a bit for cancellation to finalize and files to potentially appear fully
          sleep 60
          # Check if any files matching the pattern exist within the unique output folder.
          echo "Checking for files matching pattern: ${OUTPUT_PATTERN}"
          if gsutil ls "${OUTPUT_PATTERN}" > /dev/null 2>&1; then
            echo "SUCCESS: Found output files matching pattern in GCS."
            gsutil ls "${OUTPUT_PATTERN}"  # List found files
          else
            echo "ERROR: No output files found matching pattern '${OUTPUT_PATTERN}' in GCS bucket."
            exit 1
          fi
        shell: bash

      # ================== Cleanup ==================
      - name: Cleanup GCS Temp/Staging and Local Files
        if: always()
        run: |
          echo "Deleting unique run folder in GCS: ${{ env.GCS_UNIQUE_FOLDER_PREFIX }}"
          # Delete the entire unique folder for this run, including temp, staging, and output
          gsutil -m rm -r "${{ env.GCS_UNIQUE_FOLDER_PREFIX }}" || echo "Failed to delete unique run folder ${{ env.GCS_UNIQUE_FOLDER_PREFIX }} in GCS. Manual cleanup might be required."
          echo "Removing local log, yaml, and jobid files..."
          rm -f yaml_dataflow_submit.log ${{ env.YAML_PIPELINE_FILE }} yaml_dataflow_jobid.txt
        shell: bash