Skip to content
2 changes: 2 additions & 0 deletions src/clusterfuzz/_internal/base/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class FeatureFlags(Enum):
PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit'

SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution'
# TODO(b/516630567): Set this value based off dev & stage metrics and tests.
SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit'

@property
def flag(self):
Expand Down
5 changes: 5 additions & 0 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def get_task_duration(command):
UTASK_MAIN_QUEUE = 'utask_main'
PREPROCESS_QUEUE = 'preprocess'

SWARMING_QUEUES = {
PREPROCESS_QUEUE: 'preprocess-swarming',
UTASK_MAIN_QUEUE: 'utask_main-swarming',
}

# See https://github.com/google/clusterfuzz/issues/3347 for usage
SUBQUEUE_IDENTIFIER = ':'

Expand Down
210 changes: 138 additions & 72 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,55 @@
# limitations under the License.
"""Cron job to schedule fuzz tasks that run on batch."""

from abc import ABC
from abc import abstractmethod
import collections
from dataclasses import dataclass
import random
import time

from google.cloud import monitoring_v3

from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal import swarming
from clusterfuzz._internal.base import memoize
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.feature_flags import FeatureFlags
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs

PREPROCESS_TARGET_SIZE_DEFAULT = 10000
SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 10


@dataclass
Comment thread
IvanBM18 marked this conversation as resolved.
class Queue:
"""Data class that holds information about a pub/sub queue.

Attributes:
name: The name of the Pub/Sub subscription associated with the queue.
default_target_size: Number of tasks that should be kept in the queue.
target_size_flag: Feature flag used to override the default target size.
"""

name: str
Comment thread
IvanBM18 marked this conversation as resolved.
default_target_size: int
target_size_flag: FeatureFlags


_DEFAULT_QUEUE = Queue(
name=tasks.PREPROCESS_QUEUE,
default_target_size=PREPROCESS_TARGET_SIZE_DEFAULT,
target_size_flag=FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT,
)

_SWARMING_QUEUE = Queue(
name=tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE],
default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT,
target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT,
)


@memoize.wrap(memoize.InMemory(60))
Expand Down Expand Up @@ -62,14 +95,12 @@ def get_queue_size(creds, project_id, subscription_id):
return 0


class BaseFuzzTaskScheduler:
"""Base fuzz task scheduler for any deployment of ClusterFuzz."""
class BaseFuzzTaskProvider(ABC):
"""Base fuzz task provider for any deployment of ClusterFuzz."""

def __init__(self, num_tasks):
self.num_tasks = num_tasks

def get_fuzz_tasks(self):
raise NotImplementedError('Child class must implement.')
@abstractmethod
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
"""Returns a list of fuzz tasks."""


class FuzzTaskCandidate:
Expand Down Expand Up @@ -98,10 +129,10 @@ def copy(self):
base_os_version=self.base_os_version)


class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
"""Fuzz task scheduler for OSS-Fuzz."""
class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider):
"""Fuzz task provider for OSS-Fuzz."""

def get_fuzz_tasks(self) -> list[tasks.Task]:
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
# TODO(metzman): Handle high end.
# A job's weight is determined by its own weight and the weight of the
# project is a part of. First get project weights.
Expand Down Expand Up @@ -164,11 +195,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
for fuzz_task_candidate in fuzz_task_candidates:
weights.append(fuzz_task_candidate.weight)

fuzz_tasks_count = self.num_tasks
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.')
logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.')

