diff --git a/src/clusterfuzz/_internal/base/feature_flags.py b/src/clusterfuzz/_internal/base/feature_flags.py index 07c0dcff878..85eea86056c 100644 --- a/src/clusterfuzz/_internal/base/feature_flags.py +++ b/src/clusterfuzz/_internal/base/feature_flags.py @@ -28,6 +28,8 @@ class FeatureFlags(Enum): K8S_JOBS_FREQUENCY = 'k8s_jobs_frequency' + PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit' + @property def flag(self): """Get the feature flag.""" diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index d003b47ee6a..ecd7a8d5c72 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -14,40 +14,24 @@ """Cron job to schedule fuzz tasks that run on batch.""" import collections -import multiprocessing import random import time -from typing import Dict -from typing import List from google.cloud import monitoring_v3 -from googleapiclient import discovery +from clusterfuzz._internal.base import feature_flags from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils -from clusterfuzz._internal.batch import service as batch -from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs -# TODO(metzman): Actually implement this. -CPUS_PER_FUZZ_JOB = 2 +PREPROCESS_TARGET_SIZE_DEFAULT = 10000 -# Pretend like our CPU limit is 3% higher than it actually is so that we use the -# full CPU capacity even when scheduling is slow. -CPU_BUFFER_MULTIPLIER = 1.03 - -def _get_quotas(creds, project, region): - compute = discovery.build('compute', 'v1', credentials=creds) - return compute.regions().get( # pylint: disable=no-member - region=region, project=project).execute()['quotas'] - - -def count_unacked(creds, project_id, subscription_id): - """Counts the unacked messages in |subscription_id|.""" +def get_queue_size(creds, project_id, subscription_id): + """Returns the size of the queue (unacked messages).""" # TODO(metzman): Not all of these are fuzz_tasks. Deal with that. metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages' query_filter = (f'metric.type="{metric}" AND ' @@ -76,59 +60,15 @@ def count_unacked(creds, project_id, subscription_id): return 0 -def get_cpu_usage(creds, project: str, region: str) -> int: - """Returns the number of available CPUs in the current GCE region.""" - - quotas = _get_quotas(creds, project, region) - - # Sometimes, the preemptible quota is 0, which means the number of preemptible - # CPUs is actually limited by the CPU quota. - # If preemptible quota is not defined, we need to use CPU quota instead. - cpu_quota = None - preemptible_quota = None - - # Get preemptible_quota and cpu_quota from the list of quotas. - for quota in quotas: - if preemptible_quota and cpu_quota: - break - if quota['metric'] == 'CPUS': - cpu_quota = quota - continue - if quota['metric'] == 'PREEMPTIBLE_CPUS': - preemptible_quota = quota - continue - assert preemptible_quota or cpu_quota - - if not preemptible_quota['limit']: - # Preemptible quota is not set. Obey the CPU quota since that limits us. - quota = cpu_quota - else: - quota = preemptible_quota - assert quota['limit'], quota - - # TODO(metzman): Do this in a more configurable way. - # We need this because us-central1 and us-east4 have different numbers of - # cores alloted to us in their quota. Treat them the same to simplify things. - limit = min(quota['limit'], 100_000) - project_config = local_config.ProjectConfig() - # On OSS-Fuzz there is a limit to the number of CPUs we can use. - limit = min(limit, project_config.get('schedule_fuzz.cpu_limit', limit)) - return limit, quota['usage'] - - class BaseFuzzTaskScheduler: """Base fuzz task scheduler for any deployment of ClusterFuzz.""" - def __init__(self, num_cpus): - self.num_cpus = num_cpus + def __init__(self, num_tasks): + self.num_tasks = num_tasks def get_fuzz_tasks(self): raise NotImplementedError('Child class must implement.') - def _get_cpus_per_fuzz_job(self, job_name): - del job_name - return CPUS_PER_FUZZ_JOB - class FuzzTaskCandidate: """Data class that holds more info about FuzzerJobs than the ndb.Models do. @@ -159,7 +99,7 @@ def copy(self): class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for OSS-Fuzz.""" - def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: + def get_fuzz_tasks(self) -> list[tasks.Task]: # TODO(metzman): Handle high end. # A job's weight is determined by its own weight and the weight of the # project is a part of. First get project weights. @@ -222,12 +162,11 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: for fuzz_task_candidate in fuzz_task_candidates: weights.append(fuzz_task_candidate.weight) - # TODO(metzman): Handle high-end jobs correctly. - num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None)) - logs.info(f'Scheduling {num_instances} fuzz tasks for OSS-Fuzz.') + fuzz_tasks_count = self.num_tasks + logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') choices = random.choices( - fuzz_task_candidates, weights=weights, k=num_instances) + fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -245,13 +184,10 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): """Fuzz task scheduler for Chrome.""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.num_cpus = respect_project_max_cpus(self.num_cpus) - - def get_fuzz_tasks(self) -> List[tasks.Task]: + def get_fuzz_tasks(self) -> list[tasks.Task]: """Returns fuzz tasks for chrome, weighted by job weight.""" logs.info('Getting jobs for Chrome.') + candidates_by_job = {} # Only consider linux jobs for chrome fuzzing. job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') @@ -276,14 +212,14 @@ def get_fuzz_tasks(self) -> List[tasks.Task]: fuzz_task_candidates.append(fuzz_task_candidate) weights = [candidate.weight for candidate in fuzz_task_candidates] - num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None)) - logs.info(f'Scheduling {num_instances} fuzz tasks for Chrome.') + fuzz_tasks_count = self.num_tasks + logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') if not fuzz_task_candidates: return [] choices = random.choices( - fuzz_task_candidates, weights=weights, k=num_instances) + fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) fuzz_tasks = [ tasks.Task( 'fuzz', @@ -295,101 +231,38 @@ def get_fuzz_tasks(self) -> List[tasks.Task]: return fuzz_tasks -def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]: +def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: if utils.is_oss_fuzz(): - scheduler = OssfuzzFuzzTaskScheduler(available_cpus) + scheduler = OssfuzzFuzzTaskScheduler(num_tasks) else: - scheduler = ChromeFuzzTaskScheduler(available_cpus) + scheduler = ChromeFuzzTaskScheduler(num_tasks) fuzz_tasks = scheduler.get_fuzz_tasks() return fuzz_tasks -def get_batch_regions(batch_config): - fuzz_subconf_names = { - subconf['name'] for subconf in batch_config.get( - 'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs') - } - - subconfs = batch_config.get('subconfigs') - return list( - set(subconfs[subconf]['region'] - for subconf in subconfs - if subconf in fuzz_subconf_names)) - +def schedule_fuzz_tasks() -> bool: + """Schedules fuzz tasks.""" -def get_available_cpus(project: str, regions: List[str]) -> int: - """Returns the available CPUs for fuzz tasks.""" - # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz - # tasks (nor preemptible and non-preemptible CPUs). Fix this. - # Get total scheduled and queued. + project = utils.get_application_id() + start = time.time() creds = credentials.get_default()[0] + preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) - target = 0 - usage = 0 - for region in regions: - region_target, region_usage = get_cpu_usage(creds, project, region) - target += region_target - usage += region_usage - waiting_tasks = ( - count_unacked(creds, project, 'preprocess') + count_unacked( - creds, project, 'utask_main')) - - if usage + waiting_tasks * CPUS_PER_FUZZ_JOB > .95 * target: - # Only worry about queueing build up if we are above 95% utilization. - count_args = ((project, region) for region in regions) - with multiprocessing.Pool(2) as pool: - target *= CPU_BUFFER_MULTIPLIER - # These calls are extremely slow (about 30 minutes total). - result = pool.starmap_async( # pylint: disable=no-member - batch.count_queued_or_scheduled_tasks, count_args) - - region_counts = zip(*result.get()) # Group all queued and all scheduled. - # Add up all queued and scheduled. - region_counts = [sum(tup) for tup in region_counts] - logs.info(f'QUEUED/SCHEDULED tasks per region: {region_counts}') - if region_counts[0] > 10_000: - # Check queued tasks. - logs.info('Too many jobs queued, not scheduling more fuzzing.') - return 0 - waiting_tasks += sum(region_counts) # Add up queued and scheduled. + target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT + if target_size_flag and target_size_flag.enabled: + target_size = int(target_size_flag.value) else: - logs.info('Skipping getting tasks.') - - occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + usage - logs.info(f'Soon or currently occupied CPUs: {occupied_cpus}') - - logs.info(f'Target number CPUs: {target}') - available_cpus = max(target - occupied_cpus, 0) - logs.info(f'Available CPUs: {available_cpus}') + target_size = PREPROCESS_TARGET_SIZE_DEFAULT - # Don't schedule more than 50K tasks at once. So we don't overload batch. - # This number is arbitrary, but we aren't at full capacity at lower numbers. - available_cpus = min(available_cpus, 50_000 * len(regions)) + num_tasks = target_size - preprocess_queue_size + logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' + f'Target: {target_size}. Needed: {num_tasks}.') - return available_cpus - - -def respect_project_max_cpus(num_cpus): - conf = local_config.ProjectConfig() - max_cpus_per_schedule = conf.get('max_cpus_per_schedule') - if max_cpus_per_schedule is None: - return num_cpus - return min(max_cpus_per_schedule, num_cpus) - - -def schedule_fuzz_tasks() -> bool: - """Schedules fuzz tasks.""" - multiprocessing.set_start_method('spawn') - batch_config = local_config.BatchConfig() - project = batch_config.get('project') - regions = get_batch_regions(batch_config) - start = time.time() - available_cpus = get_available_cpus(project, regions) - logs.info(f'{available_cpus} available CPUs.') - if not available_cpus: + if num_tasks <= 0: + logs.info('Queue size met or exceeded. Not scheduling tasks.') return False - fuzz_tasks = get_fuzz_tasks(available_cpus) + fuzz_tasks = get_fuzz_tasks(num_tasks) if not fuzz_tasks: logs.error('No fuzz tasks found to schedule.') return False diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 8b960d9977e..ae23d78fd07 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -17,8 +17,6 @@ from clusterfuzz._internal.cron import schedule_fuzz from clusterfuzz._internal.datastore import data_types -from clusterfuzz._internal.google_cloud_utils import credentials -from clusterfuzz._internal.tests.test_libs import helpers as test_helpers from clusterfuzz._internal.tests.test_libs import test_utils # pylint: disable=protected-access @@ -68,8 +66,8 @@ def test_get_fuzz_tasks(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - num_cpus = 10 - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus) + num_tasks = 5 + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks) tasks = scheduler.get_fuzz_tasks() comparable_results = [] for task in tasks: @@ -110,7 +108,7 @@ def test_os_version_precedence_project_over_job(self): name=project_name, base_os_version='project-version').put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2) + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) tasks = scheduler.get_fuzz_tasks() self.assertEqual(len(tasks), 1) task = tasks[0] @@ -149,7 +147,7 @@ def test_os_version_fallback_to_job(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2) + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) tasks = scheduler.get_fuzz_tasks() self.assertEqual(len(tasks), 1) task = tasks[0] @@ -188,7 +186,7 @@ def test_os_version_no_version(self): data_types.OssFuzzProject(name=project_name).put() data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put() - scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2) + scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1) tasks = scheduler.get_fuzz_tasks() self.assertEqual(len(tasks), 1) task = tasks[0] @@ -218,7 +216,7 @@ def _setup_chrome_entities(self, job_os_version=None): def _run_and_get_task(self): """Runs the scheduler and returns the single task created.""" - scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_cpus=2) + scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1) tasks = scheduler.get_fuzz_tasks() self.assertEqual(len(tasks), 1) return tasks[0] @@ -234,55 +232,3 @@ def test_os_version_job_without_version(self): self._setup_chrome_entities() task = self._run_and_get_task() self.assertIsNone(task.extra_info.get('base_os_version')) - - -class TestGetCpuUsage(unittest.TestCase): - """Tests for get_cpu_limit_for_regions.""" - - def setUp(self): - test_helpers.patch(self, [ - 'clusterfuzz._internal.cron.schedule_fuzz._get_quotas', - 'clusterfuzz._internal.config.local_config.ProjectConfig.get' - ]) - self.creds = credentials.get_default() - - def test_usage(self): - """Tests that get_cpu_limit_for_regions handles usage properly.""" - self.mock.get.return_value = 100_000 - self.mock._get_quotas.return_value = [{ - 'metric': 'PREEMPTIBLE_CPUS', - 'limit': 5, - 'usage': 2 - }] - self.assertEqual( - schedule_fuzz.get_cpu_usage(self.creds, 'project', 'region'), (5, 2)) - - def test_cpus_and_preemptible_cpus(self): - """Tests that get_cpu_limit_for_regions handles usage properly.""" - self.mock.get.return_value = 100_000 - self.mock._get_quotas.return_value = [{ - 'metric': 'PREEMPTIBLE_CPUS', - 'limit': 5, - 'usage': 0 - }, { - 'metric': 'CPUS', - 'limit': 5, - 'usage': 5 - }] - self.assertEqual( - schedule_fuzz.get_cpu_usage(self.creds, 'region', 'project'), (5, 0)) - - def test_config_limit(self): - """Tests that the config limit is used.""" - self.mock.get.return_value = 2 - self.mock._get_quotas.return_value = [{ - 'metric': 'PREEMPTIBLE_CPUS', - 'limit': 5, - 'usage': 0 - }, { - 'metric': 'CPUS', - 'limit': 5, - 'usage': 5 - }] - self.assertEqual( - schedule_fuzz.get_cpu_usage(self.creds, 'region', 'project'), (2, 0))