Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ def main():
'clean_indexes', help='Clean up undefined indexes (in index.yaml).')
parser_clean_indexes.add_argument(
'-c', '--config-dir', required=True, help='Path to application config.')

parser_create_config = subparsers.add_parser(
'create_config', help='Create a new deployment config.')
parser_create_config.add_argument(
Expand Down
6 changes: 6 additions & 0 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
'regression': 24 * 60 * 60,
}


def get_task_duration(command):
  """Returns the lease duration, in seconds, configured for a task command.

  Commands without an entry in TASK_LEASE_SECONDS_BY_COMMAND fall back to
  the default TASK_LEASE_SECONDS.
  """
  default_duration = TASK_LEASE_SECONDS
  return TASK_LEASE_SECONDS_BY_COMMAND.get(command, default_duration)


TASK_QUEUE_DISPLAY_NAMES = {
'LINUX': 'Linux',
'LINUX_WITH_GPU': 'Linux (with GPU)',
Expand Down
7 changes: 1 addition & 6 deletions src/clusterfuzz/_internal/batch/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]):
return config_map


def _get_task_duration(command):
  """Returns the lease duration for |command|, or the default if unlisted."""
  lease_by_command = tasks.TASK_LEASE_SECONDS_BY_COMMAND
  return lease_by_command.get(command, tasks.TASK_LEASE_SECONDS)


WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
['name', 'weight'])

Expand Down Expand Up @@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict:
# Lower numbers are a lower priority, meaning less likely to run. From:
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
priority = 0 if task.command == 'fuzz' else 1
max_run_duration = f'{_get_task_duration(task.command)}s'
max_run_duration = f'{tasks.get_task_duration(task.command)}s'
# This saves us time and reduces fragmentation, e.g. every linux fuzz task
# run in this call will run in the same zone.
if config_name not in subconfig_map:
Expand Down
3 changes: 3 additions & 0 deletions src/clusterfuzz/_internal/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config

from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.config import local_config
Expand Down Expand Up @@ -146,6 +147,7 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str,
'name': job_name
},
'spec': {
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
'template': {
'spec': {
'serviceAccountName':
Expand Down Expand Up @@ -395,6 +397,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
'name': job_name
},
'spec': {
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
'template': {
'metadata': {
'labels': {
Expand Down
35 changes: 14 additions & 21 deletions src/clusterfuzz/_internal/remote_task/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ def __init__(self):
self._gcp_batch_service = GcpBatchService()
self._kubernetes_service = KubernetesService()

def _should_use_kubernetes(self, job_type: str) -> bool:
def _should_use_kubernetes(self) -> bool:
"""Determines whether to use the Kubernetes backend for a given job.

The decision is made based on a random roll and the configured frequency
for the given job type.
"""
frequencies = job_frequency.get_job_frequency(job_type)
frequencies = job_frequency.get_job_frequency()
return random.random() < frequencies['kubernetes']

def create_uworker_main_batch_job(self, module: str, job_type: str,
Expand All @@ -91,7 +91,7 @@ def create_uworker_main_batch_job(self, module: str, job_type: str,

The choice of backend is determined by the `_should_use_kubernetes` method.
"""
if self._should_use_kubernetes(job_type):
if self._should_use_kubernetes():
return self._kubernetes_service.create_uworker_main_batch_job(
module, job_type, input_download_url)
return self._gcp_batch_service.create_uworker_main_batch_job(
Expand All @@ -106,28 +106,21 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
gcp_batch_tasks = []
kubernetes_tasks = []

# Group tasks by job_type to respect per-job frequencies
tasks_by_job = collections.defaultdict(list)
for task in remote_tasks:
tasks_by_job[task.job_type].append(task)

for job_type, tasks in tasks_by_job.items():
# Use random distribution if there is only one task
if len(tasks) == 1:
if self._should_use_kubernetes(job_type):
kubernetes_tasks.extend(tasks)
else:
gcp_batch_tasks.extend(tasks)
continue

# Use random distribution if there is only one task
if len(remote_tasks) == 1:
if self._should_use_kubernetes():
kubernetes_tasks.extend(remote_tasks)
else:
gcp_batch_tasks.extend(remote_tasks)
else:
# Use deterministic slicing for multiple tasks
frequencies = job_frequency.get_job_frequency(job_type)
frequencies = job_frequency.get_job_frequency()
k8s_ratio = frequencies['kubernetes']
k8s_count = int(len(tasks) * k8s_ratio)
k8s_count = int(len(remote_tasks) * k8s_ratio)

# We take the first chunk for Kubernetes
kubernetes_tasks.extend(tasks[:k8s_count])
gcp_batch_tasks.extend(tasks[k8s_count:])
kubernetes_tasks.extend(remote_tasks[:k8s_count])
gcp_batch_tasks.extend(remote_tasks[k8s_count:])

logs.info(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.')
logs.info(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.')
Expand Down
29 changes: 2 additions & 27 deletions src/clusterfuzz/_internal/remote_task/job_frequency.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,13 @@
# By default, all jobs are sent to the GCP Batch backend. This can be
# overridden on a per-job basis by setting the `K8S_JOBS_FREQUENCY`
# environment variable.
DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.1}
DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0}


def _get_job_frequencies_from_env():
  """Parses the `K8S_JOBS_FREQUENCY` environment variable.

  The variable should be a comma-separated list of key-value pairs, where the
  key is the job name and the value is the frequency (a float between 0 and 1).
  For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`.

  Whitespace around items, keys and values is ignored, and empty items (such
  as the one produced by a trailing comma) are skipped.

  Returns:
    A dict mapping each job name to its frequency as a float. The dict is
    empty when the variable is unset or empty.

  Raises:
    ValueError: If an item has no `=` separator or its value cannot be
      converted to a float.
  """
  frequency_string = environment.get_value('K8S_JOBS_FREQUENCY')
  if not frequency_string:
    return {}

  job_frequencies = {}
  for item in frequency_string.split(','):
    item = item.strip()
    if not item:
      # Tolerate stray separators, e.g. 'job_a=0.5,'.
      continue

    # partition() splits on the first '=' only; a malformed value such as
    # '0.5=1' is then rejected by float() below.
    key, separator, value = item.partition('=')
    if not separator:
      raise ValueError(
          f'Invalid K8S_JOBS_FREQUENCY item (expected job=frequency): {item!r}')

    # Strip the key so 'a=0.5, b=0.2' yields 'b' rather than ' b'.
    job_frequencies[key.strip()] = float(value)
  return job_frequencies


def get_job_frequency(job_name):
def get_job_frequency():
"""Returns the frequency for a given job.

If the frequency is not explicitly defined in the `K8S_JOBS_FREQUENCY`
environment variable, the default frequency is returned.
"""
job_frequencies = _get_job_frequencies_from_env()
if job_name in job_frequencies:
kubernetes_frequency = job_frequencies[job_name]
return {
'gcp_batch': 1.0 - kubernetes_frequency,
'kubernetes': kubernetes_frequency
}
return DEFAULT_FREQUENCY
Loading