Skip to content

Commit 9fba2fb

Browse files
IvanBM18 and letitz authored
[Swarming] Push preprocess tasks to swarming queue (#5282)
## Overview

This change adds support for scheduling tasks to the new Swarming backend. Because Swarming uses a different execution model, and so that backpressure can be accounted for later, it requires its own separate `preprocess` queue and a much lower default target size (10) to prevent unbounded task queuing. By refactoring the cron scheduling logic, we can now simultaneously feed both the Swarming and Batch environments at their respective ideal rates.

## Changes

- Add a distinct `SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT` set to `10` to support the Swarming backend's task capacity needs.
- Define Swarming-specific queue mappings (`SWARMING_QUEUES`).
- Refactor `schedule_fuzz` so that the schedulers are composed of multiple helper functions to avoid complex inheritance:
  - Rename them to Providers, as they no longer schedule tasks; they only look for them.
  - Create a DTO called `Queue` that contains information regarding the queues.
- Update `ChromeFuzzTaskScheduler` (now `ChromeFuzzTaskProvider`, driven by `schedule_chrome_fuzz_tasks`) to independently schedule Swarming tasks alongside standard Batch tasks.
  - For Swarming, it now also looks for Android jobs.
- Update `schedule_fuzz_test.py` to match the updated scheduler class signatures.

## Tests performed

These changes have been live in dev for 2 days, and the cron job appears to be scheduling fuzz tasks successfully:

<img width="632" height="224" alt="image" src="https://github.com/user-attachments/assets/b7552f73-aeff-4031-b7a7-32e9800d8a3b" />

The fuzz task hours look healthy as well:

<img width="515" height="339" alt="image" src="https://github.com/user-attachments/assets/ac78a470-2c1f-4ee3-86fe-0b5233482038" />

Note: The spike in fuzz hours corresponds to the spike in scheduling activity.

The only thing that's weird to me (not related to this PR, by the way) is that the cron job seems to run once a day in dev, even though the dev config says it should run every 10 minutes. Maybe this is because of the limited quota?
#### TODO

- `src/clusterfuzz/_internal/base/feature_flags.py`: Update the `SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT` value based on dev & stage metrics and tests.

---------

Co-authored-by: Titouan Rigoudy <titouan@chromium.org>
1 parent bb4fe2e commit 9fba2fb

6 files changed

Lines changed: 205 additions & 85 deletions

File tree

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class FeatureFlags(Enum):
3636
PREPROCESS_QUEUE_SIZE_LIMIT = 'preprocess_queue_size_limit'
3737

3838
SWARMING_REMOTE_EXECUTION = 'swarming_remote_execution'
39+
# TODO(b/516630567): Set this value based off dev & stage metrics and tests.
40+
SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT = 'swarming_preprocess_queue_size_limit'
3941

4042
@property
4143
def flag(self):

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ def get_task_duration(command):
101101
UTASK_MAIN_QUEUE = 'utask_main'
102102
PREPROCESS_QUEUE = 'preprocess'
103103

104+
SWARMING_QUEUES = {
105+
PREPROCESS_QUEUE: 'preprocess-swarming',
106+
UTASK_MAIN_QUEUE: 'utask_main-swarming',
107+
}
108+
104109
# See https://github.com/google/clusterfuzz/issues/3347 for usage
105110
SUBQUEUE_IDENTIFIER = ':'
106111

src/clusterfuzz/_internal/cron/schedule_fuzz.py

Lines changed: 138 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,55 @@
1313
# limitations under the License.
1414
"""Cron job to schedule fuzz tasks that run on batch."""
1515

16+
from abc import ABC
17+
from abc import abstractmethod
1618
import collections
19+
from dataclasses import dataclass
1720
import random
1821
import time
1922

2023
from google.cloud import monitoring_v3
2124

22-
from clusterfuzz._internal.base import feature_flags
25+
from clusterfuzz._internal import swarming
2326
from clusterfuzz._internal.base import memoize
2427
from clusterfuzz._internal.base import tasks
2528
from clusterfuzz._internal.base import utils
29+
from clusterfuzz._internal.base.feature_flags import FeatureFlags
2630
from clusterfuzz._internal.datastore import data_types
2731
from clusterfuzz._internal.datastore import ndb_utils
2832
from clusterfuzz._internal.google_cloud_utils import credentials
2933
from clusterfuzz._internal.metrics import logs
3034

3135
PREPROCESS_TARGET_SIZE_DEFAULT = 10000
36+
SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT = 10
37+
38+
39+
@dataclass
40+
class Queue:
41+
"""Data class that holds information about a pub/sub queue.
42+
43+
Attributes:
44+
name: The name of the Pub/Sub subscription associated with the queue.
45+
default_target_size: Number of tasks that should be kept in the queue.
46+
target_size_flag: Feature flag used to override the default target size.
47+
"""
48+
49+
name: str
50+
default_target_size: int
51+
target_size_flag: FeatureFlags
52+
53+
54+
_DEFAULT_QUEUE = Queue(
55+
name=tasks.PREPROCESS_QUEUE,
56+
default_target_size=PREPROCESS_TARGET_SIZE_DEFAULT,
57+
target_size_flag=FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT,
58+
)
59+
60+
_SWARMING_QUEUE = Queue(
61+
name=tasks.SWARMING_QUEUES[tasks.PREPROCESS_QUEUE],
62+
default_target_size=SWARMING_PREPROCESS_TARGET_SIZE_DEFAULT,
63+
target_size_flag=FeatureFlags.SWARMING_PREPROCESS_QUEUE_SIZE_LIMIT,
64+
)
3265

3366

3467
@memoize.wrap(memoize.InMemory(60))
@@ -62,14 +95,12 @@ def get_queue_size(creds, project_id, subscription_id):
6295
return 0
6396

6497

65-
class BaseFuzzTaskScheduler:
66-
"""Base fuzz task scheduler for any deployment of ClusterFuzz."""
98+
class BaseFuzzTaskProvider(ABC):
99+
"""Base fuzz task provider for any deployment of ClusterFuzz."""
67100

68-
def __init__(self, num_tasks):
69-
self.num_tasks = num_tasks
70-
71-
def get_fuzz_tasks(self):
72-
raise NotImplementedError('Child class must implement.')
101+
@abstractmethod
102+
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
103+
"""Returns a list of fuzz tasks."""
73104

74105

75106
class FuzzTaskCandidate:
@@ -98,10 +129,10 @@ def copy(self):
98129
base_os_version=self.base_os_version)
99130

