modules/python/clients/aks_client.py (6 changes: 3 additions & 3 deletions)
@@ -314,7 +314,7 @@ def create_node_pool(
"os_disk_type": "Managed",
"nodeLabels": {"gpu": "true"} if gpu_node_pool else {},
"gpu_profile": {
"driver": "None" if gpu_node_pool and vm_size == "Standard_ND96asr_v4" else "Install",
"driver": "None" if not gpu_node_pool or vm_size == "Standard_ND96asr_v4" else "Install",
},
}

@@ -469,7 +469,7 @@ def scale_node_pool(
resource_name=cluster_name,
agent_pool_name=node_pool_name,
parameters=node_pool,
- )
+ ).result()

logger.info(
f"Waiting for {node_count} nodes in pool {node_pool_name} to be ready..."
@@ -681,7 +681,7 @@ def _progressive_scale(
resource_name=cluster_name,
agent_pool_name=node_pool_name,
parameters=node_pool,
- )
+ ).result()

# Use agentpool=node_pool_name as default label if not specified
label_selector = f"agentpool={node_pool_name}"
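
Why the .result() additions matter: judging by the argument names, the truncated call above is the container service SDK's agent_pools.begin_create_or_update, which returns an LROPoller rather than a finished agent pool. Without .result(), scale_node_pool and _progressive_scale would fall through to the node-readiness wait while the ARM operation was still in flight. A minimal sketch of the pattern, with hypothetical resource names:

# --- Illustrative sketch, not part of the PR ---
# Assumes an authenticated azure.mgmt.containerservice.ContainerServiceClient.
poller = cs_client.agent_pools.begin_create_or_update(
    resource_group_name="my-rg",      # hypothetical
    resource_name="my-cluster",       # hypothetical
    agent_pool_name="nodepool1",      # hypothetical
    parameters=node_pool,             # AgentPool model carrying the new count
)

# poller.result() blocks until the long-running operation reaches a terminal
# state; skipping it lets callers race ahead of the actual scale operation.
agent_pool = poller.result()
print(agent_pool.provisioning_state)  # "Succeeded" once the scale completes
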
modules/python/clients/kubernetes_client.py (58 changes: 58 additions & 0 deletions)
@@ -975,6 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names
valid_conditions = {
'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
'job': ['complete', 'failed'],
'jobs': ['complete', 'failed'],
# Add more resource types as needed
}

@@ -1049,6 +1051,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond
if resource_type_lower in ['deployment', 'deployments']:
return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all)

if resource_type_lower in ['job', 'jobs']:
return self._check_job_condition(resource_name, condition_type, namespace, wait_all)

logger.warning(f"Unsupported resource type for condition checking: {resource_type}")
return False

@@ -1097,6 +1102,47 @@ def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:

return False

def _check_job_condition(self, resource_name: str, condition_type: str, namespace: str, wait_all: bool) -> bool:
"""Check job condition ('complete' or 'failed')."""
try:
if wait_all or not resource_name:
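# wait_all (or a missing resource_name) requires every job in the namespace to satisfy the condition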
jobs = self.batch.list_namespaced_job(namespace=namespace).items
else:
job = self.batch.read_namespaced_job(name=resource_name, namespace=namespace)
jobs = [job]

for job in jobs:
if not self._is_job_condition_met(job, condition_type):
return False

return True

except client.rest.ApiException as e:
if e.status == 404:
logger.debug("Job not found, waiting...")
return False
raise e

def _is_job_condition_met(self, job, condition_type: str) -> bool:
"""Check if a job meets the specified condition."""
if not job.status:
return False

condition_type_lower = condition_type.lower()

if condition_type_lower == 'complete':
return bool(job.status.completion_time and job.status.succeeded and job.status.succeeded > 0)

if condition_type_lower == 'failed':
return bool(job.status.failed and job.status.failed > 0 and not job.status.active)

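# Any other condition type is matched against the conditions reported on Job.status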
if job.status.conditions:
for condition in job.status.conditions:
if condition.type.lower() == condition_type_lower and condition.status == "True":
return True

return False

def _expand_and_validate_manifests(self, manifests):
"""
Validate and expand manifests, handling List kind and non-dict manifests.
@@ -1159,6 +1205,8 @@ def _apply_single_manifest(self, manifest, namespace=None):
self.app.create_namespaced_daemon_set(namespace=namespace, body=manifest)
elif kind == "StatefulSet":
self.app.create_namespaced_stateful_set(namespace=namespace, body=manifest)
elif kind == "Job":
self.batch.create_namespaced_job(namespace=namespace, body=manifest)
elif kind == "Service":
self.api.create_namespaced_service(namespace=namespace, body=manifest)
elif kind == "ConfigMap":
@@ -1313,6 +1361,11 @@ def _update_single_manifest(self, manifest, namespace=None):
self.app.patch_namespaced_stateful_set(name=name, namespace=namespace, body=manifest)
else:
raise ValueError("StatefulSet requires a namespace")
elif kind == "Job":
if namespace:
self.batch.patch_namespaced_job(name=name, namespace=namespace, body=manifest)
else:
raise ValueError("Job requires a namespace")
elif kind == "Service":
if namespace:
self.api.patch_namespaced_service(name=name, namespace=namespace, body=manifest)
@@ -1494,6 +1547,11 @@ def _delete_single_manifest(self, manifest, ignore_not_found: bool = True, names
self.app.delete_namespaced_stateful_set(name=resource_name, namespace=namespace, body=delete_options)
else:
raise ValueError("StatefulSet requires a namespace")
elif kind == "Job":
if namespace:
self.batch.delete_namespaced_job(name=resource_name, namespace=namespace, body=delete_options)
else:
raise ValueError("Job requires a namespace")
elif kind == "Service":
if namespace:
self.api.delete_namespaced_service(name=resource_name, namespace=namespace, body=delete_options)
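
The new job handling mirrors kubectl wait --for=condition=complete. A sketch of how the added paths might be exercised end to end, assuming a default-constructed KubernetesClient (its constructor is not shown in this diff):

# --- Illustrative sketch, not part of the PR ---
import yaml

job_manifest = yaml.safe_load("""
apiVersion: batch/v1
kind: Job
metadata:
  name: demo-job
  namespace: default
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: worker
        image: busybox
        command: ["sh", "-c", "echo done"]
""")

k8s_client = KubernetesClient()  # assumed construction; not shown in the diff
k8s_client.apply_manifest_from_file(manifest_dict=job_manifest)

# Equivalent in spirit to: kubectl wait --for=condition=complete job/demo-job
done = k8s_client.wait_for_condition(
    resource_type="job",
    wait_condition_type="complete",
    resource_name="demo-job",
    namespace="default",
    timeout_seconds=300,
)
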
modules/python/crud/azure/node_pool_crud.py (227 changes: 227 additions & 0 deletions)
@@ -8,6 +8,7 @@

import logging
import time
import yaml

from clients.aks_client import AKSClient
from utils.logger_config import get_logger, setup_logging
@@ -270,3 +271,229 @@ def all(
logger.error(error_msg)
errors.append(error_msg)
return False

def create_deployment(
self,
node_pool_name,
replicas=10,
manifest_dir=None,
number_of_deployments=1,
label_selector="app=nginx-container",
namespace="default"
):
"""
Create Kubernetes deployments after node pool operations.

Args:
node_pool_name: Name of the node pool to target
replicas: Number of replicas per deployment (default: 10)
manifest_dir: Directory containing the deployment.yml template; defaults to modules/python/crud/workload_templates
number_of_deployments: Number of deployments to create (default: 1)
label_selector: Label selector used to verify pod readiness (default: "app=nginx-container")
namespace: Kubernetes namespace (default: "default")

Returns:
True if all deployment creations were successful, False otherwise
"""
logger.info("Creating %d deployment(s)", number_of_deployments)
logger.info("Target node pool: %s", node_pool_name)
logger.info("Replicas per deployment: %d", replicas)
logger.info("Using manifest directory: %s", manifest_dir)

try:
# Get Kubernetes client from AKS client
k8s_client = self.aks_client.k8s_client

if not k8s_client:
logger.error("Kubernetes client not available")
return False

successful_deployments = 0

# Loop through number of deployments
for deployment_index in range(1, number_of_deployments + 1):
logger.info("Creating deployment %d/%d", deployment_index, number_of_deployments)

try:
if manifest_dir:
# Use the template path from manifest_dir
template_path = f"{manifest_dir}/deployment.yml"
else:
# Use default template path
template_path = "modules/python/crud/workload_templates/deployment.yml"

# Generate deployment name
deployment_name = f"myapp-{node_pool_name}-{deployment_index}"

# Create deployment template using k8s_client.create_template
deployment_template = k8s_client.create_template(
template_path,
{
"DEPLOYMENT_REPLICAS": replicas,
"NODE_POOL_NAME": node_pool_name,
"INDEX": deployment_index,
"LABEL_VALUE": label_selector.split("=", 1)[-1],
}
)

# Apply each document in the rendered multi-doc template
for doc in yaml.safe_load_all(deployment_template):
if doc:
k8s_client.apply_manifest_from_file(manifest_dict=doc)

logger.info("Applied manifest for deployment %s", deployment_name)

# Verify the deployment by waiting for its Available condition
logger.info("Waiting for deployment %s to become available...", deployment_name)
deployment_ready = k8s_client.wait_for_condition(
resource_type="deployment",
wait_condition_type="available",
resource_name=deployment_name,
namespace=namespace,
timeout_seconds=self.step_timeout
)

if deployment_ready:
logger.info("Deployment %s is successfully available", deployment_name)

# Additionally wait for pods to be ready
logger.info("Waiting for pods of deployment %s to be ready...", deployment_name)
k8s_client.wait_for_pods_ready(
operation_timeout_in_minutes=5,
namespace=namespace,
pod_count=replicas,
label_selector=label_selector
)

logger.info("Successfully created and verified deployment %d", deployment_index)
successful_deployments += 1
else:
logger.error("Deployment %s failed to become available within timeout", deployment_name)
continue

except Exception as e:
logger.error("Failed to create deployment %d: %s", deployment_index, e)
# Continue with next deployment instead of failing completely
continue

# Check if all deployments were successful
if successful_deployments == number_of_deployments:
logger.info("Successfully created all %d deployment(s)", number_of_deployments)
return True
if successful_deployments > 0:
logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments)
return False
logger.error("Failed to create any deployments")
return False

except Exception as e:
logger.error("Failed to create deployments: %s", e)
return False

def create_job(
self,
node_pool_name,
completions=1,
manifest_dir=None,
number_of_jobs=1,
label_selector="app=nginx-container",
namespace="default"
):
"""
Create Kubernetes jobs after node pool operations.

Args:
node_pool_name: Name of the node pool to target
completions: Number of completions per job (default: 1)
manifest_dir: Directory containing the job.yml template; defaults to modules/python/crud/workload_templates
number_of_jobs: Number of jobs to create (default: 1)
label_selector: Label selector on the jobs' pods (default: "app=nginx-container")
namespace: Kubernetes namespace (default: "default")

Returns:
True if all job creations were successful, False otherwise
"""
logger.info("Creating %d job(s)", number_of_jobs)
logger.info("Target node pool: %s", node_pool_name)
logger.info("Job completions: %d", completions)
logger.info("Using manifest directory: %s", manifest_dir)

try:
# Get Kubernetes client from AKS client
k8s_client = self.aks_client.k8s_client

if not k8s_client:
logger.error("Kubernetes client not available")
return False

successful_jobs = 0

# Loop through number of jobs
for job_index in range(1, number_of_jobs + 1):
logger.info("Creating job %d/%d", job_index, number_of_jobs)

try:
if manifest_dir:
# Use the template path from manifest_dir
template_path = f"{manifest_dir}/job.yml"
else:
# Use default template path
template_path = "modules/python/crud/workload_templates/job.yml"

# Generate job name
job_name = f"myapp-{node_pool_name}-{job_index}"

# Create job template using k8s_client.create_template
job_template = k8s_client.create_template(
template_path,
{
"JOB_COMPLETIONS": completions,
"NODE_POOL_NAME": node_pool_name,
"INDEX": job_index,
"LABEL_VALUE": label_selector.split("=", 1)[-1],
}
)

# Apply each document in the rendered multi-doc template
for doc in yaml.safe_load_all(job_template):
if doc:
k8s_client.apply_manifest_from_file(manifest_dict=doc)

logger.info("Applied manifest for job %s", job_name)

# Verify the job by waiting for its Complete condition
logger.info("Waiting for job %s to complete...", job_name)
job_ready = k8s_client.wait_for_condition(
resource_type="job",
wait_condition_type="complete",
resource_name=job_name,
namespace=namespace,
timeout_seconds=self.step_timeout
)

if job_ready:
logger.info("Job %s is successfully complete", job_name)
logger.info("Successfully created and verified job %d", job_index)
successful_jobs += 1
else:
logger.error("Job %s failed to complete within timeout", job_name)
continue

except Exception as e:
logger.error("Failed to create job %d: %s", job_index, e)
# Continue with next job instead of failing completely
continue

# Check if all jobs were successful
if successful_jobs == number_of_jobs:
logger.info("Successfully created all %d job(s)", number_of_jobs)
return True
if successful_jobs > 0:
logger.warning("Created %d/%d job(s)", successful_jobs, number_of_jobs)
return False
logger.error("Failed to create any jobs")
return False

except Exception as e:
logger.error("Failed to create jobs: %s", e)
return False
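
Putting the two helpers together, a caller might look like the following; the NodePoolCrud constructor arguments and the pool name are assumptions, not shown in this diff:

# --- Illustrative sketch, not part of the PR ---
crud = NodePoolCrud(...)  # constructor args not shown in this diff

# Creates deployments myapp-gpupool-1 and myapp-gpupool-2, waits for each
# rollout to become available, then verifies pod readiness via the selector.
ok = crud.create_deployment(
    node_pool_name="gpupool",  # hypothetical pool name
    replicas=10,
    manifest_dir="modules/python/crud/workload_templates",
    number_of_deployments=2,
    label_selector="app=nginx-container",
)

# Creates job myapp-gpupool-1 and blocks until its 'complete' condition is met.
ok = ok and crud.create_job(
    node_pool_name="gpupool",
    completions=1,
    number_of_jobs=1,
)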