# Approve Test Queue (captured from workflow run #39608)

# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Approve Test Queue
on:
  # Allows manual triggering
  workflow_dispatch:
  # Runs every 5 minutes
  schedule:
    - cron: "*/5 * * * *"
jobs:
  # Runs the inline Python queue manager that approves waiting deployments.
  approve-queue:
    runs-on: ubuntu-latest
    environment: main
    steps:
      - name: Checkout repository
        uses: actions/checkout@v6
      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests
      - name: Approve waiting deployments
        env:
          # Token the script sends in the Authorization header of its API calls.
          GITHUB_TOKEN: ${{ secrets.PAT }}
          # Falls back to 1 when the MAX_CONCURRENCY variable is not set.
          MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }}
        run: |
          # The quoted delimiter stops the shell from expanding $, backticks
          # or backslashes inside the Python source that follows.
          python - <<'EOF'
import os
import requests
# Settings read from the job environment; a missing variable raises KeyError.
_env = os.environ
GITHUB_TOKEN = _env["GITHUB_TOKEN"]
REPO = _env["GITHUB_REPOSITORY"]
MAX_CONCURRENCY = int(_env["MAX_CONCURRENCY"])
API_BASE = f"https://api.github.com/repos/{REPO}"
# Headers sent with every GitHub API request issued by this script.
headers = {}
headers["Authorization"] = f"token {GITHUB_TOKEN}"
headers["Accept"] = "application/vnd.github.v3+json"
headers["X-GitHub-Api-Version"] = "2022-11-28"
def make_request(
    endpoint,
    method="GET",
    data=None,
    allow_no_pending_deployments=False,
    timeout=30,
):
    """Call the GitHub API under API_BASE and return the decoded JSON body.

    Args:
        endpoint: Path (and optional query string) appended to API_BASE.
        method: "GET" issues a GET request; any other value issues a POST.
        data: Payload sent as the JSON body of a POST request.
        allow_no_pending_deployments: When true, a 422 response whose "errors"
            field contains "No pending deployment requests to approve or reject"
            is reported as a skip instead of a failure.
        timeout: Seconds passed to requests as the request timeout, so a
            stalled connection cannot block the job indefinitely.

    Returns:
        The decoded JSON body on success. For endpoints containing
        "actions/runs?status" a dict body additionally gets a "next" key with
        the URL of the next page taken from the response links (None when no
        next link is present).
        {"skipped": True, "reason": "no_pending_deployments"} for the tolerated
        422 case.
        None when the request failed; the error and, if available, the response
        text are printed.
    """
    url = f"{API_BASE}/{endpoint}"
    try:
        if method == "GET":
            response = requests.get(url, headers=headers, timeout=timeout)
        else:
            response = requests.post(url, headers=headers, json=data, timeout=timeout)
        response.raise_for_status()
        response_json = response.json()
        # Only a dict body can carry the extra pagination key.
        if isinstance(response_json, dict) and "actions/runs?status" in endpoint:
            response_json["next"] = response.links.get("next", {}).get("url")
        return response_json
    except requests.exceptions.HTTPError as e:
        if (
            allow_no_pending_deployments
            and e.response is not None
            and e.response.status_code == 422
        ):
            try:
                error_body = e.response.json()
            except ValueError:
                error_body = {}
            # Guard against a non-dict error body before reading "errors".
            errors = error_body.get("errors", "") if isinstance(error_body, dict) else ""
            if "No pending deployment requests to approve or reject" in str(errors):
                print(f"No pending deployment requests remain for {endpoint}; skipping")
                return {"skipped": True, "reason": "no_pending_deployments"}
        print(f"Error making request to {endpoint}: {str(e)}")
        if e.response is not None:
            print(f"Response: {e.response.text}")
        return None
    except requests.exceptions.RequestException as e:
        # Covers the remaining requests errors, including timeouts.
        print(f"Error making request to {endpoint}: {str(e)}")
        if e.response is not None:
            print(f"Response: {e.response.text}")
        return None
def get_workflow_runs(status):
    """Collect the workflow runs that have the given status, across all pages.

    Pages are fetched through make_request and followed via the "next" URL it
    reports. If a page cannot be fetched (make_request returns a falsy value),
    paging stops and the runs collected so far are returned.

    Args:
        status: Value sent as the "status" query parameter.

    Returns:
        A list with the "workflow_runs" entries of every fetched page.
    """
    all_results = []
    # per_page=100 is the documented maximum page size; it keeps the number of
    # API round trips low.
    endpoint = f"actions/runs?status={status}&per_page=100"
    while endpoint:
        response = make_request(endpoint)
        if not response:
            break
        all_results.extend(response.get("workflow_runs", []))
        # Reuse only the query string of the next-page URL. partition() yields
        # an empty query, which ends the loop, when the URL has no "?".
        _, _, query = (response.get("next") or "").partition("?")
        endpoint = f"actions/runs?{query}" if query else None
    return all_results
