diff --git a/modules/python/clients/aks_client.py b/modules/python/clients/aks_client.py
index 3eb5c4179e..4d2dc520b8 100644
--- a/modules/python/clients/aks_client.py
+++ b/modules/python/clients/aks_client.py
@@ -314,7 +314,7 @@ def create_node_pool(
         "os_disk_type": "Managed",
         "nodeLabels": {"gpu": "true"} if gpu_node_pool else {},
         "gpu_profile": {
-            "driver": "None" if gpu_node_pool and vm_size == "Standard_ND96asr_v4" else "Install",
+            "driver": "None" if not gpu_node_pool or vm_size == "Standard_ND96asr_v4" else "Install",
         },
     }
@@ -469,7 +469,7 @@ def scale_node_pool(
             resource_name=cluster_name,
             agent_pool_name=node_pool_name,
             parameters=node_pool,
-        )
+        ).result()

         logger.info(
             f"Waiting for {node_count} nodes in pool {node_pool_name} to be ready..."
@@ -681,7 +681,7 @@ def _progressive_scale(
             resource_name=cluster_name,
             agent_pool_name=node_pool_name,
             parameters=node_pool,
-        )
+        ).result()

         # Use agentpool=node_pool_name as default label if not specified
         label_selector = f"agentpool={node_pool_name}"
diff --git a/modules/python/clients/kubernetes_client.py b/modules/python/clients/kubernetes_client.py
index 60838d2ce4..d85202b50f 100644
--- a/modules/python/clients/kubernetes_client.py
+++ b/modules/python/clients/kubernetes_client.py
@@ -975,6 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names
         valid_conditions = {
             'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
             'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
+            'job': ['complete', 'failed'],
+            'jobs': ['complete', 'failed'],
             # Add more resource types as needed
         }
@@ -1049,6 +1051,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond
         if resource_type_lower in ['deployment', 'deployments']:
             return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all)

+        if resource_type_lower in ['job', 'jobs']:
+            return self._check_job_condition(resource_name, condition_type, namespace, wait_all)
+
         logger.warning(f"Unsupported resource type for condition checking: {resource_type}")
         return False
@@ -1097,6 +1102,47 @@ def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:

         return False

+    def _check_job_condition(self, resource_name: str, condition_type: str, namespace: str, wait_all: bool) -> bool:
+        """Check job condition ('complete' or 'failed')."""
+        try:
+            if wait_all or not resource_name:
+                jobs = self.batch.list_namespaced_job(namespace=namespace).items
+            else:
+                job = self.batch.read_namespaced_job(name=resource_name, namespace=namespace)
+                jobs = [job]
+
+            for job in jobs:
+                if not self._is_job_condition_met(job, condition_type):
+                    return False
+
+            return True
+
+        except client.rest.ApiException as e:
+            if e.status == 404:
+                logger.debug("Job not found, waiting...")
+                return False
+            raise e
+
+    def _is_job_condition_met(self, job, condition_type: str) -> bool:
+        """Check if a job meets the specified condition."""
+        if not job.status:
+            return False
+
+        condition_type_lower = condition_type.lower()
+
+        if condition_type_lower == 'complete':
+            return bool(job.status.completion_time and job.status.succeeded and job.status.succeeded > 0)
+
+        if condition_type_lower == 'failed':
+            return bool(job.status.failed and job.status.failed > 0 and not job.status.active)
+
+        if job.status.conditions:
+            for condition in job.status.conditions:
+                if condition.type.lower() == condition_type_lower and condition.status == "True":
+                    return True
+
+        return False
+
     def _expand_and_validate_manifests(self, manifests):
         """
         Validate and expand manifests, handling List kind and non-dict manifests.
@@ -1159,6 +1205,8 @@ def _apply_single_manifest(self, manifest, namespace=None):
             self.app.create_namespaced_daemon_set(namespace=namespace, body=manifest)
         elif kind == "StatefulSet":
             self.app.create_namespaced_stateful_set(namespace=namespace, body=manifest)
+        elif kind == "Job":
+            self.batch.create_namespaced_job(namespace=namespace, body=manifest)
         elif kind == "Service":
             self.api.create_namespaced_service(namespace=namespace, body=manifest)
         elif kind == "ConfigMap":
@@ -1313,6 +1361,11 @@ def _update_single_manifest(self, manifest, namespace=None):
                 self.app.patch_namespaced_stateful_set(name=name, namespace=namespace, body=manifest)
             else:
                 raise ValueError("StatefulSet requires a namespace")
+        elif kind == "Job":
+            if namespace:
+                self.batch.patch_namespaced_job(name=name, namespace=namespace, body=manifest)
+            else:
+                raise ValueError("Job requires a namespace")
         elif kind == "Service":
             if namespace:
                 self.api.patch_namespaced_service(name=name, namespace=namespace, body=manifest)
@@ -1494,6 +1547,11 @@ def _delete_single_manifest(self, manifest, ignore_not_found: bool = True, names
                 self.app.delete_namespaced_stateful_set(name=resource_name, namespace=namespace, body=delete_options)
             else:
                 raise ValueError("StatefulSet requires a namespace")
+        elif kind == "Job":
+            if namespace:
+                self.batch.delete_namespaced_job(name=resource_name, namespace=namespace, body=delete_options)
+            else:
+                raise ValueError("Job requires a namespace")
         elif kind == "Service":
             if namespace:
                 self.api.delete_namespaced_service(name=resource_name, namespace=namespace, body=delete_options)
diff --git a/modules/python/crud/azure/node_pool_crud.py b/modules/python/crud/azure/node_pool_crud.py
index ed88bbce93..1fd10df07a 100644
--- a/modules/python/crud/azure/node_pool_crud.py
+++ b/modules/python/crud/azure/node_pool_crud.py
@@ -8,6 +8,7 @@

 import logging
 import time
+import yaml

 from clients.aks_client import AKSClient
 from utils.logger_config import get_logger, setup_logging
@@ -270,3 +271,229 @@ def all(
                 logger.error(error_msg)
                 errors.append(error_msg)
         return False
+
+    def create_deployment(
+        self,
+        node_pool_name,
+        replicas=10,
+        manifest_dir=None,
+        number_of_deployments=1,
+        label_selector="app=nginx-container",
+        namespace="default"
+    ):
+        """
+        Create Kubernetes deployments after node pool operations.
+
+        Args:
+            node_pool_name: Name of the node pool to target
+            replicas: Number of deployment replicas per deployment (default: 10)
+            manifest_dir: Directory containing Kubernetes manifest files
+            number_of_deployments: Number of deployments to create (default: 1)
+            label_selector: Label selector for the deployment pods (default: "app=nginx-container")
+            namespace: Kubernetes namespace (default: "default")
+
+        Returns:
+            True if all deployment creations were successful, False otherwise
+        """
+        logger.info("Creating %d deployment(s)", number_of_deployments)
+        logger.info("Target node pool: %s", node_pool_name)
+        logger.info("Replicas per deployment: %d", replicas)
+        logger.info("Using manifest directory: %s", manifest_dir)
+
+        try:
+            # Get Kubernetes client from AKS client
+            k8s_client = self.aks_client.k8s_client
+
+            if not k8s_client:
+                logger.error("Kubernetes client not available")
+                return False
+
+            successful_deployments = 0
+
+            # Loop through number of deployments
+            for deployment_index in range(1, number_of_deployments + 1):
+                logger.info("Creating deployment %d/%d", deployment_index, number_of_deployments)
+
+                try:
+                    if manifest_dir:
+                        # Use the template path from manifest_dir
+                        template_path = f"{manifest_dir}/deployment.yml"
+                    else:
+                        # Use default template path
+                        template_path = "modules/python/crud/workload_templates/deployment.yml"
+
+                    # Generate deployment name
+                    deployment_name = f"myapp-{node_pool_name}-{deployment_index}"
+
+                    # Create deployment template using k8s_client.create_template
+                    deployment_template = k8s_client.create_template(
+                        template_path,
+                        {
+                            "DEPLOYMENT_REPLICAS": replicas,
+                            "NODE_POOL_NAME": node_pool_name,
+                            "INDEX": deployment_index,
+                            "LABEL_VALUE": label_selector.split("=", 1)[-1],
+                        }
+                    )
+
+                    # Apply each document in the rendered multi-doc template
+                    for doc in yaml.safe_load_all(deployment_template):
+                        if doc:
+                            k8s_client.apply_manifest_from_file(manifest_dict=doc)
+
+                    logger.info("Applied manifest for deployment %s", deployment_name)
+
+                    # Wait for deployment to be available (successful deployment verification)
+                    logger.info("Waiting for deployment %s to become available...", deployment_name)
+                    deployment_ready = k8s_client.wait_for_condition(
+                        resource_type="deployment",
+                        wait_condition_type="available",
+                        resource_name=deployment_name,
+                        namespace=namespace,
+                        timeout_seconds=self.step_timeout
+                    )
+
+                    if deployment_ready:
+                        logger.info("Deployment %s is successfully available", deployment_name)
+
+                        # Additionally wait for pods to be ready
+                        logger.info("Waiting for pods of deployment %s to be ready...", deployment_name)
+                        k8s_client.wait_for_pods_ready(
+                            operation_timeout_in_minutes=5,
+                            namespace=namespace,
+                            pod_count=replicas,
+                            label_selector=label_selector
+                        )
+
+                        logger.info("Successfully created and verified deployment %d", deployment_index)
+                        successful_deployments += 1
+                    else:
+                        logger.error("Deployment %s failed to become available within timeout", deployment_name)
+                        continue
+
+                except Exception as e:
+                    logger.error("Failed to create deployment %d: %s", deployment_index, e)
+                    # Continue with next deployment instead of failing completely
+                    continue
+
+            # Check if all deployments were successful
+            if successful_deployments == number_of_deployments:
+                logger.info("Successfully created all %d deployment(s)", number_of_deployments)
+                return True
+            if successful_deployments > 0:
+                logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments)
+                return False
+            logger.error("Failed to create any deployments")
+            return False
+
+        except Exception as e:
+            logger.error("Failed to create deployments: %s", e)
+            return False
+
+    def create_job(
+        self,
+        node_pool_name,
+        completions=1,
+        manifest_dir=None,
+        number_of_jobs=1,
+        label_selector="app=nginx-container",
+        namespace="default"
+    ):
+        """
+        Create Kubernetes jobs after node pool operations.
+
+        Args:
+            node_pool_name: Name of the node pool to target
+            completions: Number of job completions (default: 1)
+            manifest_dir: Directory containing Kubernetes manifest files
+            number_of_jobs: Number of jobs to create (default: 1)
+            label_selector: Label selector for the job pods (default: "app=nginx-container")
+            namespace: Kubernetes namespace (default: "default")
+
+        Returns:
+            True if all job creations were successful, False otherwise
+        """
+        logger.info("Creating %d job(s)", number_of_jobs)
+        logger.info("Target node pool: %s", node_pool_name)
+        logger.info("Job completions: %d", completions)
+        logger.info("Using manifest directory: %s", manifest_dir)
+
+        try:
+            # Get Kubernetes client from AKS client
+            k8s_client = self.aks_client.k8s_client
+
+            if not k8s_client:
+                logger.error("Kubernetes client not available")
+                return False
+
+            successful_jobs = 0
+
+            # Loop through number of jobs
+            for job_index in range(1, number_of_jobs + 1):
+                logger.info("Creating job %d/%d", job_index, number_of_jobs)
+
+                try:
+                    if manifest_dir:
+                        # Use the template path from manifest_dir
+                        template_path = f"{manifest_dir}/job.yml"
+                    else:
+                        # Use default template path
+                        template_path = "modules/python/crud/workload_templates/job.yml"
+
+                    # Generate job name
+                    job_name = f"myapp-{node_pool_name}-{job_index}"
+
+                    # Create job template using k8s_client.create_template
+                    job_template = k8s_client.create_template(
+                        template_path,
+                        {
+                            "JOB_COMPLETIONS": completions,
+                            "NODE_POOL_NAME": node_pool_name,
+                            "INDEX": job_index,
+                            "LABEL_VALUE": label_selector.split("=", 1)[-1],
+                        }
+                    )
+
+                    # Apply each document in the rendered multi-doc template
+                    for doc in yaml.safe_load_all(job_template):
+                        if doc:
+                            k8s_client.apply_manifest_from_file(manifest_dict=doc)
+
+                    logger.info("Applied manifest for job %s", job_name)
+
+                    # Wait for job to complete (successful job verification)
+                    logger.info("Waiting for job %s to complete...", job_name)
+                    job_ready = k8s_client.wait_for_condition(
+                        resource_type="job",
+                        wait_condition_type="complete",
+                        resource_name=job_name,
+                        namespace=namespace,
+                        timeout_seconds=self.step_timeout
+                    )
+
+                    if job_ready:
+                        logger.info("Job %s is successfully complete", job_name)
+                        logger.info("Successfully created and verified job %d", job_index)
+                        successful_jobs += 1
+                    else:
+                        logger.error("Job %s failed to complete within timeout", job_name)
+                        continue
+
+                except Exception as e:
+                    logger.error("Failed to create job %d: %s", job_index, e)
+                    # Continue with next job instead of failing completely
+                    continue
+
+            # Check if all jobs were successful
+            if successful_jobs == number_of_jobs:
+                logger.info("Successfully created all %d job(s)", number_of_jobs)
+                return True
+            if successful_jobs > 0:
+                logger.warning("Created %d/%d job(s)", successful_jobs, number_of_jobs)
+                return False
+            logger.error("Failed to create any jobs")
+            return False
+
+        except Exception as e:
+            logger.error("Failed to create jobs: %s", e)
+            return False
diff --git a/modules/python/crud/main.py b/modules/python/crud/main.py
index 082fd68682..515fa0aa69 100644
--- a/modules/python/crud/main.py
+++ b/modules/python/crud/main.py
@@ -146,6 +146,44 @@ def handle_node_pool_operation(node_pool_crud, args):
         logger.error(f"Error during '{command}' operation: {str(e)}")
         return 1

+def handle_workload_operations(node_pool_crud, args):
+    """Handle workload operations (deployment, jobs) based on the command"""
+    command = args.command
+    result = None
+
+    try:
+        if command == "deployment":
+            # Prepare deploy arguments
+            deploy_kwargs = {
+                "node_pool_name": args.node_pool_name,
+                "replicas": args.replicas,
+                "manifest_dir": args.manifest_dir,
+                "number_of_deployments": args.number_of_deployments
+            }
+
+            result = node_pool_crud.create_deployment(**deploy_kwargs)
+        elif command == "jobs":
+            # Prepare job arguments
+            job_kwargs = {
+                "node_pool_name": args.node_pool_name,
+                "completions": args.completions,
+                "manifest_dir": args.manifest_dir,
+                "number_of_jobs": args.number_of_jobs,
+                "label_selector": args.label_selector,
+            }
+
+            result = node_pool_crud.create_job(**job_kwargs)
+        else:
+            logger.error("Unknown workload command: '%s'", command)
+            return 1
+        # Check if the operation was successful
+        if result is False:
+            logger.error(f"Operation '{command}' failed")
+            return 1
+        return 0
+    except Exception as e:
+        logger.error(f"Error during '{command}' operation: {str(e)}")
+        return 1

 def handle_node_pool_all(node_pool_crud, args):
     """Handle the all-in-one node pool operation command (create, scale up, scale down, delete)"""
@@ -320,6 +358,61 @@ def main():
     )
     all_parser.set_defaults(func=handle_node_pool_operation)

+    # Deployment command parser
+    deployment_parser = subparsers.add_parser(
+        "deployment", parents=[common_parser], help="create deployments"
+    )
+    deployment_parser.add_argument("--node-pool-name", required=True, help="Node pool name")
+    deployment_parser.add_argument(
+        "--number-of-deployments",
+        type=int,
+        default=1,
+        help="Number of deployments"
+    )
+    deployment_parser.add_argument(
+        "--replicas",
+        type=int,
+        default=10,
+        help="Number of deployment replicas"
+    )
+    deployment_parser.add_argument(
+        "--manifest-dir",
+        required=True,
+        help="Directory containing Kubernetes manifest files for the deployment"
+    )
+
+    deployment_parser.set_defaults(func=handle_workload_operations)
+
+    # Jobs command parser
+    jobs_parser = subparsers.add_parser(
+        "jobs", parents=[common_parser], help="create jobs"
+    )
+    jobs_parser.add_argument("--node-pool-name", required=True, help="Node pool name")
+    jobs_parser.add_argument(
+        "--number-of-jobs",
+        type=int,
+        default=1,
+        help="Number of jobs"
+    )
+    jobs_parser.add_argument(
+        "--completions",
+        type=int,
+        default=1,
+        help="Number of job completions"
+    )
+    jobs_parser.add_argument(
+        "--manifest-dir",
+        required=True,
+        help="Directory containing Kubernetes manifest files for the job"
+    )
+    jobs_parser.add_argument(
+        "--label-selector",
+        default="app=nginx-container",
+        help="Label selector for created job pods (default: app=nginx-container)"
+    )
+
+    jobs_parser.set_defaults(func=handle_workload_operations)
+
     # Arguments provided, run node pool operations and collect benchmark results
     try:
         args = parser.parse_args()
diff --git a/modules/python/crud/workload_templates/deployment.yml b/modules/python/crud/workload_templates/deployment.yml
new file mode 100644
index 0000000000..0d23751682
--- /dev/null
+++ b/modules/python/crud/workload_templates/deployment.yml
@@ -0,0 +1,33 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+  labels:
+    app: {{LABEL_VALUE}}
+spec:
+  template:
+    metadata:
+      labels:
+        app: {{LABEL_VALUE}}
+    spec:
+      containers:
+      - name: {{LABEL_VALUE}}
+        image: mcr.microsoft.com/oss/nginx/nginx:1.21.6
+        ports:
+        - containerPort: 80
+  replicas: {{DEPLOYMENT_REPLICAS}}
+  selector:
+    matchLabels:
+      app: {{LABEL_VALUE}}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+spec:
+  ports:
+  - port: 80
+    name: myapp
+  clusterIP: None
+  selector:
+    app: {{LABEL_VALUE}}
diff --git a/modules/python/crud/workload_templates/job.yml b/modules/python/crud/workload_templates/job.yml
new file mode 100644
index 0000000000..4fc6642bff
--- /dev/null
+++ b/modules/python/crud/workload_templates/job.yml
@@ -0,0 +1,20 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+  labels:
+    app: {{LABEL_VALUE}}
+spec:
+  completions: {{JOB_COMPLETIONS}}
+  template:
+    metadata:
+      labels:
+        app: {{LABEL_VALUE}}
+    spec:
+      restartPolicy: Never
+      containers:
+      - name: {{LABEL_VALUE}}
+        image: mcr.microsoft.com/oss/nginx/nginx:1.21.6
+        command: ["nginx", "-t"]
+        ports:
+        - containerPort: 80
diff --git a/modules/python/tests/clients/test_kubernetes_client.py b/modules/python/tests/clients/test_kubernetes_client.py
index 8190789361..318ca2e13d 100644
--- a/modules/python/tests/clients/test_kubernetes_client.py
+++ b/modules/python/tests/clients/test_kubernetes_client.py
@@ -2586,6 +2586,36 @@ def test_apply_single_manifest_statefulset_no_namespace(self, mock_create_statef
         mock_create_statefulset.assert_called_once_with(
             namespace="default", body=manifest)

+    @patch('kubernetes.client.BatchV1Api.create_namespaced_job')
+    def test_apply_single_manifest_job(self, mock_create_job):
+        """Test _apply_single_manifest with Job resource."""
+        manifest = {
+            "apiVersion": "batch/v1",
+            "kind": "Job",
+            "metadata": {"name": "test-job", "namespace": "test-namespace"},
+            "spec": {}
+        }
+
+        # pylint: disable=protected-access
+        self.client._apply_single_manifest(manifest)
+        mock_create_job.assert_called_once_with(
+            namespace="test-namespace", body=manifest)
+
+    @patch('kubernetes.client.BatchV1Api.create_namespaced_job')
+    def test_apply_single_manifest_job_no_namespace(self, mock_create_job):
+        """Test _apply_single_manifest with Job missing namespace uses 'default'."""
+        manifest = {
+            "apiVersion": "batch/v1",
+            "kind": "Job",
+            "metadata": {"name": "test-job"},
+            "spec": {}
+        }
+
+        # pylint: disable=protected-access
+        self.client._apply_single_manifest(manifest)
+        mock_create_job.assert_called_once_with(
+            namespace="default", body=manifest)
+
     @patch('kubernetes.client.CoreV1Api.create_namespaced_service')
     def test_apply_single_manifest_service(self, mock_create_service):
         """Test _apply_single_manifest with Service resource."""
@@ -3407,6 +3437,160 @@ def test_wait_for_condition_case_insensitive(self):
         )
         self.assertTrue(result)

+    # Tests for wait_for_condition with job resource type
+
+    @patch('time.time')
+    def test_wait_for_condition_job_complete_success(self, mock_time):
+        """Test wait_for_condition for a completed job - success case"""
+        mock_time.side_effect = [0, 0, 1, 2, 2]
+
+        mock_job = MagicMock()
+        mock_job.status.completion_time = "2026-04-20T00:00:00Z"
+        mock_job.status.succeeded = 1
+        mock_job.status.failed = None
+        mock_job.status.active = None
+        mock_job.status.conditions = None
+
+        with patch.object(self.client, 'batch') as mock_batch, \
+                patch('time.sleep'):
+            mock_batch.read_namespaced_job.return_value = mock_job
+
+            result = self.client.wait_for_condition(
+                resource_type="job",
+                resource_name="test-job",
+                wait_condition_type="complete",
+                namespace="test-namespace",
+                timeout_seconds=5
+            )
+
+            self.assertTrue(result)
+            mock_batch.read_namespaced_job.assert_called_with(
+                name="test-job",
+                namespace="test-namespace"
+            )
+
+    @patch('time.time')
+    def test_wait_for_condition_job_complete_timeout(self, mock_time):
+        """Test wait_for_condition for a job - timeout when not yet complete"""
+        mock_time.side_effect = [0, 0, 2, 5, 6, 6]
+
+        mock_job = MagicMock()
+        mock_job.status.completion_time = None
+        mock_job.status.succeeded = 0
+        mock_job.status.failed = None
+        mock_job.status.active = 1
+        mock_job.status.conditions = None
+
+        with patch.object(self.client, 'batch') as mock_batch, \
+                patch('time.sleep'):
+            mock_batch.read_namespaced_job.return_value = mock_job
+
+            result = self.client.wait_for_condition(
+                resource_type="job",
+                resource_name="test-job",
+                wait_condition_type="complete",
+                namespace="test-namespace",
+                timeout_seconds=1
+            )
+
+            self.assertFalse(result)
+
+    @patch('time.time')
+    def test_wait_for_condition_job_failed_success(self, mock_time):
+        """Test wait_for_condition for a failed job - detects failure"""
+        mock_time.side_effect = [0, 0, 1, 2, 2]
+
+        mock_job = MagicMock()
+        mock_job.status.completion_time = None
+        mock_job.status.succeeded = 0
+        mock_job.status.failed = 3
+        mock_job.status.active = 0
+        mock_job.status.conditions = None
+
+        with patch.object(self.client, 'batch') as mock_batch, \
+                patch('time.sleep'):
+            mock_batch.read_namespaced_job.return_value = mock_job
+
+            result = self.client.wait_for_condition(
+                resource_type="job",
+                resource_name="test-job",
+                wait_condition_type="failed",
+                namespace="test-namespace",
+                timeout_seconds=5
+            )
+
+            self.assertTrue(result)
+
+    @patch('time.time')
+    def test_wait_for_condition_all_jobs_complete(self, mock_time):
+        """Test wait_for_condition for all jobs in a namespace - all complete"""
+        mock_time.side_effect = [0, 0, 1, 2, 2]
+
+        mock_job1 = MagicMock()
+        mock_job1.status.completion_time = "2026-04-20T00:00:00Z"
+        mock_job1.status.succeeded = 1
+        mock_job1.status.failed = None
+        mock_job1.status.active = None
+        mock_job1.status.conditions = None
+
+        mock_job2 = MagicMock()
+        mock_job2.status.completion_time = "2026-04-20T00:01:00Z"
+        mock_job2.status.succeeded = 2
+        mock_job2.status.failed = None
+        mock_job2.status.active = None
+        mock_job2.status.conditions = None
+
+        with patch.object(self.client, 'batch') as mock_batch, \
+                patch('time.sleep'):
+            mock_batch.list_namespaced_job.return_value.items = [mock_job1, mock_job2]
+
+            result = self.client.wait_for_condition(
+                resource_type="job",
+                resource_name=None,
+                wait_condition_type="complete",
+                namespace="test-namespace",
+                timeout_seconds=5,
+                wait_all=True
+            )
+
+            self.assertTrue(result)
+            mock_batch.list_namespaced_job.assert_called_with(namespace="test-namespace")
+
+    @patch('time.time')
+    def test_wait_for_condition_job_not_found(self, mock_time):
+        """Test wait_for_condition for a job that doesn't exist - times out"""
+        mock_time.side_effect = [0, 0, 2, 5, 6, 6]
+
+        api_exception = ApiException(status=404, reason="Not Found")
+
+        with patch.object(self.client, 'batch') as mock_batch, \
+                patch('time.sleep'):
+            mock_batch.read_namespaced_job.side_effect = api_exception
+
+            result = self.client.wait_for_condition(
+                resource_type="job",
+                resource_name="nonexistent-job",
+                wait_condition_type="complete",
+                namespace="test-namespace",
+                timeout_seconds=1
+            )
+
+            self.assertFalse(result)
+
+    def test_wait_for_condition_job_invalid_condition(self):
+        """Test wait_for_condition with invalid condition for job resource"""
+        with self.assertRaises(ValueError) as context:
+            self.client.wait_for_condition(
+                resource_type="job",
+                resource_name="test-job",
+                wait_condition_type="available",
+                namespace="test-namespace",
+                timeout_seconds=1
+            )
+
+        self.assertIn("Invalid condition 'available' for resource type 'job'", str(context.exception))
+        self.assertIn("Valid conditions: complete, failed", str(context.exception))
+
     # Tests for the enhanced apply_manifest_from_file method with folder support
     @patch('os.path.isdir')
     @patch('os.path.isfile')
@@ -4020,6 +4204,42 @@ def test_delete_single_manifest_statefulset_no_namespace(self):

         self.assertIn("StatefulSet requires a namespace", str(context.exception))

+    def test_delete_single_manifest_job(self):
+        """Test deleting a single Job manifest."""
+        manifest = {
+            "apiVersion": "batch/v1",
+            "kind": "Job",
+            "metadata": {"name": "test-job", "namespace": "test-namespace"},
+            "spec": {}
+        }
+
+        with patch.object(self.client, 'batch') as mock_batch:
+            mock_batch.delete_namespaced_job.return_value = None
+
+            # pylint: disable=protected-access
+            self.client._delete_single_manifest(manifest)
+
+            mock_batch.delete_namespaced_job.assert_called_once_with(
+                name="test-job",
+                namespace="test-namespace",
+                body=unittest.mock.ANY
+            )
+
+    def test_delete_single_manifest_job_no_namespace(self):
+        """Test _delete_single_manifest with Job missing namespace raises error."""
+        manifest = {
+            "apiVersion": "batch/v1",
+            "kind": "Job",
+            "metadata": {"name": "test-job"},
+            "spec": {}
+        }
+
+        with self.assertRaises(Exception) as context:
+            # pylint: disable=protected-access
+            self.client._delete_single_manifest(manifest)
+
+        self.assertIn("Job requires a namespace", str(context.exception))
+
     def test_delete_single_manifest_service(self):
         """Test deleting a single service manifest."""
         manifest = {
diff --git a/modules/python/tests/crud/test_azure_node_pool_crud.py b/modules/python/tests/crud/test_azure_node_pool_crud.py
index 7089426166..7baabe28df 100644
--- a/modules/python/tests/crud/test_azure_node_pool_crud.py
+++ b/modules/python/tests/crud/test_azure_node_pool_crud.py
@@ -222,6 +222,246 @@ def test_all_operations(self, mock_time):
         # Check time.sleep was called 3 times (between operations)
         self.assertEqual(mock_time.sleep.call_count, 3)

+    @mock.patch("crud.azure.node_pool_crud.time")
+    def test_all_create_returns_false_early_exit(self, mock_time):
+        """Test that all() exits early when create returns False"""
+        # Setup - mock create to fail
+        self.node_pool_crud.create_node_pool = mock.MagicMock(return_value=False)
+        self.node_pool_crud.scale_node_pool = mock.MagicMock(return_value=True)
+        self.node_pool_crud.delete_node_pool = mock.MagicMock(return_value=True)
+
+        # Execute
+        result = self.node_pool_crud.all(
+            node_pool_name="test-pool",
+            vm_size="Standard_DS2_v2",
+            node_count=1,
+            target_count=3,
+            progressive=True,
+            scale_step_size=1,
+        )
+
+        # Verify - should return False
+        self.assertFalse(result)
+
+        # Verify create was called once
+        self.node_pool_crud.create_node_pool.assert_called_once()
+
+        # Verify scale and delete were NOT called (early exit)
+        self.node_pool_crud.scale_node_pool.assert_not_called()
+        self.node_pool_crud.delete_node_pool.assert_not_called()
+
+        # Verify time.sleep was NOT called (no operations after create)
+        mock_time.sleep.assert_not_called()
+
+    @mock.patch("crud.azure.node_pool_crud.time")
+    def test_all_scale_up_fails_continues(self, mock_time):
+        """Test that all() continues to scale down and delete when scale up fails"""
+        # Setup - create succeeds, scale_up fails, scale_down and delete succeed
+        self.node_pool_crud.create_node_pool = mock.MagicMock(return_value=True)
+        self.node_pool_crud.scale_node_pool = mock.MagicMock(
+            side_effect=[False, True]  # scale_up fails, scale_down succeeds
+        )
+        self.node_pool_crud.delete_node_pool = mock.MagicMock(return_value=True)
+
+        # Execute
+        result = self.node_pool_crud.all(
+            node_pool_name="test-pool",
+            vm_size="Standard_DS2_v2",
+            node_count=1,
+            target_count=3,
+            progressive=True,
+            scale_step_size=1,
+        )
+
+        # Verify - should return False (scale_up failed)
+        self.assertFalse(result)
+
+        # Verify create was called once
+        self.node_pool_crud.create_node_pool.assert_called_once()
+
+        # Verify scale was called TWICE (scale_up failed, but scale_down still called)
+        self.assertEqual(self.node_pool_crud.scale_node_pool.call_count, 2)
+
+        # Verify delete was still called (cleanup continues despite scale_up failure)
+        self.node_pool_crud.delete_node_pool.assert_called_once()
+
+        # Verify time.sleep was called 3 times (between all operations)
+        self.assertEqual(mock_time.sleep.call_count, 3)
+
+    @mock.patch("crud.azure.node_pool_crud.time")
+    def test_all_scale_down_fails_continues(self, mock_time):
+        """Test that all() continues to delete when scale down fails"""
+        # Setup - create and scale_up succeed, scale_down fails, delete succeeds
+        self.node_pool_crud.create_node_pool = mock.MagicMock(return_value=True)
+        self.node_pool_crud.scale_node_pool = mock.MagicMock(
+            side_effect=[True, False]  # scale_up succeeds, scale_down fails
+        )
+        self.node_pool_crud.delete_node_pool = mock.MagicMock(return_value=True)
+
+        # Execute
+        result = self.node_pool_crud.all(
+            node_pool_name="test-pool",
+            vm_size="Standard_DS2_v2",
+            node_count=1,
+            target_count=3,
+            progressive=True,
+            scale_step_size=1,
+        )
+
+        # Verify - should return False (scale_down failed)
+        self.assertFalse(result)
+
+        # Verify create was called once
+        self.node_pool_crud.create_node_pool.assert_called_once()
+
+        # Verify scale was called TWICE (scale_up succeeded, scale_down failed)
+        self.assertEqual(self.node_pool_crud.scale_node_pool.call_count, 2)
+
+        # Verify delete was still called (cleanup continues despite scale_down failure)
+        self.node_pool_crud.delete_node_pool.assert_called_once()
+
+        # Verify time.sleep was called 3 times (between all operations)
+        self.assertEqual(mock_time.sleep.call_count, 3)
+
+    def test_create_deployment_success(self):
+        """Test successful deployment creation"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: Deployment\n"
+        mock_k8s_client.wait_for_condition.return_value = True
+
+        # Execute
+        result = self.node_pool_crud.create_deployment(node_pool_name="test-pool")
+
+        # Verify
+        self.assertTrue(result)
+
+    def test_create_deployment_failure(self):
+        """Test deployment creation failure"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: Deployment\n"
+        mock_k8s_client.wait_for_condition.return_value = False
+
+        # Execute
+        result = self.node_pool_crud.create_deployment(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_deployment_no_client(self):
+        """Test deployment creation with no Kubernetes client"""
+        # Setup
+        self.mock_aks_client.k8s_client = None
+
+        # Execute
+        result = self.node_pool_crud.create_deployment(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_deployment_partial_success(self):
+        """Test deployment creation when some deployments succeed and others fail"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: Deployment\n"
+
+        # Simulate: deployment 1 succeeds, deployment 2 fails, deployment 3 succeeds
+        # wait_for_condition returns True/False for each deployment
+        mock_k8s_client.wait_for_condition.side_effect = [True, False, True]
+
+        # Execute - request 3 deployments
+        result = self.node_pool_crud.create_deployment(
+            node_pool_name="test-pool",
+            number_of_deployments=3,
+            replicas=5
+        )
+
+        # Verify - should return False (not all deployments succeeded)
+        self.assertFalse(result)
+
+        # Verify wait_for_condition was called 3 times (once per deployment)
+        self.assertEqual(mock_k8s_client.wait_for_condition.call_count, 3)
+
+        # Verify create_template was called 3 times (attempted all deployments)
+        self.assertEqual(mock_k8s_client.create_template.call_count, 3)
+
+    def test_create_job_success(self):
+        """Test successful job creation"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n"
+        mock_k8s_client.wait_for_condition.return_value = True
+
+        # Execute
+        result = self.node_pool_crud.create_job(node_pool_name="test-pool")
+
+        # Verify
+        self.assertTrue(result)
+
+    def test_create_job_failure(self):
+        """Test job creation failure"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n"
+        mock_k8s_client.wait_for_condition.return_value = False
+
+        # Execute
+        result = self.node_pool_crud.create_job(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_job_no_client(self):
+        """Test job creation with no Kubernetes client"""
+        # Setup
+        self.mock_aks_client.k8s_client = None
+
+        # Execute
+        result = self.node_pool_crud.create_job(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_job_partial_success(self):
+        """Test job creation when some jobs succeed and others fail"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n"
+
+        # Simulate: job 1 succeeds, job 2 fails, job 3 succeeds
+        # wait_for_condition returns True/False for each job
+        mock_k8s_client.wait_for_condition.side_effect = [True, False, True]
+
+        # Execute - request 3 jobs
+        result = self.node_pool_crud.create_job(
+            node_pool_name="test-pool",
+            number_of_jobs=3,
+            completions=5
+        )
+
+        # Verify - should return False (not all jobs succeeded)
+        self.assertFalse(result)
+
+        # Verify wait_for_condition was called 3 times (once per job)
+        self.assertEqual(mock_k8s_client.wait_for_condition.call_count, 3)
+
+        # Verify create_template was called 3 times (attempted all jobs)
+        self.assertEqual(mock_k8s_client.create_template.call_count, 3)

 if __name__ == "__main__":
     unittest.main()
diff --git a/modules/python/tests/crud/test_main.py b/modules/python/tests/crud/test_main.py
index c3ab848a83..8ad5a839e1 100644
--- a/modules/python/tests/crud/test_main.py
+++ b/modules/python/tests/crud/test_main.py
@@ -13,6 +13,7 @@
 from crud.main import (
     get_node_pool_crud_class,
     handle_node_pool_operation,
+    handle_workload_operations,
     main,
     check_for_progressive_scaling,
     collect_benchmark_results,
@@ -145,6 +146,42 @@ def test_handle_node_pool_operation_scale_non_progressive(self, mock_azure_crud)
         gpu_node_pool=False,
     )

+    @mock.patch("crud.main.logger")
+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_node_pool_operation_scale_fails_returns_error(
+        self, mock_azure_crud, mock_logger
+    ):
+        """Test handle_node_pool_operation when scale up fails but continues execution.
+
+        This test verifies that when scale_node_pool returns False (e.g., some nodes
+        failed to scale but the operation completed), the function correctly returns
+        exit code 1 to indicate failure while allowing the calling code to continue.
+        """
+        # Setup - progressive scaling where operation fails
+        mock_args = mock.MagicMock()
+        mock_args.command = "scale"
+        mock_args.node_pool_name = "test-np"
+        mock_args.target_count = 10
+        mock_args.scale_step_size = 2  # Progressive scaling
+        mock_args.gpu_node_pool = False
+
+        # Configure mock to return False (scale failed but didn't raise exception)
+        mock_azure_crud.scale_node_pool.return_value = False
+
+        # Execute
+        result = handle_node_pool_operation(mock_azure_crud, mock_args)
+
+        # Verify - operation failed but returned gracefully (no exception)
+        self.assertEqual(result, 1)  # 1 means failure
+        mock_azure_crud.scale_node_pool.assert_called_once_with(
+            node_pool_name="test-np",
+            node_count=10,
+            progressive=True,
+            scale_step_size=2,
+            gpu_node_pool=False,
+        )
+        mock_logger.error.assert_called_with("Operation 'scale' failed")
+
     @mock.patch("crud.main.AzureNodePoolCRUD")
     def test_handle_node_pool_operation_delete(self, mock_azure_crud):
         """Test handle_node_pool_operation for delete command"""
@@ -339,6 +376,143 @@ def test_main_collect_command_simple(self, mock_collect_func):
         mock_collect_func.assert_called_once()
         self.assertEqual(cm.exception.code, 0)

+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_workload_operations_create_deployment_success(self, mock_azure_crud):
+        """Test handle_workload_operations for successful deployment creation"""
+        # Setup
+        mock_args = mock.MagicMock()
+        mock_args.command = "deployment"
+        mock_args.node_pool_name = "test-nodepool"
+        mock_args.replicas = 5
+        mock_args.manifest_dir = "/path/to/manifests"
+        mock_args.number_of_deployments = 3
+
+        # Configure mock to return success
+        mock_azure_crud.create_deployment.return_value = True
+
+        # Execute
+        result = handle_workload_operations(mock_azure_crud, mock_args)
+
+        # Verify
+        self.assertEqual(result, 0)  # 0 means success
+        mock_azure_crud.create_deployment.assert_called_once_with(
+            node_pool_name="test-nodepool",
+            replicas=5,
+            manifest_dir="/path/to/manifests",
+            number_of_deployments=3
+        )
+
+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_workload_operations_failure(self, mock_azure_crud):
+        """Test handle_workload_operations when operation fails"""
+        # Setup
+        mock_args = mock.MagicMock()
+        mock_args.command = "deployment"
+        mock_args.node_pool_name = "test-nodepool"
+        mock_args.replicas = 5
+        mock_args.manifest_dir = "/path/to/manifests"
+        mock_args.number_of_deployments = 3
+
+        # Configure mock to return failure
+        mock_azure_crud.create_deployment.return_value = False
+
+        # Execute
+        result = handle_workload_operations(mock_azure_crud, mock_args)
+
+        # Verify
+        self.assertEqual(result, 1)  # 1 means failure
+
+    @mock.patch("crud.main.logger")
+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_workload_operations_exception(self, mock_azure_crud, mock_logger):
+        """Test handle_workload_operations with exception during operation"""
+        # Setup
+        mock_args = mock.MagicMock()
+        mock_args.command = "deployment"
+        mock_args.node_pool_name = "test-nodepool"
+        mock_args.replicas = 5
+        mock_args.manifest_dir = "/path/to/manifests"
+        mock_args.number_of_deployments = 3
+
+        # Configure mock to raise exception
+        mock_azure_crud.create_deployment.side_effect = ValueError("Test error")
+
+        # Execute
+        result = handle_workload_operations(mock_azure_crud, mock_args)
+
+        # Verify
+        self.assertEqual(result, 1)  # 1 means error
+        mock_logger.error.assert_called_with(
+            "Error during 'deployment' operation: Test error"
+        )
+
+    @mock.patch("crud.main.logger")
+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_workload_operations_partial_success(self, mock_azure_crud, mock_logger):
+        """Test handle_workload_operations when deployment returns partial success (False).
+
+        The create_deployment method returns False when some deployments succeed but
+        not all of them (partial success). This tests that handle_workload_operations
+        correctly treats this as a failure and returns exit code 1.
+        """
+        # Setup - simulate a partial success scenario where create_deployment
+        # returns False (e.g., 2 out of 3 deployments succeeded)
+        mock_args = mock.MagicMock()
+        mock_args.command = "deployment"
+        mock_args.node_pool_name = "test-nodepool"
+        mock_args.replicas = 5
+        mock_args.manifest_dir = "/path/to/manifests"
+        mock_args.number_of_deployments = 3  # Requesting 3 deployments
+
+        # Configure mock to return False (partial success - some deployments
+        # succeeded but not all, which is still considered a failure)
+        mock_azure_crud.create_deployment.return_value = False
+
+        # Execute
+        result = handle_workload_operations(mock_azure_crud, mock_args)
+
+        # Verify
+        self.assertEqual(result, 1)  # 1 means failure (partial success is still failure)
+        mock_azure_crud.create_deployment.assert_called_once_with(
+            node_pool_name="test-nodepool",
+            replicas=5,
+            manifest_dir="/path/to/manifests",
+            number_of_deployments=3
+        )
+        # Verify the error was logged for the failed operation
+        mock_logger.error.assert_called_with("Operation 'deployment' failed")
+
+    @mock.patch("crud.main.AzureNodePoolCRUD")
+    def test_handle_workload_operations_multiple_deployments_success(self, mock_azure_crud):
+        """Test handle_workload_operations with multiple deployments all succeeding.
+
+        This test verifies that when create_deployment is called with multiple
+        deployments (number_of_deployments > 1) and all deployments succeed,
+        the function returns success (exit code 0).
+        """
+        # Setup - configure for multiple deployments
+        mock_args = mock.MagicMock()
+        mock_args.command = "deployment"
+        mock_args.node_pool_name = "test-nodepool"
+        mock_args.replicas = 10
+        mock_args.manifest_dir = "/path/to/manifests"
+        mock_args.number_of_deployments = 5  # Multiple deployments
+
+        # Configure mock to return True (all deployments succeeded)
+        mock_azure_crud.create_deployment.return_value = True
+
+        # Execute
+        result = handle_workload_operations(mock_azure_crud, mock_args)
+
+        # Verify
+        self.assertEqual(result, 0)  # 0 means success
+        mock_azure_crud.create_deployment.assert_called_once_with(
+            node_pool_name="test-nodepool",
+            replicas=10,
+            manifest_dir="/path/to/manifests",
+            number_of_deployments=5
+        )
+

 class TestCollectBenchmarkResults(unittest.TestCase):
     """Tests for the collect_benchmark_results function"""
diff --git a/steps/engine/crud/k8s/execute.yml b/steps/engine/crud/k8s/execute.yml
index 354b67e828..4c861cb817 100644
--- a/steps/engine/crud/k8s/execute.yml
+++ b/steps/engine/crud/k8s/execute.yml
@@ -9,6 +9,12 @@ parameters:
   step_time_out: 600
   step_wait_time: 30
   gpu_node_pool: false
+  deployment_name: ""
+  number_of_deployments: 1
+  number_of_jobs: 1
+  completions: 1
+  replicas: 10
+  manifest_dir: ""

 steps:
 - script: |
@@ -37,6 +43,7 @@
       --step-wait-time "$STEP_WAIT_TIME" \
       --step-timeout "$STEP_TIME_OUT" \
       ${GPU_NODE_POOL:+--gpu-node-pool}
+
   displayName: 'Execute K8s Create & Scale Up Operations for ${{ parameters.cloud }}'
   workingDirectory: modules/python
   env:
@@ -54,6 +61,61 @@
     ${{ if eq(parameters.cloud, 'aws') }}:
       CAPACITY_TYPE: $(CAPACITY_TYPE)

+- script: |
+    set -eo pipefail
+
+    # Deploy Workloads
+    PYTHONPATH=$PYTHONPATH:$(pwd) python3 "$PYTHON_SCRIPT_FILE" deployment \
+      --cloud "$CLOUD" \
+      --run-id "$RUN_ID" \
+      --result-dir "$RESULT_DIR" \
+      --node-pool-name "$POOL_NAME" \
+      --number-of-deployments "$NUMBER_OF_DEPLOYMENTS" \
+      --replicas "$REPLICAS" \
+      --manifest-dir "$MANIFEST_DIR" \
+      --step-timeout "$STEP_TIME_OUT" \
+      ${GPU_NODE_POOL:+--gpu-node-pool}
+  displayName: 'Execute K8s Workload operations for ${{ parameters.cloud }}'
+  workingDirectory: modules/python
+  env:
+    PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/crud/main.py
+    POOL_NAME: ${{ parameters.pool_name }}
+    CLOUD: ${{ parameters.cloud }}
+    STEP_TIME_OUT: ${{ parameters.step_time_out }}
+    RESULT_DIR: $(System.DefaultWorkingDirectory)/$(RUN_ID)
+    GPU_NODE_POOL: ${{ parameters.gpu_node_pool }}
+    DEPLOYMENT_NAME: ${{ parameters.deployment_name }}
+    NUMBER_OF_DEPLOYMENTS: ${{ parameters.number_of_deployments }}
+    REPLICAS: ${{ parameters.replicas }}
+    MANIFEST_DIR: ${{ parameters.manifest_dir }}
+
+- script: |
+    set -eo pipefail
+
+    # Deploy Jobs
+    PYTHONPATH=$PYTHONPATH:$(pwd) python3 "$PYTHON_SCRIPT_FILE" jobs \
+      --cloud "$CLOUD" \
+      --run-id "$RUN_ID" \
+      --result-dir "$RESULT_DIR" \
+      --node-pool-name "$POOL_NAME" \
+      --number-of-jobs "$NUMBER_OF_JOBS" \
+      --completions "$COMPLETIONS" \
+      --manifest-dir "$MANIFEST_DIR" \
+      --step-timeout "$STEP_TIME_OUT" \
+      ${GPU_NODE_POOL:+--gpu-node-pool}
+  displayName: 'Execute K8s Job operations for ${{ parameters.cloud }}'
+  workingDirectory: modules/python
+  env:
+    PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/crud/main.py
+    POOL_NAME: ${{ parameters.pool_name }}
+    CLOUD: ${{ parameters.cloud }}
+    STEP_TIME_OUT: ${{ parameters.step_time_out }}
+    RESULT_DIR: $(System.DefaultWorkingDirectory)/$(RUN_ID)
+    GPU_NODE_POOL: ${{ parameters.gpu_node_pool }}
+    NUMBER_OF_JOBS: ${{ parameters.number_of_jobs }}
+    COMPLETIONS: ${{ parameters.completions }}
+    MANIFEST_DIR: ${{ parameters.manifest_dir }}
+
 - script: |
     set -eo pipefail
diff --git a/steps/topology/k8s-crud-gpu/execute-crud.yml b/steps/topology/k8s-crud-gpu/execute-crud.yml
index 166a123e38..6937c54e69 100644
--- a/steps/topology/k8s-crud-gpu/execute-crud.yml
+++ b/steps/topology/k8s-crud-gpu/execute-crud.yml
@@ -22,3 +22,9 @@ steps:
       result_dir: $(System.DefaultWorkingDirectory)/$(RUN_ID)
       gpu_node_pool: $(GPU_NODE_POOL)
       step_wait_time: $(STEP_WAIT_TIME)
+      deployment_name: $(DEPLOYMENT_NAME)
+      number_of_deployments: $(NUMBER_OF_DEPLOYMENTS)
+      number_of_jobs: $(NUMBER_OF_JOBS)
+      completions: $(COMPLETIONS)
+      replicas: $(REPLICAS)
+      manifest_dir: $(MANIFEST_DIR)
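
Reviewer note: the job-condition helpers above key off `status.succeeded`/`status.failed` rather than the `Complete`/`Failed` entries in `status.conditions` (conditions are only a fallback for other condition types). A minimal, cluster-free sketch of those semantics — the `JobStatus` dataclass is a hypothetical stand-in for `kubernetes.client.V1JobStatus`, introduced here only for illustration:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobStatus:
    """Stand-in for kubernetes.client.V1JobStatus (assumption, for illustration only)."""
    completion_time: Optional[str] = None
    succeeded: Optional[int] = None
    failed: Optional[int] = None
    active: Optional[int] = None


def is_job_condition_met(status: JobStatus, condition_type: str) -> bool:
    """Mirror of _is_job_condition_met's status-field checks: 'complete' needs a
    completion time plus at least one succeeded pod; 'failed' needs failed pods
    and no pods still active."""
    ct = condition_type.lower()
    if ct == "complete":
        return bool(status.completion_time and status.succeeded and status.succeeded > 0)
    if ct == "failed":
        return bool(status.failed and status.failed > 0 and not status.active)
    return False


# Matches the unit tests in this diff:
assert is_job_condition_met(JobStatus("2026-04-20T00:00:00Z", succeeded=1), "complete")
assert is_job_condition_met(JobStatus(failed=3, active=0), "failed")
assert not is_job_condition_met(JobStatus(active=1), "complete")
```

One consequence worth noting: because "failed" requires `not status.active`, a job that is retrying (some failed pods, one still active) does not yet count as failed, which is the behavior the timeout test exercises.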
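The `create_deployment`/`create_job` methods both follow the same render-then-apply pattern. A sketch of that loop under the assumptions visible in this diff (`create_template` returns the rendered YAML as a string; `apply_manifest_from_file` accepts a `manifest_dict=` keyword) — the `{{VAR}}` substitution helper below is hypothetical and only models the expected input/output shape, not the real `create_template`:

```python
import yaml


def render_template(path: str, values: dict) -> str:
    """Hypothetical {{PLACEHOLDER}} substitution; shown only to illustrate the
    template contract the workload templates above rely on."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    for key, value in values.items():
        text = text.replace("{{" + key + "}}", str(value))
    return text


def apply_rendered(k8s_client, rendered: str) -> None:
    # Same pattern as the diff: split the multi-doc YAML on '---' and apply
    # each document; empty documents are skipped.
    for doc in yaml.safe_load_all(rendered):
        if doc:
            k8s_client.apply_manifest_from_file(manifest_dict=doc)


# Example values matching job.yml's placeholders (illustrative):
# rendered = render_template(
#     "modules/python/crud/workload_templates/job.yml",
#     {"JOB_COMPLETIONS": 1, "NODE_POOL_NAME": "np1",
#      "INDEX": 1, "LABEL_VALUE": "nginx-container"},
# )
# apply_rendered(k8s_client, rendered)
```

Loading with `yaml.safe_load_all` is also why the unit tests stub `create_template` with a real string: passing a `MagicMock` into `safe_load_all` iterates forever, as the test comments call out.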