diff --git a/butler.py b/butler.py index 15bc9f0ef8f..f8757dbac00 100644 --- a/butler.py +++ b/butler.py @@ -435,7 +435,6 @@ def main(): 'clean_indexes', help='Clean up undefined indexes (in index.yaml).') parser_clean_indexes.add_argument( '-c', '--config-dir', required=True, help='Path to application config.') - parser_create_config = subparsers.add_parser( 'create_config', help='Create a new deployment config.') parser_create_config.add_argument( diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index eac1d204154..acb70f89ca1 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -64,6 +64,12 @@ 'regression': 24 * 60 * 60, } + +def get_task_duration(command): + """Gets the duration of a task.""" + return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS) + + TASK_QUEUE_DISPLAY_NAMES = { 'LINUX': 'Linux', 'LINUX_WITH_GPU': 'Linux (with GPU)', @@ -503,6 +509,7 @@ def __init__(self, pubsub_message): } self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta'))) + self.do_not_ack = False def attribute(self, key): """Return attribute value.""" @@ -550,7 +557,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ leaser_thread.join() # If we get here the task succeeded in running. Acknowledge the message. - self._pubsub_message.ack() + if not self.do_not_ack: + self._pubsub_message.ack() track_task_end() def dont_retry(self): @@ -587,7 +595,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ leaser_thread.join() # If we get here the task succeeded in running. Acknowledge the message. - self._pubsub_message.ack() + if not self.do_not_ack: + self._pubsub_message.ack() track_task_end() diff --git a/src/clusterfuzz/_internal/batch/service.py b/src/clusterfuzz/_internal/batch/service.py index a04106a0560..c11aea0a6da 100644 --- a/src/clusterfuzz/_internal/batch/service.py +++ b/src/clusterfuzz/_internal/batch/service.py @@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]): return config_map -def _get_task_duration(command): - return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command, - tasks.TASK_LEASE_SECONDS) - - WeightedSubconfig = collections.namedtuple('WeightedSubconfig', ['name', 'weight']) @@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict: # Lower numbers are a lower priority, meaning less likely to run From: # https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs priority = 0 if task.command == 'fuzz' else 1 - max_run_duration = f'{_get_task_duration(task.command)}s' + max_run_duration = f'{tasks.get_task_duration(task.command)}s' # This saves us time and reduces fragementation, e.g. every linux fuzz task # run in this call will run in the same zone. if config_name not in subconfig_map: diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index de3e0d01364..7b3324f1cf6 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -225,7 +225,7 @@ def execute(self, task_argument, job_type, uworker_env): 'analyze': UTask, 'blame': TrustedTask, 'corpus_pruning': UTask, - 'fuzz': UTaskLocalExecutor, + 'fuzz': UTask, 'impact': TrustedTask, 'minimize': UTask, 'progression': UTask, diff --git a/src/clusterfuzz/_internal/k8s/service.py b/src/clusterfuzz/_internal/k8s/service.py index 70836bf350b..323e2900052 100644 --- a/src/clusterfuzz/_internal/k8s/service.py +++ b/src/clusterfuzz/_internal/k8s/service.py @@ -24,7 +24,9 @@ from google.auth.transport import requests as google_requests from googleapiclient import discovery from kubernetes import client as k8s_client +from kubernetes import config as k8s_config +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.config import local_config @@ -35,8 +37,7 @@ from clusterfuzz._internal.remote_task import RemoteTaskInterface from clusterfuzz._internal.system import environment -# See https://cloud.google.com/batch/quotas#job_limits -MAX_CONCURRENT_VMS_PER_JOB = 1000 +MAX_PENDING_JOBS = 1000 CLUSTER_NAME = 'clusterfuzz-cronjobs-gke' KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [ @@ -92,6 +93,7 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict: if not remote_tasks: return {} + #TODO(javanlacerda): Create remote task config batch_config = local_config.BatchConfig() config_map = _get_config_names(remote_tasks) configs = {} @@ -129,7 +131,8 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict: return configs -def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: +def _create_job_body(config: KubernetesJobConfig, input_url: str, + service_account_name: str) -> dict: """Creates the body of a Kubernetes job.""" job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split( @@ -143,10 +146,11 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'spec': { - 'hostNetwork': - True, + 'serviceAccountName': + service_account_name, 'containers': [{ 'name': job_name, @@ -181,12 +185,16 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: }, { 'name': 'IS_K8S_ENV', - 'value': 'True' + 'value': 'true' }, { 'name': 'DISABLE_MOUNTS', 'value': 'true' }, + { + 'name': 'UPDATE_WEB_TESTS', + 'value': 'False' + }, ], 'securityContext': { 'privileged': True, @@ -228,21 +236,24 @@ def __init__(self, k8s_config_loaded: bool = False): def _load_gke_credentials(self): """Loads GKE credentials and configures the Kubernetes client.""" - credentials, project = google.auth.default() + credentials, _ = google.auth.default() + project = utils.get_application_id() service = discovery.build('container', 'v1', credentials=credentials) - parent = f'projects/{project}/locations/-' - # pylint: disable=no-member - response = service.projects().locations().clusters().list( - parent=parent).execute() - - cluster = None - for c in response.get('clusters', []): - if c['name'] == CLUSTER_NAME: - cluster = c - break - - if not cluster: - logs.error(f'Cluster {CLUSTER_NAME} not found.') + parent = f"projects/{project}/locations/-" + + try: + response = service.projects().locations().clusters().list( + parent=parent).execute() + clusters = response.get('clusters', []) + cluster = next((c for c in clusters if c['name'] == CLUSTER_NAME), None) + + if not cluster: + logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.") + print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.") + return + + except Exception as e: + logs.error(f"Failed to list clusters in {project}: {e}") return endpoint = cluster['endpoint'] @@ -250,9 +261,6 @@ def _load_gke_credentials(self): ca_cert = base64.b64decode(cluster['masterAuth']['clusterCaCertificate']) # Write CA cert to a temporary file. - # Note: This file persists for the lifetime of the process/container. - # Ideally it should be cleaned up, but standard k8s client usage involves - # file paths. fd, ca_cert_path = tempfile.mkstemp() with os.fdopen(fd, 'wb') as f: f.write(ca_cert) @@ -263,17 +271,38 @@ def _load_gke_credentials(self): configuration.verify_ssl = True def get_token(creds): - if not creds.valid: - request = google_requests.Request() + request = google_requests.Request() + if not creds.valid or creds.expired: creds.refresh(request) - return creds.token + return {"authorization": "Bearer " + creds.token} - # Hook to refresh token on API calls. - configuration.refresh_api_key_hook = lambda _: { - "authorization": "Bearer " + get_token(credentials) - } + configuration.refresh_api_key_hook = lambda _: get_token(credentials) + configuration.api_key = get_token(credentials) k8s_client.Configuration.set_default(configuration) + logs.info("GKE credentials loaded successfully.") + + def _create_service_account_if_needed(self, + service_account_email: str) -> str: + """Creates a Kubernetes Service Account if it doesn't exist.""" + service_account_name = service_account_email.split('@')[0] + namespace = 'default' + try: + self._core_api.read_namespaced_service_account(service_account_name, + namespace) + return service_account_name + except k8s_client.rest.ApiException as e: + if e.status != 404: + raise + + logs.info(f'Creating Service Account {service_account_name} for ' + f'{service_account_email}.') + metadata = k8s_client.V1ObjectMeta( + name=service_account_name, + annotations={'iam.gke.io/gcp-service-account': service_account_email}) + body = k8s_client.V1ServiceAccount(metadata=metadata) + self._core_api.create_namespaced_service_account(namespace, body) + return service_account_name def create_job(self, config: KubernetesJobConfig, input_url: str) -> str: """Creates a Kubernetes job. @@ -284,11 +313,25 @@ def create_job(self, config: KubernetesJobConfig, input_url: str) -> str: Returns: The name of the created Kubernetes job. """ - - job_body = _create_job_body(config, input_url) + service_account_name = self._create_service_account_if_needed( + config.service_account_email) + job_body = _create_job_body(config, input_url, service_account_name) self._batch_api.create_namespaced_job(body=job_body, namespace='default') return job_body['metadata']['name'] + def _get_pending_jobs_count(self) -> int: + """Returns the number of pending jobs.""" + try: + pods = self._core_api.list_namespaced_pod( + namespace='default', + label_selector='app.kubernetes.io/name=clusterfuzz-kata-job', + field_selector='status.phase=Pending') + logs.info(f"Found {len(pods.items)} pending jobs.") + return len(pods.items) + except Exception as e: + logs.error(f"Failed to list pods: {e}") + return 0 + def create_uworker_main_batch_job(self, module: str, job_type: str, input_download_url: str): """Creates a single batch job for a uworker main task.""" @@ -308,6 +351,15 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): separate batch job for each group. This allows tasks with similar requirements to be processed together, which can improve efficiency. """ + if self._get_pending_jobs_count() >= MAX_PENDING_JOBS: + logs.warning( + f'Kubernetes job limit reached. Not acking {len(remote_tasks)} tasks.' + ) + for task in remote_tasks: + if task.pubsub_task: + task.pubsub_task.do_not_ack = True + return [] + job_specs = collections.defaultdict(list) configs = _get_k8s_job_configs(remote_tasks) for remote_task in remote_tasks: @@ -330,6 +382,8 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): def create_kata_container_job(self, config: KubernetesJobConfig, input_url: str) -> str: """Creates a Kubernetes job that runs in a Kata container.""" + service_account_name = self._create_service_account_if_needed( + config.service_account_email) job_name = 'clusterfuzz-kata-job-' + str(uuid.uuid4()).split( '-', maxsplit=1)[0] @@ -340,6 +394,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig, 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'metadata': { 'labels': { @@ -349,8 +404,8 @@ def create_kata_container_job(self, config: KubernetesJobConfig, 'spec': { 'runtimeClassName': 'kata', - 'hostNetwork': - True, + 'serviceAccountName': + service_account_name, 'dnsPolicy': 'ClusterFirstWithHostNet', 'containers': [{ @@ -414,12 +469,16 @@ def create_kata_container_job(self, config: KubernetesJobConfig, }, { 'name': 'IS_K8S_ENV', - 'value': 'True' + 'value': 'true' }, { 'name': 'DISABLE_MOUNTS', 'value': 'true' }, + { + 'name': 'UPDATE_WEB_TESTS', + 'value': 'False' + }, ] }], 'restartPolicy': diff --git a/src/clusterfuzz/_internal/remote_task/__init__.py b/src/clusterfuzz/_internal/remote_task/__init__.py index 4b4a03d53ce..fdd7841c2cf 100644 --- a/src/clusterfuzz/_internal/remote_task/__init__.py +++ b/src/clusterfuzz/_internal/remote_task/__init__.py @@ -23,6 +23,7 @@ import random from typing import List +from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import job_frequency @@ -34,10 +35,11 @@ class RemoteTask: is used to enqueue tasks and track their state. """ - def __init__(self, command, job_type, input_download_url): + def __init__(self, command, job_type, input_download_url, pubsub_task=None): self.command = command self.job_type = job_type self.input_download_url = input_download_url + self.pubsub_task = pubsub_task class RemoteTaskInterface(abc.ABC): @@ -74,13 +76,13 @@ def __init__(self): self._gcp_batch_service = GcpBatchService() self._kubernetes_service = KubernetesService() - def _should_use_kubernetes(self, job_type: str) -> bool: + def _should_use_kubernetes(self) -> bool: """Determines whether to use the Kubernetes backend for a given job. The decision is made based on a random roll and the configured frequency for the given job type. """ - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() return random.random() < frequencies['kubernetes'] def create_uworker_main_batch_job(self, module: str, job_type: str, @@ -89,7 +91,7 @@ def create_uworker_main_batch_job(self, module: str, job_type: str, The choice of backend is determined by the `_should_use_kubernetes` method. """ - if self._should_use_kubernetes(job_type): + if self._should_use_kubernetes(): return self._kubernetes_service.create_uworker_main_batch_job( module, job_type, input_download_url) return self._gcp_batch_service.create_uworker_main_batch_job( @@ -104,39 +106,33 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): gcp_batch_tasks = [] kubernetes_tasks = [] - # Group tasks by job_type to respect per-job frequencies - tasks_by_job = collections.defaultdict(list) - for task in remote_tasks: - tasks_by_job[task.job_type].append(task) - - for job_type, tasks in tasks_by_job.items(): - # Use random distribution if there is only one task - if len(tasks) == 1: - if self._should_use_kubernetes(job_type): - kubernetes_tasks.extend(tasks) - else: - gcp_batch_tasks.extend(tasks) - continue - + # Use random distribution if there is only one task + if len(remote_tasks) == 1: + if self._should_use_kubernetes(): + kubernetes_tasks.extend(remote_tasks) + else: + gcp_batch_tasks.extend(remote_tasks) + else: # Use deterministic slicing for multiple tasks - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() k8s_ratio = frequencies['kubernetes'] - k8s_count = int(len(tasks) * k8s_ratio) + k8s_count = int(len(remote_tasks) * k8s_ratio) # We take the first chunk for Kubernetes - kubernetes_tasks.extend(tasks[:k8s_count]) - gcp_batch_tasks.extend(tasks[k8s_count:]) + kubernetes_tasks.extend(remote_tasks[:k8s_count]) + gcp_batch_tasks.extend(remote_tasks[k8s_count:]) - print(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.') - print(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.') + logs.info(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.') + logs.info(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.') results = [] - if gcp_batch_tasks: - results.extend( - self._gcp_batch_service.create_uworker_main_batch_jobs( - gcp_batch_tasks)) if kubernetes_tasks: results.extend( self._kubernetes_service.create_uworker_main_batch_jobs( kubernetes_tasks)) + + if gcp_batch_tasks: + results.extend( + self._gcp_batch_service.create_uworker_main_batch_jobs( + gcp_batch_tasks)) return results diff --git a/src/clusterfuzz/_internal/remote_task/job_frequency.py b/src/clusterfuzz/_internal/remote_task/job_frequency.py index 1b0423efab6..0347738cbb2 100644 --- a/src/clusterfuzz/_internal/remote_task/job_frequency.py +++ b/src/clusterfuzz/_internal/remote_task/job_frequency.py @@ -26,35 +26,10 @@ DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0} -def _get_job_frequencies_from_env(): - """Parses the `K8S_JOBS_FREQUENCY` environment variable. - - The variable should be a comma-separated list of key-value pairs, where the - key is the job name and the value is the frequency (a float between 0 and 1). - For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`. - """ - job_frequencies = {} - frequency_string = environment.get_value('K8S_JOBS_FREQUENCY') - if not frequency_string: - return {} - - for item in frequency_string.split(','): - key, value = item.split('=') - job_frequencies[key] = float(value) - return job_frequencies - - -def get_job_frequency(job_name): +def get_job_frequency(): """Returns the frequency for a given job. If the frequency is not explicitly defined in the `K8S_JOBS_FREQUENCY` environment variable, the default frequency is returned. """ - job_frequencies = _get_job_frequencies_from_env() - if job_name in job_frequencies: - kubernetes_frequency = job_frequencies[job_name] - return { - 'gcp_batch': 1.0 - kubernetes_frequency, - 'kubernetes': kubernetes_frequency - } return DEFAULT_FREQUENCY diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py new file mode 100644 index 00000000000..d3c9e024eb1 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py @@ -0,0 +1,93 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Integration tests for KubernetesService.""" + +import base64 +import os +import unittest +from unittest import mock + +from kubernetes import client + +from clusterfuzz._internal.k8s import service + + +class KubernetesIntegrationTest(unittest.TestCase): + """Integration tests for KubernetesService.""" + + @mock.patch('googleapiclient.discovery.build') + def test_load_credentials(self, mock_discovery_build): + """Test that credentials can be loaded manually using the fallback logic.""" + # Ensure no kubeconfig interferes to force manual path (if local kubeconfig exists) + # Note: os.environ changes are process-local. + old_kubeconfig = os.environ.get('KUBECONFIG') + os.environ['KUBECONFIG'] = '/dev/null' + + # Mock GKE response + mock_service = mock.Mock() + mock_discovery_build.return_value = mock_service + + mock_clusters_list = mock_service.projects().locations().clusters().list( + ).execute + mock_clusters_list.return_value = { + 'clusters': [{ + 'name': 'clusterfuzz-cronjobs-gke', + 'endpoint': '1.2.3.4', + 'masterAuth': { + 'clusterCaCertificate': + base64.b64encode(b'fake-cert').decode('utf-8') + } + }] + } + + # Mock list_namespaced_job to avoid actual network call to 1.2.3.4 + with mock.patch('kubernetes.client.BatchV1Api.list_namespaced_job'): + try: + # This will trigger _load_gke_credentials + # It should try load_kube_config (fail), load_incluster (fail), then manual. + k8s_service = service.KubernetesService() + + # Verify api client is initialized + self.assertIsNotNone(k8s_service._batch_api) + self.assertIsInstance(k8s_service._batch_api, client.BatchV1Api) + + # Verify configuration + config = client.Configuration.get_default_copy() + print(f"Loaded Host: {config.host}") + + # Check that we got a valid https endpoint + self.assertTrue(config.host.startswith("https://")) + self.assertTrue(config.verify_ssl) + self.assertIsNotNone(config.ssl_ca_cert) + + # Verify API key fix is present (Crucial for manual path) + self.assertIn("authorization", config.api_key) + + # Verify hook is present + self.assertIsNotNone(config.refresh_api_key_hook) + + # Verify actual connectivity and auth + print("Attempting to list jobs to verify authentication...") + k8s_service._batch_api.list_namespaced_job(namespace='default', limit=1) + print("Successfully listed jobs.") + + finally: + if old_kubeconfig: + os.environ['KUBECONFIG'] = old_kubeconfig + else: + del os.environ['KUBECONFIG'] + + +if __name__ == '__main__': + unittest.main() diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py new file mode 100644 index 00000000000..f1e15369e56 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py @@ -0,0 +1,85 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the Kubernetes batch client limit logic.""" + +import unittest +from unittest import mock + +from clusterfuzz._internal.k8s import service +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +@mock.patch('kubernetes.config.load_kube_config') +class KubernetesServiceLimitTest(unittest.TestCase): + """Tests for the KubernetesService limit logic.""" + + def setUp(self): + patcher = mock.patch( + 'clusterfuzz._internal.k8s.service.KubernetesService._load_gke_credentials' + ) + self.addCleanup(patcher.stop) + self.mock_load_gke = patcher.start() + + # Create a job to prevent KeyError in _get_k8s_job_configs + from clusterfuzz._internal.datastore import data_types + data_types.Job(name='job1', platform='LINUX').put() + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_not_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs proceeds when limit not reached.""" + mock_get_pending_count.return_value = 99 + kube_service = service.KubernetesService() + + # We expect this to proceed to job creation logic (which we mock to avoid actual creation) + with mock.patch.object(service.KubernetesService, + 'create_kata_container_job') as mock_create: + kube_service.create_uworker_main_batch_jobs( + [service.RemoteTask('fuzz', 'job1', 'url1')]) + self.assertTrue(mock_create.called) + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks tasks when limit reached.""" + mock_get_pending_count.return_value = 100 + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_exceeded( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks tasks when limit exceeded.""" + mock_get_pending_count.return_value = 101 + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py index 81d61af7bad..69e622484d2 100644 --- a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py @@ -38,11 +38,13 @@ def setUp(self): name='job2', platform='LINUX', environment_string='CUSTOM_VAR = value').put() + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') @mock.patch.object(service.KubernetesService, 'create_kata_container_job') @mock.patch.object(service.KubernetesService, 'create_job') - def test_create_uworker_main_batch_jobs(self, mock_create_job, - mock_create_kata_job, _): + def test_create_uworker_main_batch_jobs( + self, mock_create_job, mock_create_kata_job, mock_get_pending_count, _): """Tests the creation of uworker main batch jobs.""" + mock_get_pending_count.return_value = 0 tasks = [ service.RemoteTask('fuzz', 'job1', 'url1'), service.RemoteTask('fuzz', 'job1', 'url2'), @@ -61,20 +63,55 @@ def test_create_uworker_main_batch_jobs(self, mock_create_job, [call.args[1] for call in mock_create_kata_job.call_args_list]) self.assertEqual(urls, ['url1', 'url2', 'url3']) + @mock.patch('kubernetes.client.CoreV1Api') + def test_get_pending_jobs_count(self, mock_core_api_cls, _): + """Tests _get_pending_jobs_count.""" + mock_core_api = mock_core_api_cls.return_value + kube_service = service.KubernetesService() + + # Mock pods + mock_core_api.list_namespaced_pod.return_value.items = [ + mock.Mock(), mock.Mock() + ] + + self.assertEqual(2, kube_service._get_pending_jobs_count()) + mock_core_api.list_namespaced_pod.assert_called_with( + namespace='default', + label_selector='app.kubernetes.io/name=clusterfuzz-kata-job', + field_selector='status.phase=Pending') + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks when limit reached.""" + mock_get_pending_count.return_value = service.MAX_PENDING_JOBS + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) + @mock.patch('kubernetes.client.BatchV1Api') def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): """Tests that create_kata_container_job generates the correct spec.""" mock_batch_api = mock_batch_api_cls.return_value kube_service = service.KubernetesService() - # Force _batch_api to be our mock (though init usually does it if we patched class before init) - # The patch is applied for this method, so init inside will use the mock class. + # Mock _create_service_account_if_needed + kube_service._create_service_account_if_needed = mock.Mock( + return_value='test') config = service.KubernetesJobConfig( job_type='test-job', docker_image='test-image', command='fuzz', disk_size_gb=10, - service_account_email='email', + service_account_email= + 'test@clusterfuzz-test.iam.gserviceaccount.com', clusterfuzz_release='prod', is_kata=True) @@ -88,9 +125,6 @@ def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): pod_spec = job_body['spec']['template']['spec'] container = pod_spec['containers'][0] - # Check hostNetwork - self.assertTrue(pod_spec['hostNetwork']) - # Check capabilities self.assertEqual(['ALL'], container['securityContext']['capabilities']['add']) @@ -103,6 +137,40 @@ def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): volumes = {v['name']: v for v in pod_spec['volumes']} self.assertEqual('1.9Gi', volumes['dshm']['emptyDir']['sizeLimit']) + # Check Service Account + self.assertEqual('test', pod_spec['serviceAccountName']) + kube_service._create_service_account_if_needed.assert_called_with( + 'test@clusterfuzz-test.iam.gserviceaccount.com') + + @mock.patch('kubernetes.client.BatchV1Api') + def test_create_job(self, mock_batch_api_cls, _): + """Tests create_job.""" + mock_batch_api = mock_batch_api_cls.return_value + kube_service = service.KubernetesService() + kube_service._create_service_account_if_needed = mock.Mock( + return_value='test-sa') + + config = service.KubernetesJobConfig( + job_type='test-job', + docker_image='test-image', + command='fuzz', + disk_size_gb=10, + service_account_email='test-email', + clusterfuzz_release='prod', + is_kata=False) + + kube_service.create_job(config, 'input_url') + + self.assertTrue(mock_batch_api.create_namespaced_job.called) + call_args = mock_batch_api.create_namespaced_job.call_args + job_body = call_args.kwargs['body'] + + # Check Service Account + self.assertEqual('test-sa', + job_body['spec']['template']['spec']['serviceAccountName']) + kube_service._create_service_account_if_needed.assert_called_with( + 'test-email') + @mock.patch( 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module') @mock.patch.object(service.KubernetesService, diff --git a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py new file mode 100644 index 00000000000..3e29ef2cb6e --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for remote_task.""" + +import unittest +from unittest import mock + +from clusterfuzz._internal.k8s import service as k8s_service +from clusterfuzz._internal.remote_task import RemoteTask +from clusterfuzz._internal.remote_task import RemoteTaskGate +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +class RemoteTaskGateTest(unittest.TestCase): + """Tests for RemoteTaskGate.""" + + def setUp(self): + self.gate = RemoteTaskGate() + + @mock.patch( + 'clusterfuzz._internal.remote_task.job_frequency.get_job_frequency') + @mock.patch.object(k8s_service.KubernetesService, + 'create_uworker_main_batch_jobs') + @mock.patch( + 'clusterfuzz._internal.batch.service.GcpBatchService.create_uworker_main_batch_jobs' + ) + def test_create_uworker_main_batch_jobs_k8s_limit_reached( + self, mock_gcp_create, mock_k8s_create, mock_get_frequency): + """Test delegation when K8s limit is reached (handled by service).""" + # Setup tasks to go to Kubernetes + mock_get_frequency.return_value = {'kubernetes': 1.0} + + task = RemoteTask('fuzz', 'job1', 'url1') + + # Simulate K8s service returning empty list (limit reached) + mock_k8s_create.return_value = [] + + result = self.gate.create_uworker_main_batch_jobs([task]) + + # Verify K8s was attempted + self.assertTrue(mock_k8s_create.called) + + # Verify GCP was NOT attempted + self.assertFalse(mock_gcp_create.called) + + # Verify result is empty list + self.assertEqual(result, []) + + @mock.patch( + 'clusterfuzz._internal.remote_task.job_frequency.get_job_frequency') + @mock.patch.object(k8s_service.KubernetesService, + 'create_uworker_main_batch_jobs') + @mock.patch( + 'clusterfuzz._internal.batch.service.GcpBatchService.create_uworker_main_batch_jobs' + ) + def test_create_uworker_main_batch_jobs_success( + self, mock_gcp_create, mock_k8s_create, mock_get_frequency): + """Test successful creation.""" + mock_get_frequency.return_value = {'kubernetes': 1.0} + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = RemoteTask('fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + self.gate.create_uworker_main_batch_jobs([task]) + + self.assertTrue(mock_k8s_create.called) + self.assertFalse(mock_pubsub_task.do_not_ack) diff --git a/src/local/butler/scripts/run_remote_task.py b/src/local/butler/scripts/run_remote_task.py index f64b930a719..1e290bae91f 100644 --- a/src/local/butler/scripts/run_remote_task.py +++ b/src/local/butler/scripts/run_remote_task.py @@ -45,7 +45,6 @@ def schedule_utask_mains(): return [] print(f'Combining {len(utask_mains)} batch tasks.') - utask_mains = utask_mains[:5] results = [] with lease_all_tasks(utask_mains): batch_tasks = [ diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index 5e80e98f8ac..849609736c8 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -99,7 +99,7 @@ def schedule_utask_mains(): with lease_all_tasks(utask_mains): batch_tasks = [ - RemoteTask(task.command, task.job, task.argument) + RemoteTask(task.command, task.job, task.argument, pubsub_task=task) for task in utask_mains ] RemoteTaskGate().create_uworker_main_batch_jobs(batch_tasks)