choices = random.choices(
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks)
fuzz_tasks = [
tasks.Task(
'fuzz',
Expand All @@ -183,45 +212,25 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
return fuzz_tasks


class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler):
"""Fuzz task scheduler for Chrome."""
class ChromeFuzzTaskProvider(BaseFuzzTaskProvider):
"""Fuzz task provider for Chrome."""

def get_fuzz_tasks(self) -> list[tasks.Task]:
"""Returns fuzz tasks for chrome, weighted by job weight."""
logs.info('Getting jobs for Chrome.')
_candidates: list[FuzzTaskCandidate]

candidates_by_job = {}
# Only consider linux jobs for chrome fuzzing.
job_query = data_types.Job.query(data_types.Job.platform == 'LINUX')
for job in ndb_utils.get_all_from_query(job_query):
base_os_version = None
if job.base_os_version:
base_os_version = job.base_os_version
def __init__(self, jobs: list[data_types.Job]):
self._candidates = _create_candidates_from_jobs(jobs)

candidates_by_job[job.name] = FuzzTaskCandidate(
job=job.name, project=job.project, base_os_version=base_os_version)

fuzz_task_candidates = []
fuzzer_job_query = ndb_utils.get_all_from_query(
data_types.FuzzerJob.query())

for fuzzer_job in fuzzer_job_query:
if fuzzer_job.job not in candidates_by_job:
continue
fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy()
fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer
fuzz_task_candidate.weight = fuzzer_job.actual_weight
fuzz_task_candidates.append(fuzz_task_candidate)
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
"""Returns fuzz tasks for chrome, weighted by job weight."""
logs.info('Getting jobs for Chrome.')

weights = [candidate.weight for candidate in fuzz_task_candidates]
fuzz_tasks_count = self.num_tasks
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.')
weights = [candidate.weight for candidate in self._candidates]
logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.')

if not fuzz_task_candidates:
if not self._candidates:
return []

choices = random.choices(
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
choices = random.choices(self._candidates, weights=weights, k=num_tasks)
fuzz_tasks = [
tasks.Task(
'fuzz',
Expand All @@ -233,50 +242,107 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
return fuzz_tasks


def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]:
if utils.is_oss_fuzz():
scheduler = OssfuzzFuzzTaskScheduler(num_tasks)
else:
scheduler = ChromeFuzzTaskScheduler(num_tasks)
fuzz_tasks = scheduler.get_fuzz_tasks()
return fuzz_tasks
def _get_jobs_for_platforms(platforms: list[str]) -> list[data_types.Job]:
"""Returns all jobs for the given platforms."""
return list(data_types.Job.query(data_types.Job.platform.IN(platforms)))


def _get_swarming_jobs():
"""Returns all jobs that have swarming environment variables."""
jobs = _get_jobs_for_platforms(['ANDROID', 'LINUX'])
return [
job for job in jobs
if swarming.has_swarming_env_vars(job.get_environment())
]
Comment thread
IvanBM18 marked this conversation as resolved.

def schedule_fuzz_tasks() -> bool:
"""Schedules fuzz tasks."""

def _remaining_queue_capacity(queue: Queue) -> int:
"""Returns the remaining capacity of the given queue."""
project = utils.get_application_id()
start = time.time()
creds = credentials.get_default()[0]
preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE)
preprocess_queue_size = get_queue_size(creds, project, queue.name)

target_size = PREPROCESS_TARGET_SIZE_DEFAULT
target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT
if target_size_flag.enabled and target_size_flag.content:
target_size = int(target_size_flag.content)
target_size = queue.default_target_size
if queue.target_size_flag.enabled and queue.target_size_flag.content:
target_size = int(queue.target_size_flag.content)

num_tasks = target_size - preprocess_queue_size
logs.info(f'Preprocess queue size: {preprocess_queue_size}. '
logs.info(f'Queue {queue.name} size: {preprocess_queue_size}. '
f'Target: {target_size}. Needed: {num_tasks}.')

return num_tasks


def _fill_queue(queue: Queue, provider: BaseFuzzTaskProvider):
"""Fills the given queue with tasks from the provider."""
start = time.time()
num_tasks = _remaining_queue_capacity(queue)

if num_tasks <= 0:
logs.info('Queue size met or exceeded. Not scheduling tasks.')
return False
return

fuzz_tasks = get_fuzz_tasks(num_tasks)
fuzz_tasks = provider.get_fuzz_tasks(num_tasks)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
return False
logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.')
return

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue.name}.')
tasks.bulk_add_tasks(fuzz_tasks, queue=queue.name, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue.name}.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
return True


def _create_candidates_from_jobs(
jobs: list[data_types.Job]) -> list[FuzzTaskCandidate]:
"""Create candidates from jobs & assign weights to them."""
if not jobs:
return []

jobs_by_name = {job.name: job for job in jobs}
fuzzer_job_query = ndb_utils.get_all_from_query(
data_types.FuzzerJob.query(
data_types.FuzzerJob.job.IN(list(jobs_by_name.keys()))))
fuzz_task_candidates = []

for fuzzer_job in fuzzer_job_query:
job = jobs_by_name[fuzzer_job.job]
fuzz_task_candidate = FuzzTaskCandidate(
job=job.name,
project=job.project,
base_os_version=job.base_os_version,
fuzzer=fuzzer_job.fuzzer,
weight=fuzzer_job.actual_weight,
)
fuzz_task_candidates.append(fuzz_task_candidate)

return fuzz_task_candidates


def schedule_chrome_fuzz_tasks():
"""Schedules fuzz tasks for Chrome."""
default_jobs = _get_jobs_for_platforms(['LINUX'])
default_provider = ChromeFuzzTaskProvider(default_jobs)
_fill_queue(_DEFAULT_QUEUE, default_provider)

if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
return

swarming_jobs = _get_swarming_jobs()
swarming_provider = ChromeFuzzTaskProvider(swarming_jobs)
_fill_queue(_SWARMING_QUEUE, swarming_provider)


def schedule_fuzz_tasks():
"""Schedules fuzz tasks based on deployment type."""
if utils.is_oss_fuzz():
_fill_queue(_DEFAULT_QUEUE, OssfuzzFuzzTaskProvider())
else:
schedule_chrome_fuzz_tasks()


def main():
return schedule_fuzz_tasks()
schedule_fuzz_tasks()
11 changes: 8 additions & 3 deletions src/clusterfuzz/_internal/swarming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@
]


def has_swarming_env_vars(job_environment: dict) -> bool:
"""Returns True if the job environment contains swarming env vars."""
return bool(
utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or
job_environment.get('SWARMING_DIMENSIONS'))


def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
"""Returns True if the task is supposed to run on swarming."""
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
Expand All @@ -48,9 +55,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
logs.info('[Swarming DEBUG] Job not found', job_name=job_name)
return False

job_environment = job.get_environment()
if not utils.string_is_true(job_environment.get(
'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'):
if not has_swarming_env_vars(job.get_environment()):
logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name)
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self):
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

num_tasks = 5
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks)
tasks = scheduler.get_fuzz_tasks()
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
tasks = provider.get_fuzz_tasks(num_tasks)
comparable_results = []
for task in tasks:
comparable_results.append((task.command, task.argument, task.job))
Expand Down Expand Up @@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self):
name=project_name, base_os_version='project-version').put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
tasks = provider.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self):
data_types.OssFuzzProject(name=project_name).put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
tasks = provider.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -186,8 +186,8 @@ def test_os_version_no_version(self):
data_types.OssFuzzProject(name=project_name).put()
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()

scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
tasks = provider.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
task = tasks[0]

Expand Down Expand Up @@ -216,8 +216,9 @@ def _setup_chrome_entities(self, job_os_version=None):

def _run_and_get_task(self):
"""Runs the scheduler and returns the single task created."""
scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1)
tasks = scheduler.get_fuzz_tasks()
jobs = list(data_types.Job.query())
provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs)
tasks = provider.get_fuzz_tasks(num_tasks=1)
self.assertEqual(len(tasks), 1)
return tasks[0]

Expand Down
Loading
Loading