Skip to content

Commit b35cef0

Browse files
committed
fixes
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent e7226af commit b35cef0

15 files changed

Lines changed: 90 additions & 196 deletions

File tree

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class FeatureFlags(Enum):
2929
K8S_JOBS_FREQUENCY = 'k8s_jobs_frequency'
3030
K8S_JOBS_PENDING_LIMIT = 'k8s_jobs_pending_limit'
3131

32+
UTASK_MAIN_QUEUE_LIMIT = 'utask_main_queue_limit'
33+
3234
@property
3335
def flag(self):
3436
"""Get the feature flag."""

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from clusterfuzz._internal import swarming
1919
from clusterfuzz._internal.base import errors
20+
from clusterfuzz._internal.base import feature_flags
2021
from clusterfuzz._internal.base import tasks
2122
from clusterfuzz._internal.base.tasks import task_utils
2223
from clusterfuzz._internal.batch import service as batch_service
@@ -25,6 +26,8 @@
2526
from clusterfuzz._internal.metrics import logs
2627
from clusterfuzz._internal.system import environment
2728

29+
UTASK_MAIN_QUEUE_LIMIT_DEFAULT = 10000
30+
2831

2932
class BaseTask:
3033
"""Base module for tasks."""
@@ -157,7 +160,11 @@ def execute(self, task_argument, job_type, uworker_env):
157160

158161
utask_main_queue_size = tasks.get_utask_main_queue_size()
159162

160-
if utask_main_queue_size > 10000:
163+
utask_main_queue_limit = UTASK_MAIN_QUEUE_LIMIT_DEFAULT
164+
utask_flag = feature_flags.FeatureFlags.UTASK_MAIN_QUEUE_LIMIT.flag
165+
if utask_flag and utask_flag.enabled:
166+
utask_main_queue_limit = utask_flag.content
167+
if utask_main_queue_size > utask_main_queue_limit:
161168
raise errors.QueueLimitReachedError(utask_main_queue_size)
162169

163170
logs.info('Preprocessing utask.')

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import collections
1717
import ipaddress
1818
import os
19-
import typing
2019
import uuid
2120

2221
import google.auth
@@ -53,7 +52,7 @@
5352
])
5453

5554

56-
def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
55+
def _get_config_names(remote_tasks: list[remote_task_types.RemoteTask]):
5756
"""Gets the name of the configs for each batch_task. Returns a dict
5857
5958
that is indexed by command and job_type for efficient lookup."""
@@ -90,9 +89,8 @@ def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
9089
return config_map
9190

9291

93-
def _get_k8s_job_configs(
94-
remote_tasks: typing.List[remote_task_types.RemoteTask]
95-
) -> typing.Dict[typing.Tuple[str, str], KubernetesJobConfig]:
92+
def _get_k8s_job_configs(remote_tasks: list[remote_task_types.RemoteTask]
93+
) -> dict[tuple[str, str], KubernetesJobConfig]:
9694
"""Gets the configured specifications for a batch workload."""
9795

9896
if not remote_tasks:
@@ -286,7 +284,7 @@ def _get_pending_jobs_count(self) -> int:
286284

287285
def create_utask_main_job(self, module: str, job_type: str,
288286
input_download_url: str):
289-
"""Creates a single batch job for a uworker main task."""
287+
"""Creates a single Kubernetes job for a uworker main task."""
290288

