Skip to content

Commit 3de649a

Browse files
authored
Pr/task refactoring (#5117)
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent f595ff8 commit 3de649a

7 files changed

Lines changed: 27 additions & 56 deletions

File tree

butler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,6 @@ def main():
435435
'clean_indexes', help='Clean up undefined indexes (in index.yaml).')
436436
parser_clean_indexes.add_argument(
437437
'-c', '--config-dir', required=True, help='Path to application config.')
438-
439438
parser_create_config = subparsers.add_parser(
440439
'create_config', help='Create a new deployment config.')
441440
parser_create_config.add_argument(

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@
6464
'regression': 24 * 60 * 60,
6565
}
6666

67+
68+
def get_task_duration(command):
69+
"""Gets the duration of a task."""
70+
return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS)
71+
72+
6773
TASK_QUEUE_DISPLAY_NAMES = {
6874
'LINUX': 'Linux',
6975
'LINUX_WITH_GPU': 'Linux (with GPU)',

src/clusterfuzz/_internal/batch/service.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]):
238238
return config_map
239239

240240

241-
def _get_task_duration(command):
242-
return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command,
243-
tasks.TASK_LEASE_SECONDS)
244-
245-
246241
WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
247242
['name', 'weight'])
248243

@@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict:
293288
# Lower numbers are a lower priority, meaning less likely to run. From:
294289
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
295290
priority = 0 if task.command == 'fuzz' else 1
296-
max_run_duration = f'{_get_task_duration(task.command)}s'
291+
max_run_duration = f'{tasks.get_task_duration(task.command)}s'
297292
# This saves us time and reduces fragmentation, e.g. every linux fuzz task
298293
# run in this call will run in the same zone.
299294
if config_name not in subconfig_map:

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def execute(self, task_argument, job_type, uworker_env):
225225
'analyze': UTask,
226226
'blame': TrustedTask,
227227
'corpus_pruning': UTask,
228-
'fuzz': UTaskLocalExecutor,
228+
'fuzz': UTask,
229229
'impact': TrustedTask,
230230
'minimize': UTask,
231231
'progression': UTask,

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from kubernetes import client as k8s_client
2727
from kubernetes import config as k8s_config
2828

29+
from clusterfuzz._internal.base import tasks
2930
from clusterfuzz._internal.base import utils
3031
from clusterfuzz._internal.base.tasks import task_utils
3132
from clusterfuzz._internal.config import local_config
@@ -146,6 +147,7 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str,
146147
'name': job_name
147148
},
148149
'spec': {
150+
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
149151
'template': {
150152
'spec': {
151153
'serviceAccountName':
@@ -395,6 +397,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
395397
'name': job_name
396398
},
397399
'spec': {
400+
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
398401
'template': {
399402
'metadata': {
400403
'labels': {

src/clusterfuzz/_internal/remote_task/__init__.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,13 @@ def __init__(self):
7676
self._gcp_batch_service = GcpBatchService()
7777
self._kubernetes_service = KubernetesService()
7878

79-
def _should_use_kubernetes(self, job_type: str) -> bool:
79+
def _should_use_kubernetes(self) -> bool:
8080
"""Determines whether to use the Kubernetes backend for a given job.
8181
8282
The decision is made based on a random roll and the configured frequency
8383
for the Kubernetes backend.
8484
"""
85-
frequencies = job_frequency.get_job_frequency(job_type)
85+
frequencies = job_frequency.get_job_frequency()
8686
return random.random() < frequencies['kubernetes']
8787

8888
def create_uworker_main_batch_job(self, module: str, job_type: str,
@@ -91,7 +91,7 @@ def create_uworker_main_batch_job(self, module: str, job_type: str,
9191
9292
The choice of backend is determined by the `_should_use_kubernetes` method.
9393
"""
94-
if self._should_use_kubernetes(job_type):
94+
if self._should_use_kubernetes():
9595
return self._kubernetes_service.create_uworker_main_batch_job(
9696
module, job_type, input_download_url)
9797
return self._gcp_batch_service.create_uworker_main_batch_job(
@@ -106,28 +106,21 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
106106
gcp_batch_tasks = []
107107
kubernetes_tasks = []
108108

109-
# Group tasks by job_type to respect per-job frequencies
110-
tasks_by_job = collections.defaultdict(list)
111-
for task in remote_tasks:
112-
tasks_by_job[task.job_type].append(task)
113-
114-
for job_type, tasks in tasks_by_job.items():
115-
# Use random distribution if there is only one task
116-
if len(tasks) == 1:
117-
if self._should_use_kubernetes(job_type):
118-
kubernetes_tasks.extend(tasks)
119-
else:
120-
gcp_batch_tasks.extend(tasks)
121-
continue
122-
109+
# Use random distribution if there is only one task
110+
if len(remote_tasks) == 1:
111+
if self._should_use_kubernetes():
112+
kubernetes_tasks.extend(remote_tasks)
113+
else:
114+
gcp_batch_tasks.extend(remote_tasks)
115+
else:
123116
# Use deterministic slicing for multiple tasks
124-
frequencies = job_frequency.get_job_frequency(job_type)
117+
frequencies = job_frequency.get_job_frequency()
125118
k8s_ratio = frequencies['kubernetes']
126-
k8s_count = int(len(tasks) * k8s_ratio)
119+
k8s_count = int(len(remote_tasks) * k8s_ratio)
127120

128121
# We take the first chunk for Kubernetes
129-
kubernetes_tasks.extend(tasks[:k8s_count])
130-
gcp_batch_tasks.extend(tasks[k8s_count:])
122+
kubernetes_tasks.extend(remote_tasks[:k8s_count])
123+
gcp_batch_tasks.extend(remote_tasks[k8s_count:])
131124

132125
logs.info(f'Sending {len(gcp_batch_tasks)} tasks to GCP Batch.')
133126
logs.info(f'Sending {len(kubernetes_tasks)} tasks to Kubernetes.')

src/clusterfuzz/_internal/remote_task/job_frequency.py

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,38 +23,13 @@
2323
# By default, all jobs are sent to the GCP Batch backend: the Kubernetes
2424
# frequency in `DEFAULT_FREQUENCY` is 0.0, and the `K8S_JOBS_FREQUENCY`
2525
# environment variable is not consulted.
26-
DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.1}
26+
DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0}
2727

2828

29-
def _get_job_frequencies_from_env():
30-
"""Parses the `K8S_JOBS_FREQUENCY` environment variable.
31-
32-
The variable should be a comma-separated list of key-value pairs, where the
33-
key is the job name and the value is the frequency (a float between 0 and 1).
34-
For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`.
35-
"""
36-
job_frequencies = {}
37-
frequency_string = environment.get_value('K8S_JOBS_FREQUENCY')
38-
if not frequency_string:
39-
return {}
40-
41-
for item in frequency_string.split(','):
42-
key, value = item.split('=')
43-
job_frequencies[key] = float(value)
44-
return job_frequencies
45-
46-
47-
def get_job_frequency(job_name):
29+
def get_job_frequency():
4830
"""Returns the backend frequencies applied to all jobs.
4931
5032
The `K8S_JOBS_FREQUENCY` environment variable is not consulted; the
5133
default frequency is always returned.
5234
"""
53-
job_frequencies = _get_job_frequencies_from_env()
54-
if job_name in job_frequencies:
55-
kubernetes_frequency = job_frequencies[job_name]
56-
return {
57-
'gcp_batch': 1.0 - kubernetes_frequency,
58-
'kubernetes': kubernetes_frequency
59-
}
6035
return DEFAULT_FREQUENCY

0 commit comments

Comments
 (0)