100131

101-
class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
102-
"""Fuzz task scheduler for OSS-Fuzz."""
132+
class OssfuzzFuzzTaskProvider(BaseFuzzTaskProvider):
133+
"""Fuzz task provider for OSS-Fuzz."""
103134

104-
def get_fuzz_tasks(self) -> list[tasks.Task]:
135+
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
105136
# TODO(metzman): Handle high end.
106137
# A job's weight is determined by its own weight and the weight of the
107138
# project is a part of. First get project weights.
@@ -164,11 +195,9 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
164195
for fuzz_task_candidate in fuzz_task_candidates:
165196
weights.append(fuzz_task_candidate.weight)
166197

167-
fuzz_tasks_count = self.num_tasks
168-
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for OSS-Fuzz.')
198+
logs.info(f'Scheduling {num_tasks} fuzz tasks for OSS-Fuzz.')
169199

170-
choices = random.choices(
171-
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
200+
choices = random.choices(fuzz_task_candidates, weights=weights, k=num_tasks)
172201
fuzz_tasks = [
173202
tasks.Task(
174203
'fuzz',
@@ -183,45 +212,25 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
183212
return fuzz_tasks
184213

185214

186-
class ChromeFuzzTaskScheduler(BaseFuzzTaskScheduler):
187-
"""Fuzz task scheduler for Chrome."""
215+
class ChromeFuzzTaskProvider(BaseFuzzTaskProvider):
216+
"""Fuzz task provider for Chrome."""
188217

189-
def get_fuzz_tasks(self) -> list[tasks.Task]:
190-
"""Returns fuzz tasks for chrome, weighted by job weight."""
191-
logs.info('Getting jobs for Chrome.')
218+
_candidates: list[FuzzTaskCandidate]
192219

193-
candidates_by_job = {}
194-
# Only consider linux jobs for chrome fuzzing.
195-
job_query = data_types.Job.query(data_types.Job.platform == 'LINUX')
196-
for job in ndb_utils.get_all_from_query(job_query):
197-
base_os_version = None
198-
if job.base_os_version:
199-
base_os_version = job.base_os_version
220+
def __init__(self, jobs: list[data_types.Job]):
221+
self._candidates = _create_candidates_from_jobs(jobs)
200222

201-
candidates_by_job[job.name] = FuzzTaskCandidate(
202-
job=job.name, project=job.project, base_os_version=base_os_version)
203-
204-
fuzz_task_candidates = []
205-
fuzzer_job_query = ndb_utils.get_all_from_query(
206-
data_types.FuzzerJob.query())
207-
208-
for fuzzer_job in fuzzer_job_query:
209-
if fuzzer_job.job not in candidates_by_job:
210-
continue
211-
fuzz_task_candidate = candidates_by_job[fuzzer_job.job].copy()
212-
fuzz_task_candidate.fuzzer = fuzzer_job.fuzzer
213-
fuzz_task_candidate.weight = fuzzer_job.actual_weight
214-
fuzz_task_candidates.append(fuzz_task_candidate)
223+
def get_fuzz_tasks(self, num_tasks: int) -> list[tasks.Task]:
224+
"""Returns fuzz tasks for chrome, weighted by job weight."""
225+
logs.info('Getting jobs for Chrome.')
215226

216-
weights = [candidate.weight for candidate in fuzz_task_candidates]
217-
fuzz_tasks_count = self.num_tasks
218-
logs.info(f'Scheduling {fuzz_tasks_count} fuzz tasks for Chrome.')
227+
weights = [candidate.weight for candidate in self._candidates]
228+
logs.info(f'Scheduling {num_tasks} fuzz tasks for Chrome.')
219229

220-
if not fuzz_task_candidates:
230+
if not self._candidates:
221231
return []
222232

223-
choices = random.choices(
224-
fuzz_task_candidates, weights=weights, k=fuzz_tasks_count)
233+
choices = random.choices(self._candidates, weights=weights, k=num_tasks)
225234
fuzz_tasks = [
226235
tasks.Task(
227236
'fuzz',
@@ -233,50 +242,107 @@ def get_fuzz_tasks(self) -> list[tasks.Task]:
233242
return fuzz_tasks
234243

235244

236-
def get_fuzz_tasks(num_tasks: int) -> list[tasks.Task]:
237-
if utils.is_oss_fuzz():
238-
scheduler = OssfuzzFuzzTaskScheduler(num_tasks)
239-
else:
240-
scheduler = ChromeFuzzTaskScheduler(num_tasks)
241-
fuzz_tasks = scheduler.get_fuzz_tasks()
242-
return fuzz_tasks
245+
def _get_jobs_for_platforms(platforms: list[str]) -> list[data_types.Job]:
246+
"""Returns all jobs for the given platforms."""
247+
return list(data_types.Job.query(data_types.Job.platform.IN(platforms)))
248+
243249

250+
def _get_swarming_jobs():
251+
"""Returns all jobs that have swarming environment variables."""
252+
jobs = _get_jobs_for_platforms(['ANDROID', 'LINUX'])
253+
return [
254+
job for job in jobs
255+
if swarming.has_swarming_env_vars(job.get_environment())
256+
]
244257

245-
def schedule_fuzz_tasks() -> bool:
246-
"""Schedules fuzz tasks."""
247258

259+
def _remaining_queue_capacity(queue: Queue) -> int:
260+
"""Returns the remaining capacity of the given queue."""
248261
project = utils.get_application_id()
249-
start = time.time()
250262
creds = credentials.get_default()[0]
251-
preprocess_queue_size = get_queue_size(creds, project, tasks.PREPROCESS_QUEUE)
263+
preprocess_queue_size = get_queue_size(creds, project, queue.name)
252264

253-
target_size = PREPROCESS_TARGET_SIZE_DEFAULT
254-
target_size_flag = feature_flags.FeatureFlags.PREPROCESS_QUEUE_SIZE_LIMIT
255-
if target_size_flag.enabled and target_size_flag.content:
256-
target_size = int(target_size_flag.content)
265+
target_size = queue.default_target_size
266+
if queue.target_size_flag.enabled and queue.target_size_flag.content:
267+
target_size = int(queue.target_size_flag.content)
257268

258269
num_tasks = target_size - preprocess_queue_size
259-
logs.info(f'Preprocess queue size: {preprocess_queue_size}. '
270+
logs.info(f'Queue {queue.name} size: {preprocess_queue_size}. '
260271
f'Target: {target_size}. Needed: {num_tasks}.')
261272

273+
return num_tasks
274+
275+
276+
def _fill_queue(queue: Queue, provider: BaseFuzzTaskProvider):
277+
"""Fills the given queue with tasks from the provider."""
278+
start = time.time()
279+
num_tasks = _remaining_queue_capacity(queue)
280+
262281
if num_tasks <= 0:
263282
logs.info('Queue size met or exceeded. Not scheduling tasks.')
264-
return False
283+
return
265284

266-
fuzz_tasks = get_fuzz_tasks(num_tasks)
285+
fuzz_tasks = provider.get_fuzz_tasks(num_tasks)
267286
if not fuzz_tasks:
268-
logs.error('No fuzz tasks found to schedule.')
269-
return False
287+
logs.error(f'No fuzz tasks found to schedule in queue {queue.name}.')
288+
return
270289

271-
logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
272-
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
273-
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
290+
logs.info(f'Adding {len(fuzz_tasks)} tasks to queue {queue.name}.')
291+
tasks.bulk_add_tasks(fuzz_tasks, queue=queue.name, eta_now=True)
292+
logs.info(f'Scheduled {len(fuzz_tasks)} tasks on queue {queue.name}.')
274293

275294
end = time.time()
276295
total = end - start
277296
logs.info(f'Task scheduling took {total} seconds.')
278-
return True
297+
298+
299+
def _create_candidates_from_jobs(
300+
jobs: list[data_types.Job]) -> list[FuzzTaskCandidate]:
301+
"""Create candidates from jobs & assign weights to them."""
302+
if not jobs:
303+
return []
304+
305+
jobs_by_name = {job.name: job for job in jobs}
306+
fuzzer_job_query = ndb_utils.get_all_from_query(
307+
data_types.FuzzerJob.query(
308+
data_types.FuzzerJob.job.IN(list(jobs_by_name.keys()))))
309+
fuzz_task_candidates = []
310+
311+
for fuzzer_job in fuzzer_job_query:
312+
job = jobs_by_name[fuzzer_job.job]
313+
fuzz_task_candidate = FuzzTaskCandidate(
314+
job=job.name,
315+
project=job.project,
316+
base_os_version=job.base_os_version,
317+
fuzzer=fuzzer_job.fuzzer,
318+
weight=fuzzer_job.actual_weight,
319+
)
320+
fuzz_task_candidates.append(fuzz_task_candidate)
321+
322+
return fuzz_task_candidates
323+
324+
325+
def schedule_chrome_fuzz_tasks():
326+
"""Schedules fuzz tasks for Chrome."""
327+
default_jobs = _get_jobs_for_platforms(['LINUX'])
328+
default_provider = ChromeFuzzTaskProvider(default_jobs)
329+
_fill_queue(_DEFAULT_QUEUE, default_provider)
330+
331+
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
332+
return
333+
334+
swarming_jobs = _get_swarming_jobs()
335+
swarming_provider = ChromeFuzzTaskProvider(swarming_jobs)
336+
_fill_queue(_SWARMING_QUEUE, swarming_provider)
337+
338+
339+
def schedule_fuzz_tasks():
340+
"""Schedules fuzz tasks based on deployment type."""
341+
if utils.is_oss_fuzz():
342+
_fill_queue(_DEFAULT_QUEUE, OssfuzzFuzzTaskProvider())
343+
else:
344+
schedule_chrome_fuzz_tasks()
279345

280346

281347
def main():
282-
return schedule_fuzz_tasks()
348+
schedule_fuzz_tasks()

src/clusterfuzz/_internal/swarming/__init__.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
from clusterfuzz._internal.system import environment
3030

3131

32+
def has_swarming_env_vars(job_environment: dict) -> bool:
33+
"""Returns True if the job environment contains swarming env vars."""
34+
return bool(
35+
utils.string_is_true(job_environment.get('IS_SWARMING_JOB')) or
36+
job_environment.get('SWARMING_DIMENSIONS'))
37+
38+
3239
def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
3340
"""Returns True if the task is supposed to run on swarming."""
3441
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
@@ -40,9 +47,7 @@ def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
4047
logs.info('[Swarming DEBUG] Job not found', job_name=job_name)
4148
return False
4249

43-
job_environment = job.get_environment()
44-
if not utils.string_is_true(job_environment.get(
45-
'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'):
50+
if not has_swarming_env_vars(job.get_environment()):
4651
logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name)
4752
return False
4853

src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ def test_get_fuzz_tasks(self):
6767
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
6868

6969
num_tasks = 5
70-
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks)
71-
tasks = scheduler.get_fuzz_tasks()
70+
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
71+
tasks = provider.get_fuzz_tasks(num_tasks)
7272
comparable_results = []
7373
for task in tasks:
7474
comparable_results.append((task.command, task.argument, task.job))
@@ -108,8 +108,8 @@ def test_os_version_precedence_project_over_job(self):
108108
name=project_name, base_os_version='project-version').put()
109109
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
110110

111-
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
112-
tasks = scheduler.get_fuzz_tasks()
111+
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
112+
tasks = provider.get_fuzz_tasks(num_tasks=1)
113113
self.assertEqual(len(tasks), 1)
114114
task = tasks[0]
115115

@@ -147,8 +147,8 @@ def test_os_version_fallback_to_job(self):
147147
data_types.OssFuzzProject(name=project_name).put()
148148
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
149149

150-
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
151-
tasks = scheduler.get_fuzz_tasks()
150+
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
151+
tasks = provider.get_fuzz_tasks(num_tasks=1)
152152
self.assertEqual(len(tasks), 1)
153153
task = tasks[0]
154154

@@ -186,8 +186,8 @@ def test_os_version_no_version(self):
186186
data_types.OssFuzzProject(name=project_name).put()
187187
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
188188

189-
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_tasks=1)
190-
tasks = scheduler.get_fuzz_tasks()
189+
provider = schedule_fuzz.OssfuzzFuzzTaskProvider()
190+
tasks = provider.get_fuzz_tasks(num_tasks=1)
191191
self.assertEqual(len(tasks), 1)
192192
task = tasks[0]
193193

@@ -216,8 +216,9 @@ def _setup_chrome_entities(self, job_os_version=None):
216216

217217
def _run_and_get_task(self):
218218
"""Runs the scheduler and returns the single task created."""
219-
scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_tasks=1)
220-
tasks = scheduler.get_fuzz_tasks()
219+
jobs = list(data_types.Job.query())
220+
provider = schedule_fuzz.ChromeFuzzTaskProvider(jobs)
221+
tasks = provider.get_fuzz_tasks(num_tasks=1)
221222
self.assertEqual(len(tasks), 1)
222223
return tasks[0]
223224

0 commit comments

Comments
 (0)