|
| 1 | +import logging |
| 2 | +import sys |
| 3 | +import unittest |
| 4 | + |
| 5 | +import yaml |
| 6 | + |
| 7 | +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") |
| 8 | +logger = logging.getLogger(__name__) |
| 9 | + |
| 10 | + |
| 11 | +def load_template(yaml_file): |
| 12 | + try: |
| 13 | + with open(yaml_file) as file: |
| 14 | + documents = yaml.safe_load_all(file) |
| 15 | + list_of_documents = [doc for doc in documents] |
| 16 | + return list_of_documents |
| 17 | + except yaml.YAMLError as error: |
| 18 | + print("Error in configuration file: ", error) |
| 19 | + |
| 20 | + |
| 21 | +class ScaledJobTemplateTests(unittest.TestCase): |
| 22 | + def test_scaled_job_has_zero_limits(self): |
| 23 | + scaled_jobs = [doc for doc in LIST_OF_DOCUMENTS if doc and doc.get("kind") == "ScaledJob"] |
| 24 | + self.assertTrue(scaled_jobs, "No ScaledJob resources found") |
| 25 | + for doc in scaled_jobs: |
| 26 | + logger.info(f"Assert ScaledJob limits are set to 0 in {doc['metadata']['name']}") |
| 27 | + self.assertEqual(doc.get("apiVersion"), "keda.sh/v1alpha1") |
| 28 | + spec = doc.get("spec", {}) |
| 29 | + job_target_ref = spec.get("jobTargetRef", {}) |
| 30 | + self.assertEqual(job_target_ref.get("backoffLimit"), 0) |
| 31 | + self.assertEqual(spec.get("minReplicaCount"), 0) |
| 32 | + self.assertEqual(spec.get("successfulJobsHistoryLimit"), 0) |
| 33 | + |
| 34 | + |
| 35 | +if __name__ == "__main__": |
| 36 | + failed = False |
| 37 | + try: |
| 38 | + FILE_NAME = sys.argv[1] |
| 39 | + LIST_OF_DOCUMENTS = load_template(FILE_NAME) |
| 40 | + suite = unittest.TestLoader().loadTestsFromTestCase(ScaledJobTemplateTests) |
| 41 | + test_runner = unittest.TextTestRunner(verbosity=3) |
| 42 | + failed = not test_runner.run(suite).wasSuccessful() |
| 43 | + except Exception as e: |
| 44 | + logger.fatal(e) |
| 45 | + failed = True |
| 46 | + |
| 47 | + if failed: |
| 48 | + exit(1) |
0 commit comments