-
Notifications
You must be signed in to change notification settings - Fork 617
schedule fuzz based on preprocess queue size #5153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,40 +14,24 @@ | |
| """Cron job to schedule fuzz tasks that run on batch.""" | ||
|
|
||
| import collections | ||
| import multiprocessing | ||
| import random | ||
| import time | ||
| from typing import Dict | ||
| from typing import List | ||
|
|
||
| from google.cloud import monitoring_v3 | ||
| from googleapiclient import discovery | ||
|
|
||
| from clusterfuzz._internal.base import feature_flags | ||
| from clusterfuzz._internal.base import tasks | ||
| from clusterfuzz._internal.base import utils | ||
| from clusterfuzz._internal.batch import service as batch | ||
| from clusterfuzz._internal.config import local_config | ||
| from clusterfuzz._internal.datastore import data_types | ||
| from clusterfuzz._internal.datastore import ndb_utils | ||
| from clusterfuzz._internal.google_cloud_utils import credentials | ||
| from clusterfuzz._internal.metrics import logs | ||
|
|
||
| # TODO(metzman): Actually implement this. | ||
| CPUS_PER_FUZZ_JOB = 2 | ||
| PREPROCESS_TARGET_SIZE_DEFAULT = 10000 | ||
|
|
||
| # Pretend like our CPU limit is 3% higher than it actually is so that we use the | ||
| # full CPU capacity even when scheduling is slow. | ||
| CPU_BUFFER_MULTIPLIER = 1.03 | ||
|
|
||
|
|
||
| def _get_quotas(creds, project, region): | ||
| compute = discovery.build('compute', 'v1', credentials=creds) | ||
| return compute.regions().get( # pylint: disable=no-member | ||
| region=region, project=project).execute()['quotas'] | ||
|
|
||
|
|
||
| def count_unacked(creds, project_id, subscription_id): | ||
| """Counts the unacked messages in |subscription_id|.""" | ||
| def get_queue_size(creds, project_id, subscription_id): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's probably worth notiing somewhere that the queue size metric is delayed by about 5 minutes.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I got your point.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I mean that we should mention the need for delays. If the cron runs too often, it might check the queue size before the metric has actually updated to reflect the jobs it just added. Does this make sense?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good point. But the idea is to tweak this by using the feature flags to have the balance on the size of the queue and the frequency of the cron execution.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if I understood it correctly, the point is to not run |
||
| """Returns the size of the queue (unacked messages).""" | ||
| # TODO(metzman): Not all of these are fuzz_tasks. Deal with that. | ||
| metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages' | ||
| query_filter = (f'metric.type="{metric}" AND ' | ||
|
|
@@ -76,59 +60,15 @@ def count_unacked(creds, project_id, subscription_id): | |
| return 0 | ||
|
|
||
|
|
||
| def get_cpu_usage(creds, project: str, region: str) -> int: | ||
| """Returns the number of available CPUs in the current GCE region.""" | ||
|
|
||
| quotas = _get_quotas(creds, project, region) | ||
|
|
||
| # Sometimes, the preemptible quota is 0, which means the number of preemptible | ||
| # CPUs is actually limited by the CPU quota. | ||
| # If preemptible quota is not defined, we need to use CPU quota instead. | ||
| cpu_quota = None | ||
| preemptible_quota = None | ||
|
|
||
| # Get preemptible_quota and cpu_quota from the list of quotas. | ||
| for quota in quotas: | ||
| if preemptible_quota and cpu_quota: | ||
| break | ||
| if quota['metric'] == 'CPUS': | ||
| cpu_quota = quota | ||
| continue | ||
| if quota['metric'] == 'PREEMPTIBLE_CPUS': | ||
| preemptible_quota = quota | ||
| continue | ||
| assert preemptible_quota or cpu_quota | ||
|
|
||
| if not preemptible_quota['limit']: | ||
| # Preemptible quota is not set. Obey the CPU quota since that limits us. | ||
| quota = cpu_quota | ||
| else: | ||
| quota = preemptible_quota | ||
| assert quota['limit'], quota | ||
|
|
||
| # TODO(metzman): Do this in a more configurable way. | ||
| # We need this because us-central1 and us-east4 have different numbers of | ||
| # cores alloted to us in their quota. Treat them the same to simplify things. | ||
| limit = min(quota['limit'], 100_000) | ||
| project_config = local_config.ProjectConfig() | ||
| # On OSS-Fuzz there is a limit to the number of CPUs we can use. | ||
| limit = min(limit, project_config.get('schedule_fuzz.cpu_limit', limit)) | ||
| return limit, quota['usage'] | ||
|
|
||
|
|
||
| class BaseFuzzTaskScheduler: | ||
| """Base fuzz task scheduler for any deployment of ClusterFuzz.""" | ||
|
|
||
| def __init__(self, num_cpus): | ||
| self.num_cpus = num_cpus | ||
| def __init__(self, num_tasks): | ||
| self.num_tasks = num_tasks | ||
|
|
||
| def get_fuzz_tasks(self): | ||
| raise NotImplementedError('Child class must implement.') | ||
|
|
||
| def _get_cpus_per_fuzz_job(self, job_name): | ||
| del job_name | ||
| return CPUS_PER_FUZZ_JOB | ||
|
|
||
|
|
||
| class FuzzTaskCandidate: | ||
| """Data class that holds more info about FuzzerJobs than the ndb.Models do. | ||
|
|
@@ -159,7 +99,7 @@ def copy(self): | |
| class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for OSS-Fuzz.""" | ||
|
|
||
| def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: | ||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| # TODO(metzman): Handle high end. | ||
| # A job's weight is determined by its own weight and the weight of the | ||
| # project is a part of. First get project weights. | ||
|
|
@@ -222,12 +162,11 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: | |
| for fuzz_task_candidate in fuzz_task_candidates: | ||
| weights.append(fuzz_task_candidate.weight) | ||
|
|
||
| # TODO(metzman): Handle high-end jobs correctly. | ||
| num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None)) | ||
| logs.info(f'Scheduling {num_instances} fuzz tasks for OSS-Fuzz.') | ||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.') | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=num_instances) | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -245,13 +184,10 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]: | |
| class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler): | ||
| """Fuzz task scheduler for Chrome.""" | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self.num_cpus = respect_project_max_cpus(self.num_cpus) | ||
|
|
||
| def get_fuzz_tasks(self) -> List[tasks.Task]: | ||
| def get_fuzz_tasks(self) -> list[tasks.Task]: | ||
| """Returns fuzz tasks for chrome, weighted by job weight.""" | ||
| logs.info('Getting jobs for Chrome.') | ||
|
|
||
| candidates_by_job = {} | ||
| # Only consider linux jobs for chrome fuzzing. | ||
| job_query = data_types.Job.query(data_types.Job.platform == 'LINUX') | ||
|
|
@@ -276,14 +212,14 @@ def get_fuzz_tasks(self) -> List[tasks.Task]: | |
| fuzz_task_candidates.append(fuzz_task_candidate) | ||
|
|
||
| weights = [candidate.weight for candidate in fuzz_task_candidates] | ||
| num_instances = int(self.num_cpus / self._get_cpus_per_fuzz_job(None)) | ||
| logs.info(f'Scheduling {num_instances} fuzz tasks for Chrome.') | ||
| fuzz_tasks_count = self.num_tasks | ||
| logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.') | ||
|
|
||
| if not fuzz_task_candidates: | ||
| return [] | ||
|
|
||
| choices = random.choices( | ||
| fuzz_task_candidates, weights=weights, k=num_instances) | ||
| fuzz_task_candidates, weights=weights, k=fuzz_tasks_count) | ||
| fuzz_tasks = [ | ||
| tasks.Task( | ||
| 'fuzz', | ||
|
|
@@ -295,101 +231,38 @@ def get_fuzz_tasks(self) -> List[tasks.Task]: | |
| return fuzz_tasks | ||
|
|
||
|
|
||
| def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]: | ||
| def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]: | ||
| if utils.is_oss_fuzz(): | ||
| scheduler = OssfuzzFuzzTaskScheduler(available_cpus) | ||
| scheduler = OssfuzzFuzzTaskScheduler(num_tasks) | ||
| else: | ||
| scheduler = ChromeFuzzTaskScheduler(available_cpus) | ||
| scheduler = ChromeFuzzTaskScheduler(num_tasks) | ||
| fuzz_tasks = scheduler.get_fuzz_tasks() | ||
| return fuzz_tasks | ||
|
|
||
|
|
||
| def get_batch_regions(batch_config): | ||
| fuzz_subconf_names = { | ||
| subconf['name'] for subconf in batch_config.get( | ||
| 'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs') | ||
| } | ||
|
|
||
| subconfs = batch_config.get('subconfigs') | ||
| return list( | ||
| set(subconfs[subconf]['region'] | ||
| for subconf in subconfs | ||
| if subconf in fuzz_subconf_names)) | ||
|
|
||
| def schedule_fuzz_tasks() -> bool: | ||
| """Schedules fuzz tasks.""" | ||
|
|
||
| def get_available_cpus(project: str, regions: List[str]) -> int: | ||
| """Returns the available CPUs for fuzz tasks.""" | ||
| # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz | ||
| # tasks (nor preemptible and non-preemptible CPUs). Fix this. | ||
| # Get total scheduled and queued. | ||
| project = utils.get_application_id() | ||
| start = time.time() | ||
| creds = credentials.get_default()[0] | ||
| preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE) | ||
|
|
||
| target = 0 | ||
| usage = 0 | ||
| for region in regions: | ||
| region_target, region_usage = get_cpu_usage(creds, project, region) | ||
| target += region_target | ||
| usage += region_usage | ||
| waiting_tasks = ( | ||
| count_unacked(creds, project, 'preprocess') + count_unacked( | ||
| creds, project, 'utask_main')) | ||
|
|
||
| if usage + waiting_tasks * CPUS_PER_FUZZ_JOB > .95 * target: | ||
| # Only worry about queueing build up if we are above 95% utilization. | ||
| count_args = ((project, region) for region in regions) | ||
| with multiprocessing.Pool(2) as pool: | ||
| target *= CPU_BUFFER_MULTIPLIER | ||
| # These calls are extremely slow (about 30 minutes total). | ||
| result = pool.starmap_async( # pylint: disable=no-member | ||
| batch.count_queued_or_scheduled_tasks, count_args) | ||
|
|
||
| region_counts = zip(*result.get()) # Group all queued and all scheduled. | ||
| # Add up all queued and scheduled. | ||
| region_counts = [sum(tup) for tup in region_counts] | ||
| logs.info(f'QUEUED/SCHEDULED tasks per region: {region_counts}') | ||
| if region_counts[0] > 10_000: | ||
| # Check queued tasks. | ||
| logs.info('Too many jobs queued, not scheduling more fuzzing.') | ||
| return 0 | ||
| waiting_tasks += sum(region_counts) # Add up queued and scheduled. | ||
| target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT | ||
| if target_size_flag and target_size_flag.enabled: | ||
| target_size = int(target_size_flag.value) | ||
| else: | ||
| logs.info('Skipping getting tasks.') | ||
|
|
||
| occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB + usage | ||
| logs.info(f'Soon or currently occupied CPUs: {occupied_cpus}') | ||
|
|
||
| logs.info(f'Target number CPUs: {target}') | ||
| available_cpus = max(target - occupied_cpus, 0) | ||
| logs.info(f'Available CPUs: {available_cpus}') | ||
| target_size = PREPROCESS_TARGET_SIZE_DEFAULT | ||
|
|
||
| # Don't schedule more than 50K tasks at once. So we don't overload batch. | ||
| # This number is arbitrary, but we aren't at full capacity at lower numbers. | ||
| available_cpus = min(available_cpus, 50_000 * len(regions)) | ||
| num_tasks = target_size - preprocess_queue_size | ||
| logs.info(f'Preprocess queue size: {preprocess_queue_size}. ' | ||
| f'Target: {target_size}. Needed: {num_tasks}.') | ||
|
|
||
| return available_cpus | ||
|
|
||
|
|
||
| def respect_project_max_cpus(num_cpus): | ||
| conf = local_config.ProjectConfig() | ||
| max_cpus_per_schedule = conf.get('max_cpus_per_schedule') | ||
| if max_cpus_per_schedule is None: | ||
| return num_cpus | ||
| return min(max_cpus_per_schedule, num_cpus) | ||
|
|
||
|
|
||
| def schedule_fuzz_tasks() -> bool: | ||
| """Schedules fuzz tasks.""" | ||
| multiprocessing.set_start_method('spawn') | ||
| batch_config = local_config.BatchConfig() | ||
| project = batch_config.get('project') | ||
| regions = get_batch_regions(batch_config) | ||
| start = time.time() | ||
| available_cpus = get_available_cpus(project, regions) | ||
| logs.info(f'{available_cpus} available CPUs.') | ||
| if not available_cpus: | ||
| if num_tasks <= 0: | ||
| logs.info('Queue size met or exceeded. Not scheduling tasks.') | ||
| return False | ||
|
|
||
| fuzz_tasks = get_fuzz_tasks(available_cpus) | ||
| fuzz_tasks = get_fuzz_tasks(num_tasks) | ||
| if not fuzz_tasks: | ||
| logs.error('No fuzz tasks found to schedule.') | ||
| return False | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How did you reach this number? Shouldn't the default depend on the project config(external, internal, ...)?
Also, maybe we should create a rationale to define this number based on the amount of tworkers + batch/k8s quota
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its an arbitrary default number, it will be controlled through the feature flag. And indeed we will tweak each project based on on the fleet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think that regardless of the feature flag, we should have a default defined on the project config (similar to what was done with the cpu count)