From 234b2c10b255712d3dae7e4ae0a9b5da01debfc9 Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Tue, 30 Dec 2025 21:26:24 +0000 Subject: [PATCH 1/3] fix k8s log Signed-off-by: Javan Lacerda --- src/clusterfuzz/_internal/metrics/logs.py | 86 ++++++++++++++--------- 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index cc77185abfa..bc25979e6c1 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -429,42 +429,60 @@ def configure_appengine(): def configure_k8s(): - """Configure logging for K8S and reporting errors.""" + """Configure logging for K8S.""" import google.cloud.logging + from google.cloud.logging.handlers import CloudLoggingHandler + from google.cloud.logging.handlers.transports import BackgroundThreadTransport + client = google.cloud.logging.Client() - client.setup_logging() - old_factory = logging.getLogRecordFactory() - - def record_factory(*args, **kwargs): - """Insert jsonPayload fields to all logs.""" - - record = old_factory(*args, **kwargs) - if not hasattr(record, 'json_fields'): - record.json_fields = {} - - # Add jsonPayload fields to logs that don't contain stack traces to enable - # capturing and grouping by error reporting. - # https://cloud.google.com/error-reporting/docs/formatting-error-messages#log-text - if record.levelno >= logging.ERROR and not record.exc_info: - record.json_fields.update({ - '@type': - 'type.googleapis.com/google.devtools.clouderrorreporting.v1beta1.ReportedErrorEvent', # pylint: disable=line-too-long - 'serviceContext': { - 'service': 'k8s', - }, - 'context': { - 'reportLocation': { - 'filePath': record.pathname, - 'lineNumber': record.lineno, - 'functionName': record.funcName, - } - }, - }) - - return record - - logging.setLogRecordFactory(record_factory) - logging.getLogger().addFilter(json_fields_filter) + labels = { + 'k8s-pod/app': os.getenv('BOT_NAME', 'unknown'), + 'bot_name': os.getenv('BOT_NAME', 'unknown'), + } + + class FlushIntervalTransport(BackgroundThreadTransport): + + def __init__(self, client, name, **kwargs): + super().__init__( + client, + name, + grace_period=int(os.getenv('LOGGING_CLOUD_GRACE_PERIOD', '15')), + max_latency=int(os.getenv('LOGGING_CLOUD_MAX_LATENCY', '10')), + **kwargs) + + handler = CloudLoggingHandler( + client=client, + name='k8s-logs', + labels=labels, + transport=FlushIntervalTransport) + + def k8s_label_filter(record): + handler.labels.update({ + 'task_payload': + os.getenv('TASK_PAYLOAD', 'null'), + 'fuzz_target': + os.getenv('FUZZ_TARGET', 'null'), + 'worker_bot_name': + os.getenv('WORKER_BOT_NAME', 'null'), + 'extra': + truncate( + json.dumps( + getattr(record, 'extras', {}), + default=_handle_unserializable), + STACKDRIVER_LOG_MESSAGE_LIMIT), + 'location': + json.dumps( + getattr(record, 'location', {'Error': True}), + default=_handle_unserializable), + }) + return True + + handler.addFilter(k8s_label_filter) + handler.setLevel(logging.INFO) + formatter = JsonFormatter() + handler.setFormatter(formatter) + + logging.getLogger().addHandler(handler) logging.getLogger().setLevel(logging.INFO) From af8ad7b67802b37ff489cbd8a05dbe778abf7725 Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Mon, 5 Jan 2026 18:33:46 +0000 Subject: [PATCH 2/3] emit runtime for crashes Signed-off-by: Javan Lacerda --- .../_internal/bot/tasks/utasks/fuzz_task.py | 26 ++++++++++++------- .../_internal/metrics/monitoring_metrics.py | 6 +++++ .../core/bot/tasks/utasks/fuzz_task_test.py | 13 ++++++++++ 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py b/src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py index cbb6538a34b..6c1266a7ef9 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py @@ -461,6 +461,7 @@ def clamp(val, minimum, maximum): 'return_code': return_code, 'platform': environment.platform(), 'job': job_type, + 'runtime': environment.get_runtime().value }) @@ -480,20 +481,26 @@ def _track_testcase_run_result(fuzzer, job_type, new_crash_count, known_crash_count, { 'fuzzer': fuzzer, 'platform': environment.platform(), + 'runtime': environment.get_runtime().value }) monitoring_metrics.FUZZER_NEW_CRASH_COUNT.increment_by( new_crash_count, { 'fuzzer': fuzzer, 'platform': environment.platform(), + 'runtime': environment.get_runtime().value + }) + monitoring_metrics.JOB_KNOWN_CRASH_COUNT.increment_by( + known_crash_count, { + 'job': job_type, + 'platform': environment.platform(), + 'runtime': environment.get_runtime().value + }) + monitoring_metrics.JOB_NEW_CRASH_COUNT.increment_by( + new_crash_count, { + 'job': job_type, + 'platform': environment.platform(), + 'runtime': environment.get_runtime().value }) - monitoring_metrics.JOB_KNOWN_CRASH_COUNT.increment_by(known_crash_count, { - 'job': job_type, - 'platform': environment.platform(), - }) - monitoring_metrics.JOB_NEW_CRASH_COUNT.increment_by(new_crash_count, { - 'job': job_type, - 'platform': environment.platform() - }) def _last_sync_time(sync_file_path): @@ -2020,7 +2027,8 @@ def run(self): fuzzing_session_duration, { 'fuzzer': self.fuzzer_name, 'job': self.job_type, - 'platform': environment.platform() + 'platform': environment.platform(), + 'runtime': environment.get_runtime().value, }) return uworker_msg_pb2.Output(fuzz_task_output=self.fuzz_task_output) # pylint: disable=no-member diff --git a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py index d28000378f4..d1272693734 100644 --- a/src/clusterfuzz/_internal/metrics/monitoring_metrics.py +++ b/src/clusterfuzz/_internal/metrics/monitoring_metrics.py @@ -97,6 +97,7 @@ field_spec=[ monitor.StringField('fuzzer'), monitor.StringField('platform'), + monitor.StringField('runtime') ]) FUZZER_NEW_CRASH_COUNT = monitor.CounterMetric( @@ -106,6 +107,7 @@ field_spec=[ monitor.StringField('fuzzer'), monitor.StringField('platform'), + monitor.StringField('runtime') ]) JOB_KNOWN_CRASH_COUNT = monitor.CounterMetric( @@ -115,6 +117,7 @@ field_spec=[ monitor.StringField('job'), monitor.StringField('platform'), + monitor.StringField('runtime') ]) JOB_NEW_CRASH_COUNT = monitor.CounterMetric( @@ -124,6 +127,7 @@ field_spec=[ monitor.StringField('job'), monitor.StringField('platform'), + monitor.StringField('runtime') ]) FUZZER_RETURN_CODE_COUNT = monitor.CounterMetric( @@ -135,6 +139,7 @@ monitor.IntegerField('return_code'), monitor.StringField('platform'), monitor.StringField('job'), + monitor.StringField('runtime') ], ) @@ -163,6 +168,7 @@ monitor.StringField('fuzzer'), monitor.StringField('job'), monitor.StringField('platform'), + monitor.StringField('runtime') ], ) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/fuzz_task_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/fuzz_task_test.py index 9a3c5f50fae..eafbf5d85b5 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/fuzz_task_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/fuzz_task_test.py @@ -60,6 +60,9 @@ def setUp(self): monitor.metrics_store().reset_for_testing() helpers.patch(self, ['clusterfuzz._internal.system.environment.platform']) self.mock.platform.return_value = 'some_platform' + helpers.patch(self, + ['clusterfuzz._internal.system.environment.get_runtime']) + self.mock.get_runtime.return_value = environment.UtaskMainRuntime.KATA_CONTAINER def test_fuzzer_run_result(self): """Ensure _track_fuzzer_run_result set the right metrics.""" @@ -77,6 +80,7 @@ def test_fuzzer_run_result(self): 'return_code': 2, 'platform': 'some_platform', 'job': 'job', + 'runtime': 'kata_container' })) self.assertEqual( 1, @@ -85,6 +89,7 @@ def test_fuzzer_run_result(self): 'return_code': 0, 'platform': 'some_platform', 'job': 'job', + 'runtime': 'kata_container' })) self.assertEqual( 1, @@ -93,6 +98,7 @@ def test_fuzzer_run_result(self): 'return_code': -1, 'platform': 'some_platform', 'job': 'job', + 'runtime': 'kata_container' })) testcase_count_ratio = ( @@ -143,6 +149,9 @@ def setUp(self): monitor.metrics_store().reset_for_testing() helpers.patch(self, ['clusterfuzz._internal.system.environment.platform']) self.mock.platform.return_value = 'some_platform' + helpers.patch(self, + ['clusterfuzz._internal.system.environment.get_runtime']) + self.mock.get_runtime.return_value = environment.UtaskMainRuntime.KATA_CONTAINER def test_testcase_run_result(self): """Ensure _track_testcase_run_result sets the right metrics.""" @@ -154,24 +163,28 @@ def test_testcase_run_result(self): monitoring_metrics.JOB_NEW_CRASH_COUNT.get({ 'job': 'job', 'platform': 'some_platform', + 'runtime': 'kata_container', })) self.assertEqual( 15, monitoring_metrics.JOB_KNOWN_CRASH_COUNT.get({ 'job': 'job', 'platform': 'some_platform', + 'runtime': 'kata_container', })) self.assertEqual( 7, monitoring_metrics.FUZZER_NEW_CRASH_COUNT.get({ 'fuzzer': 'fuzzer', 'platform': 'some_platform', + 'runtime': 'kata_container', })) self.assertEqual( 15, monitoring_metrics.FUZZER_KNOWN_CRASH_COUNT.get({ 'fuzzer': 'fuzzer', 'platform': 'some_platform', + 'runtime': 'kata_container', })) From 606d749376a54fdc9bb5679374349b0cb328316f Mon Sep 17 00:00:00 2001 From: Javan Lacerda Date: Fri, 9 Jan 2026 11:57:35 -0300 Subject: [PATCH 3/3] Pr/k8s core service (#5116) Signed-off-by: Javan Lacerda --- butler.py | 1 - .../_internal/base/tasks/__init__.py | 13 +- src/clusterfuzz/_internal/batch/service.py | 7 +- .../_internal/bot/tasks/task_types.py | 2 +- src/clusterfuzz/_internal/k8s/service.py | 129 +++++++++++++----- .../_internal/remote_task/__init__.py | 52 ++++--- .../_internal/remote_task/job_frequency.py | 27 +--- .../tests/core/k8s/k8s_integration_test.py | 93 +++++++++++++ .../tests/core/k8s/k8s_service_limit_test.py | 85 ++++++++++++ .../tests/core/k8s/k8s_service_test.py | 84 ++++++++++-- .../core/remote_task/remote_task_test.py | 79 +++++++++++ src/local/butler/scripts/run_remote_task.py | 1 - src/python/bot/startup/run_bot.py | 2 +- 13 files changed, 466 insertions(+), 109 deletions(-) create mode 100644 src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py create mode 100644 src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py create mode 100644 src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py diff --git a/butler.py b/butler.py index 15bc9f0ef8f..f8757dbac00 100644 --- a/butler.py +++ b/butler.py @@ -435,7 +435,6 @@ def main(): 'clean_indexes', help='Clean up undefined indexes (in index.yaml).') parser_clean_indexes.add_argument( '-c', '--config-dir', required=True, help='Path to application config.') - parser_create_config = subparsers.add_parser( 'create_config', help='Create a new deployment config.') parser_create_config.add_argument( diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index eac1d204154..acb70f89ca1 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -64,6 +64,12 @@ 'regression': 24 * 60 * 60, } + +def get_task_duration(command): + """Gets the duration of a task.""" + return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS) + + TASK_QUEUE_DISPLAY_NAMES = { 'LINUX': 'Linux', 'LINUX_WITH_GPU': 'Linux (with GPU)', @@ -503,6 +509,7 @@ def __init__(self, pubsub_message): } self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta'))) + self.do_not_ack = False def attribute(self, key): """Return attribute value.""" @@ -550,7 +557,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ leaser_thread.join() # If we get here the task succeeded in running. Acknowledge the message. - self._pubsub_message.ack() + if not self.do_not_ack: + self._pubsub_message.ack() track_task_end() def dont_retry(self): @@ -587,7 +595,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ leaser_thread.join() # If we get here the task succeeded in running. Acknowledge the message. - self._pubsub_message.ack() + if not self.do_not_ack: + self._pubsub_message.ack() track_task_end() diff --git a/src/clusterfuzz/_internal/batch/service.py b/src/clusterfuzz/_internal/batch/service.py index a04106a0560..c11aea0a6da 100644 --- a/src/clusterfuzz/_internal/batch/service.py +++ b/src/clusterfuzz/_internal/batch/service.py @@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]): return config_map -def _get_task_duration(command): - return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command, - tasks.TASK_LEASE_SECONDS) - - WeightedSubconfig = collections.namedtuple('WeightedSubconfig', ['name', 'weight']) @@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict: # Lower numbers are a lower priority, meaning less likely to run From: # https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs priority = 0 if task.command == 'fuzz' else 1 - max_run_duration = f'{_get_task_duration(task.command)}s' + max_run_duration = f'{tasks.get_task_duration(task.command)}s' # This saves us time and reduces fragementation, e.g. every linux fuzz task # run in this call will run in the same zone. if config_name not in subconfig_map: diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index de3e0d01364..7b3324f1cf6 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -225,7 +225,7 @@ def execute(self, task_argument, job_type, uworker_env): 'analyze': UTask, 'blame': TrustedTask, 'corpus_pruning': UTask, - 'fuzz': UTaskLocalExecutor, + 'fuzz': UTask, 'impact': TrustedTask, 'minimize': UTask, 'progression': UTask, diff --git a/src/clusterfuzz/_internal/k8s/service.py b/src/clusterfuzz/_internal/k8s/service.py index 70836bf350b..323e2900052 100644 --- a/src/clusterfuzz/_internal/k8s/service.py +++ b/src/clusterfuzz/_internal/k8s/service.py @@ -24,7 +24,9 @@ from google.auth.transport import requests as google_requests from googleapiclient import discovery from kubernetes import client as k8s_client +from kubernetes import config as k8s_config +from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.config import local_config @@ -35,8 +37,7 @@ from clusterfuzz._internal.remote_task import RemoteTaskInterface from clusterfuzz._internal.system import environment -# See https://cloud.google.com/batch/quotas#job_limits -MAX_CONCURRENT_VMS_PER_JOB = 1000 +MAX_PENDING_JOBS = 1000 CLUSTER_NAME = 'clusterfuzz-cronjobs-gke' KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [ @@ -92,6 +93,7 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict: if not remote_tasks: return {} + #TODO(javanlacerda): Create remote task config batch_config = local_config.BatchConfig() config_map = _get_config_names(remote_tasks) configs = {} @@ -129,7 +131,8 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict: return configs -def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: +def _create_job_body(config: KubernetesJobConfig, input_url: str, + service_account_name: str) -> dict: """Creates the body of a Kubernetes job.""" job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split( @@ -143,10 +146,11 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'spec': { - 'hostNetwork': - True, + 'serviceAccountName': + service_account_name, 'containers': [{ 'name': job_name, @@ -181,12 +185,16 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict: }, { 'name': 'IS_K8S_ENV', - 'value': 'True' + 'value': 'true' }, { 'name': 'DISABLE_MOUNTS', 'value': 'true' }, + { + 'name': 'UPDATE_WEB_TESTS', + 'value': 'False' + }, ], 'securityContext': { 'privileged': True, @@ -228,21 +236,24 @@ def __init__(self, k8s_config_loaded: bool = False): def _load_gke_credentials(self): """Loads GKE credentials and configures the Kubernetes client.""" - credentials, project = google.auth.default() + credentials, _ = google.auth.default() + project = utils.get_application_id() service = discovery.build('container', 'v1', credentials=credentials) - parent = f'projects/{project}/locations/-' - # pylint: disable=no-member - response = service.projects().locations().clusters().list( - parent=parent).execute() - - cluster = None - for c in response.get('clusters', []): - if c['name'] == CLUSTER_NAME: - cluster = c - break - - if not cluster: - logs.error(f'Cluster {CLUSTER_NAME} not found.') + parent = f"projects/{project}/locations/-" + + try: + response = service.projects().locations().clusters().list( + parent=parent).execute() + clusters = response.get('clusters', []) + cluster = next((c for c in clusters if c['name'] == CLUSTER_NAME), None) + + if not cluster: + logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.") + print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.") + return + + except Exception as e: + logs.error(f"Failed to list clusters in {project}: {e}") return endpoint = cluster['endpoint'] @@ -250,9 +261,6 @@ def _load_gke_credentials(self): ca_cert = base64.b64decode(cluster['masterAuth']['clusterCaCertificate']) # Write CA cert to a temporary file. - # Note: This file persists for the lifetime of the process/container. - # Ideally it should be cleaned up, but standard k8s client usage involves - # file paths. fd, ca_cert_path = tempfile.mkstemp() with os.fdopen(fd, 'wb') as f: f.write(ca_cert) @@ -263,17 +271,38 @@ def _load_gke_credentials(self): configuration.verify_ssl = True def get_token(creds): - if not creds.valid: - request = google_requests.Request() + request = google_requests.Request() + if not creds.valid or creds.expired: creds.refresh(request) - return creds.token + return {"authorization": "Bearer " + creds.token} - # Hook to refresh token on API calls. - configuration.refresh_api_key_hook = lambda _: { - "authorization": "Bearer " + get_token(credentials) - } + configuration.refresh_api_key_hook = lambda _: get_token(credentials) + configuration.api_key = get_token(credentials) k8s_client.Configuration.set_default(configuration) + logs.info("GKE credentials loaded successfully.") + + def _create_service_account_if_needed(self, + service_account_email: str) -> str: + """Creates a Kubernetes Service Account if it doesn't exist.""" + service_account_name = service_account_email.split('@')[0] + namespace = 'default' + try: + self._core_api.read_namespaced_service_account(service_account_name, + namespace) + return service_account_name + except k8s_client.rest.ApiException as e: + if e.status != 404: + raise + + logs.info(f'Creating Service Account {service_account_name} for ' + f'{service_account_email}.') + metadata = k8s_client.V1ObjectMeta( + name=service_account_name, + annotations={'iam.gke.io/gcp-service-account': service_account_email}) + body = k8s_client.V1ServiceAccount(metadata=metadata) + self._core_api.create_namespaced_service_account(namespace, body) + return service_account_name def create_job(self, config: KubernetesJobConfig, input_url: str) -> str: """Creates a Kubernetes job. @@ -284,11 +313,25 @@ def create_job(self, config: KubernetesJobConfig, input_url: str) -> str: Returns: The name of the created Kubernetes job. """ - - job_body = _create_job_body(config, input_url) + service_account_name = self._create_service_account_if_needed( + config.service_account_email) + job_body = _create_job_body(config, input_url, service_account_name) self._batch_api.create_namespaced_job(body=job_body, namespace='default') return job_body['metadata']['name'] + def _get_pending_jobs_count(self) -> int: + """Returns the number of pending jobs.""" + try: + pods = self._core_api.list_namespaced_pod( + namespace='default', + label_selector='app.kubernetes.io/name=clusterfuzz-kata-job', + field_selector='status.phase=Pending') + logs.info(f"Found {len(pods.items)} pending jobs.") + return len(pods.items) + except Exception as e: + logs.error(f"Failed to list pods: {e}") + return 0 + def create_uworker_main_batch_job(self, module: str, job_type: str, input_download_url: str): """Creates a single batch job for a uworker main task.""" @@ -308,6 +351,15 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): separate batch job for each group. This allows tasks with similar requirements to be processed together, which can improve efficiency. """ + if self._get_pending_jobs_count() >= MAX_PENDING_JOBS: + logs.warning( + f'Kubernetes job limit reached. Not acking {len(remote_tasks)} tasks.' + ) + for task in remote_tasks: + if task.pubsub_task: + task.pubsub_task.do_not_ack = True + return [] + job_specs = collections.defaultdict(list) configs = _get_k8s_job_configs(remote_tasks) for remote_task in remote_tasks: @@ -330,6 +382,8 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): def create_kata_container_job(self, config: KubernetesJobConfig, input_url: str) -> str: """Creates a Kubernetes job that runs in a Kata container.""" + service_account_name = self._create_service_account_if_needed( + config.service_account_email) job_name = 'clusterfuzz-kata-job-' + str(uuid.uuid4()).split( '-', maxsplit=1)[0] @@ -340,6 +394,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig, 'name': job_name }, 'spec': { + 'activeDeadlineSeconds': tasks.get_task_duration(config.command), 'template': { 'metadata': { 'labels': { @@ -349,8 +404,8 @@ def create_kata_container_job(self, config: KubernetesJobConfig, 'spec': { 'runtimeClassName': 'kata', - 'hostNetwork': - True, + 'serviceAccountName': + service_account_name, 'dnsPolicy': 'ClusterFirstWithHostNet', 'containers': [{ @@ -414,12 +469,16 @@ def create_kata_container_job(self, config: KubernetesJobConfig, }, { 'name': 'IS_K8S_ENV', - 'value': 'True' + 'value': 'true' }, { 'name': 'DISABLE_MOUNTS', 'value': 'true' }, + { + 'name': 'UPDATE_WEB_TESTS', + 'value': 'False' + }, ] }], 'restartPolicy': diff --git a/src/clusterfuzz/_internal/remote_task/__init__.py b/src/clusterfuzz/_internal/remote_task/__init__.py index 4b4a03d53ce..fdd7841c2cf 100644 --- a/src/clusterfuzz/_internal/remote_task/__init__.py +++ b/src/clusterfuzz/_internal/remote_task/__init__.py @@ -23,6 +23,7 @@ import random from typing import List +from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import job_frequency @@ -34,10 +35,11 @@ class RemoteTask: is used to enqueue tasks and track their state. """ - def __init__(self, command, job_type, input_download_url): + def __init__(self, command, job_type, input_download_url, pubsub_task=None): self.command = command self.job_type = job_type self.input_download_url = input_download_url + self.pubsub_task = pubsub_task class RemoteTaskInterface(abc.ABC): @@ -74,13 +76,13 @@ def __init__(self): self._gcp_batch_service = GcpBatchService() self._kubernetes_service = KubernetesService() - def _should_use_kubernetes(self, job_type: str) -> bool: + def _should_use_kubernetes(self) -> bool: """Determines whether to use the Kubernetes backend for a given job. The decision is made based on a random roll and the configured frequency for the given job type. """ - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() return random.random() < frequencies['kubernetes'] def create_uworker_main_batch_job(self, module: str, job_type: str, @@ -89,7 +91,7 @@ def create_uworker_main_batch_job(self, module: str, job_type: str, The choice of backend is determined by the `_should_use_kubernetes` method. """ - if self._should_use_kubernetes(job_type): + if self._should_use_kubernetes(): return self._kubernetes_service.create_uworker_main_batch_job( module, job_type, input_download_url) return self._gcp_batch_service.create_uworker_main_batch_job( @@ -104,39 +106,33 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]): gcp_batch_tasks = [] kubernetes_tasks = [] - # Group tasks by job_type to respect per-job frequencies - tasks_by_job = collections.defaultdict(list) - for task in remote_tasks: - tasks_by_job[task.job_type].append(task) - - for job_type, tasks in tasks_by_job.items(): - # Use random distribution if there is only one task - if len(tasks) == 1: - if self._should_use_kubernetes(job_type): - kubernetes_tasks.extend(tasks) - else: - gcp_batch_tasks.extend(tasks) - continue - + # Use random distribution if there is only one task + if len(remote_tasks) == 1: + if self._should_use_kubernetes(): + kubernetes_tasks.extend(remote_tasks) + else: + gcp_batch_tasks.extend(remote_tasks) + else: # Use deterministic slicing for multiple tasks - frequencies = job_frequency.get_job_frequency(job_type) + frequencies = job_frequency.get_job_frequency() k8s_ratio = frequencies['kubernetes'] - k8s_count = int(len(tasks) * k8s_ratio) + k8s_count = int(len(remote_tasks) * k8s_ratio) # We take the first chunk for Kubernetes - kubernetes_tasks.extend(tasks[:k8s_count]) - gcp_batch_tasks.extend(tasks[k8s_count:]) + kubernetes_tasks.extend(remote_tasks[:k8s_count]) + gcp_batch_tasks.extend(remote_tasks[k8s_count:]) - print(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.') - print(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.') + logs.info(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.') + logs.info(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.') results = [] - if gcp_batch_tasks: - results.extend( - self._gcp_batch_service.create_uworker_main_batch_jobs( - gcp_batch_tasks)) if kubernetes_tasks: results.extend( self._kubernetes_service.create_uworker_main_batch_jobs( kubernetes_tasks)) + + if gcp_batch_tasks: + results.extend( + self._gcp_batch_service.create_uworker_main_batch_jobs( + gcp_batch_tasks)) return results diff --git a/src/clusterfuzz/_internal/remote_task/job_frequency.py b/src/clusterfuzz/_internal/remote_task/job_frequency.py index 1b0423efab6..0347738cbb2 100644 --- a/src/clusterfuzz/_internal/remote_task/job_frequency.py +++ b/src/clusterfuzz/_internal/remote_task/job_frequency.py @@ -26,35 +26,10 @@ DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0} -def _get_job_frequencies_from_env(): - """Parses the `K8S_JOBS_FREQUENCY` environment variable. - - The variable should be a comma-separated list of key-value pairs, where the - key is the job name and the value is the frequency (a float between 0 and 1). - For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`. - """ - job_frequencies = {} - frequency_string = environment.get_value('K8S_JOBS_FREQUENCY') - if not frequency_string: - return {} - - for item in frequency_string.split(','): - key, value = item.split('=') - job_frequencies[key] = float(value) - return job_frequencies - - -def get_job_frequency(job_name): +def get_job_frequency(): """Returns the frequency for a given job. If the frequency is not explicitly defined in the `K8S_JOBS_FREQUENCY` environment variable, the default frequency is returned. """ - job_frequencies = _get_job_frequencies_from_env() - if job_name in job_frequencies: - kubernetes_frequency = job_frequencies[job_name] - return { - 'gcp_batch': 1.0 - kubernetes_frequency, - 'kubernetes': kubernetes_frequency - } return DEFAULT_FREQUENCY diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py new file mode 100644 index 00000000000..d3c9e024eb1 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_integration_test.py @@ -0,0 +1,93 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Integration tests for KubernetesService.""" + +import base64 +import os +import unittest +from unittest import mock + +from kubernetes import client + +from clusterfuzz._internal.k8s import service + + +class KubernetesIntegrationTest(unittest.TestCase): + """Integration tests for KubernetesService.""" + + @mock.patch('googleapiclient.discovery.build') + def test_load_credentials(self, mock_discovery_build): + """Test that credentials can be loaded manually using the fallback logic.""" + # Ensure no kubeconfig interferes to force manual path (if local kubeconfig exists) + # Note: os.environ changes are process-local. + old_kubeconfig = os.environ.get('KUBECONFIG') + os.environ['KUBECONFIG'] = '/dev/null' + + # Mock GKE response + mock_service = mock.Mock() + mock_discovery_build.return_value = mock_service + + mock_clusters_list = mock_service.projects().locations().clusters().list( + ).execute + mock_clusters_list.return_value = { + 'clusters': [{ + 'name': 'clusterfuzz-cronjobs-gke', + 'endpoint': '1.2.3.4', + 'masterAuth': { + 'clusterCaCertificate': + base64.b64encode(b'fake-cert').decode('utf-8') + } + }] + } + + # Mock list_namespaced_job to avoid actual network call to 1.2.3.4 + with mock.patch('kubernetes.client.BatchV1Api.list_namespaced_job'): + try: + # This will trigger _load_gke_credentials + # It should try load_kube_config (fail), load_incluster (fail), then manual. + k8s_service = service.KubernetesService() + + # Verify api client is initialized + self.assertIsNotNone(k8s_service._batch_api) + self.assertIsInstance(k8s_service._batch_api, client.BatchV1Api) + + # Verify configuration + config = client.Configuration.get_default_copy() + print(f"Loaded Host: {config.host}") + + # Check that we got a valid https endpoint + self.assertTrue(config.host.startswith("https://")) + self.assertTrue(config.verify_ssl) + self.assertIsNotNone(config.ssl_ca_cert) + + # Verify API key fix is present (Crucial for manual path) + self.assertIn("authorization", config.api_key) + + # Verify hook is present + self.assertIsNotNone(config.refresh_api_key_hook) + + # Verify actual connectivity and auth + print("Attempting to list jobs to verify authentication...") + k8s_service._batch_api.list_namespaced_job(namespace='default', limit=1) + print("Successfully listed jobs.") + + finally: + if old_kubeconfig: + os.environ['KUBECONFIG'] = old_kubeconfig + else: + del os.environ['KUBECONFIG'] + + +if __name__ == '__main__': + unittest.main() diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py new file mode 100644 index 00000000000..f1e15369e56 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_limit_test.py @@ -0,0 +1,85 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the Kubernetes batch client limit logic.""" + +import unittest +from unittest import mock + +from clusterfuzz._internal.k8s import service +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +@mock.patch('kubernetes.config.load_kube_config') +class KubernetesServiceLimitTest(unittest.TestCase): + """Tests for the KubernetesService limit logic.""" + + def setUp(self): + patcher = mock.patch( + 'clusterfuzz._internal.k8s.service.KubernetesService._load_gke_credentials' + ) + self.addCleanup(patcher.stop) + self.mock_load_gke = patcher.start() + + # Create a job to prevent KeyError in _get_k8s_job_configs + from clusterfuzz._internal.datastore import data_types + data_types.Job(name='job1', platform='LINUX').put() + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_not_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs proceeds when limit not reached.""" + mock_get_pending_count.return_value = 99 + kube_service = service.KubernetesService() + + # We expect this to proceed to job creation logic (which we mock to avoid actual creation) + with mock.patch.object(service.KubernetesService, + 'create_kata_container_job') as mock_create: + kube_service.create_uworker_main_batch_jobs( + [service.RemoteTask('fuzz', 'job1', 'url1')]) + self.assertTrue(mock_create.called) + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks tasks when limit reached.""" + mock_get_pending_count.return_value = 100 + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_exceeded( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks tasks when limit exceeded.""" + mock_get_pending_count.return_value = 101 + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) diff --git a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py index 81d61af7bad..69e622484d2 100644 --- a/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py +++ b/src/clusterfuzz/_internal/tests/core/k8s/k8s_service_test.py @@ -38,11 +38,13 @@ def setUp(self): name='job2', platform='LINUX', environment_string='CUSTOM_VAR = value').put() + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') @mock.patch.object(service.KubernetesService, 'create_kata_container_job') @mock.patch.object(service.KubernetesService, 'create_job') - def test_create_uworker_main_batch_jobs(self, mock_create_job, - mock_create_kata_job, _): + def test_create_uworker_main_batch_jobs( + self, mock_create_job, mock_create_kata_job, mock_get_pending_count, _): """Tests the creation of uworker main batch jobs.""" + mock_get_pending_count.return_value = 0 tasks = [ service.RemoteTask('fuzz', 'job1', 'url1'), service.RemoteTask('fuzz', 'job1', 'url2'), @@ -61,20 +63,55 @@ def test_create_uworker_main_batch_jobs(self, mock_create_job, [call.args[1] for call in mock_create_kata_job.call_args_list]) self.assertEqual(urls, ['url1', 'url2', 'url3']) + @mock.patch('kubernetes.client.CoreV1Api') + def test_get_pending_jobs_count(self, mock_core_api_cls, _): + """Tests _get_pending_jobs_count.""" + mock_core_api = mock_core_api_cls.return_value + kube_service = service.KubernetesService() + + # Mock pods + mock_core_api.list_namespaced_pod.return_value.items = [ + mock.Mock(), mock.Mock() + ] + + self.assertEqual(2, kube_service._get_pending_jobs_count()) + mock_core_api.list_namespaced_pod.assert_called_with( + namespace='default', + label_selector='app.kubernetes.io/name=clusterfuzz-kata-job', + field_selector='status.phase=Pending') + + @mock.patch.object(service.KubernetesService, '_get_pending_jobs_count') + def test_create_uworker_main_batch_jobs_limit_reached( + self, mock_get_pending_count, _): + """Tests that create_uworker_main_batch_jobs nacks when limit reached.""" + mock_get_pending_count.return_value = service.MAX_PENDING_JOBS + kube_service = service.KubernetesService() + + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = service.RemoteTask( + 'fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + result = kube_service.create_uworker_main_batch_jobs([task]) + self.assertEqual(result, []) + self.assertTrue(mock_pubsub_task.do_not_ack) + @mock.patch('kubernetes.client.BatchV1Api') def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): """Tests that create_kata_container_job generates the correct spec.""" mock_batch_api = mock_batch_api_cls.return_value kube_service = service.KubernetesService() - # Force _batch_api to be our mock (though init usually does it if we patched class before init) - # The patch is applied for this method, so init inside will use the mock class. + # Mock _create_service_account_if_needed + kube_service._create_service_account_if_needed = mock.Mock( + return_value='test') config = service.KubernetesJobConfig( job_type='test-job', docker_image='test-image', command='fuzz', disk_size_gb=10, - service_account_email='email', + service_account_email= + 'test@clusterfuzz-test.iam.gserviceaccount.com', clusterfuzz_release='prod', is_kata=True) @@ -88,9 +125,6 @@ def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): pod_spec = job_body['spec']['template']['spec'] container = pod_spec['containers'][0] - # Check hostNetwork - self.assertTrue(pod_spec['hostNetwork']) - # Check capabilities self.assertEqual(['ALL'], container['securityContext']['capabilities']['add']) @@ -103,6 +137,40 @@ def test_create_kata_container_job_spec(self, mock_batch_api_cls, _): volumes = {v['name']: v for v in pod_spec['volumes']} self.assertEqual('1.9Gi', volumes['dshm']['emptyDir']['sizeLimit']) + # Check Service Account + self.assertEqual('test', pod_spec['serviceAccountName']) + kube_service._create_service_account_if_needed.assert_called_with( + 'test@clusterfuzz-test.iam.gserviceaccount.com') + + @mock.patch('kubernetes.client.BatchV1Api') + def test_create_job(self, mock_batch_api_cls, _): + """Tests create_job.""" + mock_batch_api = mock_batch_api_cls.return_value + kube_service = service.KubernetesService() + kube_service._create_service_account_if_needed = mock.Mock( + return_value='test-sa') + + config = service.KubernetesJobConfig( + job_type='test-job', + docker_image='test-image', + command='fuzz', + disk_size_gb=10, + service_account_email='test-email', + clusterfuzz_release='prod', + is_kata=False) + + kube_service.create_job(config, 'input_url') + + self.assertTrue(mock_batch_api.create_namespaced_job.called) + call_args = mock_batch_api.create_namespaced_job.call_args + job_body = call_args.kwargs['body'] + + # Check Service Account + self.assertEqual('test-sa', + job_body['spec']['template']['spec']['serviceAccountName']) + kube_service._create_service_account_if_needed.assert_called_with( + 'test-email') + @mock.patch( 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module') @mock.patch.object(service.KubernetesService, diff --git a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py new file mode 100644 index 00000000000..3e29ef2cb6e --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for remote_task.""" + +import unittest +from unittest import mock + +from clusterfuzz._internal.k8s import service as k8s_service +from clusterfuzz._internal.remote_task import RemoteTask +from clusterfuzz._internal.remote_task import RemoteTaskGate +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +class RemoteTaskGateTest(unittest.TestCase): + """Tests for RemoteTaskGate.""" + + def setUp(self): + self.gate = RemoteTaskGate() + + @mock.patch( + 'clusterfuzz._internal.remote_task.job_frequency.get_job_frequency') + @mock.patch.object(k8s_service.KubernetesService, + 'create_uworker_main_batch_jobs') + @mock.patch( + 'clusterfuzz._internal.batch.service.GcpBatchService.create_uworker_main_batch_jobs' + ) + def test_create_uworker_main_batch_jobs_k8s_limit_reached( + self, mock_gcp_create, mock_k8s_create, mock_get_frequency): + """Test delegation when K8s limit is reached (handled by service).""" + # Setup tasks to go to Kubernetes + mock_get_frequency.return_value = {'kubernetes': 1.0} + + task = RemoteTask('fuzz', 'job1', 'url1') + + # Simulate K8s service returning empty list (limit reached) + mock_k8s_create.return_value = [] + + result = self.gate.create_uworker_main_batch_jobs([task]) + + # Verify K8s was attempted + self.assertTrue(mock_k8s_create.called) + + # Verify GCP was NOT attempted + self.assertFalse(mock_gcp_create.called) + + # Verify result is empty list + self.assertEqual(result, []) + + @mock.patch( + 'clusterfuzz._internal.remote_task.job_frequency.get_job_frequency') + @mock.patch.object(k8s_service.KubernetesService, + 'create_uworker_main_batch_jobs') + @mock.patch( + 'clusterfuzz._internal.batch.service.GcpBatchService.create_uworker_main_batch_jobs' + ) + def test_create_uworker_main_batch_jobs_success( + self, mock_gcp_create, mock_k8s_create, mock_get_frequency): + """Test successful creation.""" + mock_get_frequency.return_value = {'kubernetes': 1.0} + mock_pubsub_task = mock.Mock() + mock_pubsub_task.do_not_ack = False + task = RemoteTask('fuzz', 'job1', 'url1', pubsub_task=mock_pubsub_task) + + self.gate.create_uworker_main_batch_jobs([task]) + + self.assertTrue(mock_k8s_create.called) + self.assertFalse(mock_pubsub_task.do_not_ack) diff --git a/src/local/butler/scripts/run_remote_task.py b/src/local/butler/scripts/run_remote_task.py index f64b930a719..1e290bae91f 100644 --- a/src/local/butler/scripts/run_remote_task.py +++ b/src/local/butler/scripts/run_remote_task.py @@ -45,7 +45,6 @@ def schedule_utask_mains(): return [] print(f'Combining {len(utask_mains)} batch tasks.') - utask_mains = utask_mains[:5] results = [] with lease_all_tasks(utask_mains): batch_tasks = [ diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py index 5e80e98f8ac..849609736c8 100644 --- a/src/python/bot/startup/run_bot.py +++ b/src/python/bot/startup/run_bot.py @@ -99,7 +99,7 @@ def schedule_utask_mains(): with lease_all_tasks(utask_mains): batch_tasks = [ - RemoteTask(task.command, task.job, task.argument) + RemoteTask(task.command, task.job, task.argument, pubsub_task=task) for task in utask_mains ] RemoteTaskGate().create_uworker_main_batch_jobs(batch_tasks)