291289
command = task_utils.get_command_from_module(module)
292290
batch_tasks = [
@@ -295,13 +293,13 @@ def create_utask_main_job(self, module: str, job_type: str,
295293
uncreated_tasks = self.create_utask_main_jobs(batch_tasks)
296294
return uncreated_tasks
297295

298-
def create_utask_main_jobs(
299-
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
300-
"""Creates a batch job for a list of uworker main tasks.
296+
def create_utask_main_jobs(self,
297+
remote_tasks: list[remote_task_types.RemoteTask]):
298+
"""Creates Kubernetes jobs for a list of uworker main tasks.
301299
302-
This method groups the tasks by their workload specification and creates a
303-
separate batch job for each group. This allows tasks with similar
304-
requirements to be processed together, which can improve efficiency.
300+
This method groups the tasks by their workload specification to efficiently
301+
schedule them. It then iterates through the groups and creates a separate
302+
Kubernetes job for each task.
305303
"""
306304

307305
flag = feature_flags.FeatureFlags.K8S_JOBS_PENDING_LIMIT.flag
@@ -311,7 +309,8 @@ def create_utask_main_jobs(
311309
pending_jobs_count = self._get_pending_jobs_count()
312310
if pending_jobs_count >= limit:
313311
logs.warning(
314-
f'Pending jobs count {pending_jobs_count} reached limit {limit}.')
312+
f'Pending jobs count {pending_jobs_count} reached limit {limit} '
313+
f'for k8s.')
315314
return remote_tasks
316315

317316
job_specs = collections.defaultdict(list)
@@ -321,7 +320,7 @@ def create_utask_main_jobs(
321320
f'Scheduling {remote_task.command}, {remote_task.job_type} in K8s.')
322321
config = configs[(remote_task.command, remote_task.job_type)]
323322
job_specs[config].append(remote_task.input_download_url)
324-
logs.info('Creating GCP batch jobs.')
323+
logs.info('Creating Kubernetes jobs.')
325324
for config, input_urls in job_specs.items():
326325
for input_url in input_urls:
327326
self.create_job(config, input_url)

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 13 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import collections
2323
import random
24-
import typing
2524

26-
from clusterfuzz._internal.base.tasks.task_utils import is_testcase_based_task
27-
from clusterfuzz._internal.bot.tasks.utasks import uworker_io
2825
from clusterfuzz._internal.metrics import logs
2926
from clusterfuzz._internal.remote_task import remote_task_adapters
3027
from clusterfuzz._internal.remote_task import remote_task_types
@@ -107,44 +104,14 @@ def get_job_frequency(self):
107104
logs.info('Job frequencies', frequencies=frequencies)
108105
return frequencies
109106

110-
def prepare_unscheduled_tasks(
111-
self, unscheduled_remote_tasks: typing.List[remote_task_types.RemoteTask]
112-
) -> typing.List[remote_task_types.RemoteTask]:
113-
"""Prepares the unscheduled remote tasks to be sent back to the
114-
preprocess queue.
115-
116-
The messages in the preprocess queue expects tasks with
117-
the properties followng task_name, fuzzer, job, eta.
118-
The utasks has the signed url as argument instead of the fuzzer,
119-
then for sending it back to the preprocess, it recovers its fuzzer
120-
and override the argument.
121-
"""
122-
123-
prepared_tasks = []
124-
if not unscheduled_remote_tasks:
125-
return prepared_tasks
126-
127-
for task in unscheduled_remote_tasks:
128-
try:
129-
uworker_input = uworker_io.download_and_deserialize_uworker_input(
130-
task.input_download_url)
131-
if is_testcase_based_task(task.command):
132-
task.argument = uworker_input.testcase_id
133-
else:
134-
task.argument = uworker_input.fuzzer_name
135-
prepared_tasks.append(task)
136-
except Exception:
137-
logs.error('Could not prepare the task due to an error.')
138-
139-
return prepared_tasks
140-
141107
def create_utask_main_job(self, module, job_type, input_download_url):
108+
"""Creates a single remote task, selecting a backend dynamically."""
142109
adapter_id = self._get_adapter()
143110
service = self._service_map[adapter_id]
144111
return service.create_utask_main_job(module, job_type, input_download_url)
145112

146-
def create_utask_main_jobs(
147-
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
113+
def create_utask_main_jobs(self,
114+
remote_tasks: list[remote_task_types.RemoteTask]):
148115
"""Creates a batch of remote tasks, distributing them across backends.
149116
150117
This method handles two cases:
@@ -161,6 +128,7 @@ def create_utask_main_jobs(
161128
# For a single task, use a random distribution.
162129
adapter_id = self._get_adapter()
163130
tasks_by_adapter[adapter_id].extend(remote_tasks)
131+
unscheduled_tasks = []
164132
else:
165133
# For multiple tasks, use deterministic slicing to ensure the
166134
# distribution precisely matches the frequency configuration.
@@ -172,14 +140,17 @@ def create_utask_main_jobs(
172140
remote_tasks[start_index:start_index + count])
173141
start_index += count
174142

175-
# Distribute any remainder tasks (due to rounding) one by one. This
176-
# ensures that all tasks are assigned to a backend.
177143
remaining_tasks = remote_tasks[start_index:]
178-
for i, task in enumerate(remaining_tasks):
179-
adapter_id = list(frequencies.keys())[i % len(frequencies)]
180-
tasks_by_adapter[adapter_id].append(task)
144+
if sum(frequencies.values()) >= 0.999:
145+
# Distribute any remainder tasks (due to rounding) one by one. This
146+
# ensures that all tasks are assigned to a backend.
147+
for i, task in enumerate(remaining_tasks):
148+
adapter_id = list(frequencies.keys())[i % len(frequencies)]
149+
tasks_by_adapter[adapter_id].append(task)
150+
unscheduled_tasks = []
151+
else:
152+
unscheduled_tasks = list(remaining_tasks)
181153

182-
unscheduled_tasks = []
183154
for adapter_id, tasks in tasks_by_adapter.items():
184155
if tasks:
185156
try:

src/clusterfuzz/_internal/remote_task/remote_task_types.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
"""Remote task types."""
1515

1616
import abc
17-
import typing
1817

1918
from clusterfuzz._internal.base import tasks
2019

@@ -53,7 +52,7 @@ def create_utask_main_job(self, module: str, job_type: str,
5352

5453
@abc.abstractmethod
5554
def create_utask_main_jobs(
56-
self, remote_tasks: typing.List[RemoteTask]) -> typing.List[RemoteTask]:
55+
self, remote_tasks: list[RemoteTask]) -> list[RemoteTask]:
5756
"""Creates many remote tasks for uworker main tasks.
5857
Returns the tasks that couldn't be created.
5958
"""

src/clusterfuzz/_internal/tests/core/base/tasks/queue_size_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2023 Google LLC
1+
# Copyright 2026 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.

src/clusterfuzz/_internal/tests/core/batch/batch_service_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def setUp(self):
157157
self.batch_service = batch_service.GcpBatchService()
158158
self.mock.uuid4.side_effect = [uuid.UUID(u) for u in UUIDS]
159159

160-
def test_create_uworker_main_batch_jobs(self):
160+
def test_create_utask_main_jobs(self):
161161
"""Tests that create_utask_main_jobs works as expected."""
162162
# Create mock data.
163163
spec1 = batch_service.BatchWorkloadSpec(
@@ -217,7 +217,7 @@ def test_create_uworker_main_batch_jobs(self):
217217
mock.call(expected_create_request_2),
218218
])
219219

220-
def test_create_uworker_main_batch_job(self):
220+
def test_create_utask_main_job(self):
221221
"""Tests that create_utask_main_job works as expected."""
222222
# Create mock data.
223223
spec1 = batch_service.BatchWorkloadSpec(

src/clusterfuzz/_internal/tests/core/bot/startup/run_bot_test.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def setUp(self):
135135
helpers.patch(self, [
136136
'clusterfuzz._internal.base.tasks.get_utask_mains',
137137
'clusterfuzz._internal.remote_task.remote_task_gate.RemoteTaskGate',
138-
'clusterfuzz._internal.base.tasks.bulk_add_tasks',
139138
])
140139

141140
def test_schedule_tasks_requeue_uncreated(self):
@@ -157,9 +156,6 @@ def test_schedule_tasks_requeue_uncreated(self):
157156
self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once(
158157
)
159158

160-
# Check that bulk_add_tasks was NOT called.
161-
self.mock.bulk_add_tasks.assert_not_called()
162-
163159
# Verify that cancel_lease_ack was called on the pubsub task.
164160
mock_task.cancel_lease_ack.assert_called_once()
165161

@@ -179,4 +175,3 @@ def test_schedule_tasks_success(self):
179175

180176
self.mock.RemoteTaskGate.return_value.create_utask_main_jobs.assert_called_once(
181177
)
182-
self.mock.bulk_add_tasks.assert_not_called()

src/clusterfuzz/_internal/tests/core/bot/tasks/task_types_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def test_event_emit_during_exception(self):
137137
self.mock.emit.assert_any_call(event_finished)
138138

139139

140+
@test_utils.with_cloud_emulators('datastore')
140141
class UTaskExecuteTest(unittest.TestCase):
141142
"""Tests for UTask execution."""
142143

src/clusterfuzz/_internal/tests/core/k8s/k8s_service_e2e_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ def test_create_job(self):
150150
@mock.patch('clusterfuzz._internal.k8s.service._get_k8s_job_configs')
151151
@mock.patch(
152152
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')
153-
def test_create_uworker_main_batch_job(self, mock_get_command_from_module,
154-
mock_get_k8s_job_configs):
153+
def test_create_utask_main_job(self, mock_get_command_from_module,
154+
mock_get_k8s_job_configs):
155155
"""Tests creating a single uworker main batch job."""
156156
mock_get_command_from_module.return_value = 'fuzz'
157157
config = KubernetesJobConfig(
@@ -175,8 +175,8 @@ def test_create_uworker_main_batch_job(self, mock_get_command_from_module,
175175
@mock.patch('clusterfuzz._internal.k8s.service._get_k8s_job_configs')
176176
@mock.patch(
177177
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module')
178-
def test_create_uworker_main_batch_jobs(self, mock_get_command_from_module,
179-
mock_get_k8s_job_configs):
178+
def test_create_utask_main_jobs(self, mock_get_command_from_module,
179+
mock_get_k8s_job_configs):
180180
"""Tests creating multiple uworker main batch jobs."""
181181
mock_get_command_from_module.return_value = 'fuzz'
182182
config1 = KubernetesJobConfig(

0 commit comments

Comments
 (0)