def filter_cicd_runs(workflow_runs, workflow_name="CICD NeMo"):
    """Keep only the runs whose "name" equals workflow_name.

    Args:
        workflow_runs: Iterable of run dicts; a run without a "name" key is dropped.
        workflow_name: Workflow name to keep. Defaults to "CICD NeMo".

    Returns:
        A new list with the matching runs in their original order.
    """
    return [run for run in workflow_runs if run.get("name") == workflow_name]
def print_workflow_run_details(label, workflow_runs):
    """Print a heading followed by one summary line per run.

    An empty (or otherwise falsy) workflow_runs prints "<label>: none" instead.
    Each summary line shows the run's id, status, head_branch and display_title
    as returned by dict.get, so missing keys appear as None.
    """
    if workflow_runs:
        print(f"{label}:")
        for run in workflow_runs:
            summary = (
                " "
                f"id={run.get('id')} "
                f"status={run.get('status')} "
                f"branch={run.get('head_branch')} "
                f"title={run.get('display_title')}"
            )
            print(summary)
    else:
        print(f"{label}: none")
# Main script: count the CICD runs that already occupy a slot, then approve
# waiting runs (oldest first) until MAX_CONCURRENCY is reached.
# The script ends via `raise SystemExit(code)`; the exit() helper is injected
# by the site module and is not meant for use in programs.

# Get current running and queued workflows
print("Fetching workflow runs...")
queued_workflow_runs = filter_cicd_runs(get_workflow_runs("queued"))
in_progress_workflow_runs = filter_cicd_runs(get_workflow_runs("in_progress"))
print_workflow_run_details("Queued CICD workflows counted against concurrency", queued_workflow_runs)
print_workflow_run_details("Running CICD workflows counted against concurrency", in_progress_workflow_runs)

# Count running and queued workflows
queued_workflows = len(queued_workflow_runs)
in_progress_workflows = len(in_progress_workflow_runs)
total_workflows = queued_workflows + in_progress_workflows
print(f"Current queued workflows: {queued_workflows}")
print(f"Current running workflows: {in_progress_workflows}")
print(f"Total workflows: {total_workflows}")
print(f"Max concurrency: {MAX_CONCURRENCY}")
if total_workflows >= MAX_CONCURRENCY:
    print("Maximum concurrency reached, no new approvals will be made")
    raise SystemExit(0)

# Get waiting CI workflows for test environment
print("Fetching deployments...")
pending_workflows = filter_cicd_runs(get_workflow_runs("waiting"))

# Sort deployments by creation date (oldest first)
print("Sorting workflows...")
pending_workflows = sorted(pending_workflows, key=lambda x: x.get("created_at", ""))

# Process each deployment
print("Processing ...")
for workflow in pending_workflows:
    if total_workflows >= MAX_CONCURRENCY:
        print("Maximum concurrency reached, stopping approvals")
        break
    workflow_id = workflow.get("id")
    workflow_name = workflow.get("display_title") or workflow.get("name") or "<unknown>"
    if not workflow_id:
        print(f"Skipping workflow without a run id: {workflow_name}")
        continue
    print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}")
    deployment_url = f"actions/runs/{workflow_id}/pending_deployments"
    # A failed request (None) and an empty result are both treated as
    # "nothing to approve" for this run.
    pending_deployments = make_request(deployment_url)
    if not pending_deployments:
        print(f"No pending deployments found for workflow {workflow_name}; skipping")
        continue

    # Collect the environments awaiting approval for this run.
    environment_ids = []
    environment_names = []
    for deployment in pending_deployments:
        environment = deployment.get("environment") or {}
        environment_id = environment.get("id")
        environment_name = environment.get("name") or "<unknown>"
        if not environment_id:
            print(f"Skipping deployment without an environment id for workflow {workflow_name}")
            continue
        environment_ids.append(environment_id)
        environment_names.append(environment_name)
    if not environment_ids:
        print(f"No pending deployments with environment ids found for workflow {workflow_name}")
        raise SystemExit(1)

    # Approve the deployment
    status_data = {
        "environment_ids": environment_ids,
        "state": "approved",
        "comment": "Automatically approved by queue manager"
    }
    result = make_request(
        deployment_url,
        method="POST",
        data=status_data,
        allow_no_pending_deployments=True,
    )
    if result is None:
        print(f"Failed to approve environments {environment_names} for workflow {workflow_name}")
        raise SystemExit(1)
    # A skipped approval does not occupy a concurrency slot.
    if isinstance(result, dict) and result.get("skipped"):
        continue
    total_workflows += 1
EOF
# Posts a Slack message when the approve-queue job fails.
notify:
  if: failure()
  runs-on: ubuntu-latest
  needs: [approve-queue]
  steps:
    - name: Notify
      env:
        SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
        # Quoted so the value is always read as a plain string.
        SLACK_WEBHOOK_ADMIN: '<!subteam^${{ secrets.SLACK_WEBHOOK_ADMIN }}>'
        GITHUB_RUN_ID: ${{ github.run_id }}
        GITHUB_REPOSITORY: ${{ github.repository }}
      run: |
        # The webhook URL is quoted so the shell passes it as a single argument.
        curl -X POST \
          -H 'Content-type: application/json' \
          --data "{\"text\":\":robot_joy: <https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}|Test-queue-approval-bot workflow> failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \
          "$SLACK_WEBHOOK"