Skip to content

Commit b90da76

Browse files
committed
Implement job limiter for k8s service jobs
Signed-off-by: Javan Lacerda <javanlacerda@google.com> update tests Signed-off-by: Javan Lacerda <javanlacerda@google.com> fix e2e tests Signed-off-by: Javan Lacerda <javanlacerda@google.com> lint Signed-off-by: Javan Lacerda <javanlacerda@google.com> prepare unscheduled_tasks Signed-off-by: Javan Lacerda <javanlacerda@google.com> fix Signed-off-by: Javan Lacerda <javanlacerda@google.com> fix Signed-off-by: Javan Lacerda <javanlacerda@google.com> add testcase based tasks to prepare tasks Signed-off-by: Javan Lacerda <javanlacerda@google.com> check Utask Signed-off-by: Javan Lacerda <javanlacerda@google.com> move to not ack not scheduled tasks instead of pushing to pre Signed-off-by: Javan Lacerda <javanlacerda@google.com> fixes Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent e139717 commit b90da76

17 files changed

Lines changed: 499 additions & 89 deletions

File tree

src/clusterfuzz/_internal/base/errors.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,10 @@ def __init__(self, key_name):
110110

111111
def __str__(self):
112112
return 'Invalid config key %s.' % self.key_name
113+
114+
115+
class QueueLimitReachedError(Error):
  """Error thrown when the queue limit is reached.

  Attributes:
    size: The value passed to the constructor; it is also embedded in the
      error message.
  """

  def __init__(self, size):
    """Builds the error message from |size| and records |size|."""
    super().__init__(f'Queue limit reached {size}.')
    # Keep the raw value so handlers need not parse it out of the message,
    # mirroring how the sibling config-key error keeps its `key_name`.
    self.size = size

src/clusterfuzz/_internal/base/feature_flags.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ class FeatureFlags(Enum):
2727
TEST_FLOAT_FLAG = 'test_float_flag'
2828

2929
K8S_JOBS_FREQUENCY = 'k8s_jobs_frequency'
30+
K8S_JOBS_PENDING_LIMIT = 'k8s_jobs_pending_limit'
31+
32+
UTASK_MAIN_QUEUE_LIMIT = 'utask_main_queue_limit'
3033

3134
@property
3235
def flag(self):

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
from typing import List
2424
from typing import Optional
2525

26+
from google.cloud import monitoring_v3
27+
2628
from clusterfuzz._internal.base import external_tasks
2729
from clusterfuzz._internal.base import persistent_cache
2830
from clusterfuzz._internal.base import utils
@@ -31,6 +33,7 @@
3133
from clusterfuzz._internal.datastore import data_types
3234
from clusterfuzz._internal.datastore import ndb_utils
3335
from clusterfuzz._internal.fuzzing import fuzzer_selection
36+
from clusterfuzz._internal.google_cloud_utils import credentials
3437
from clusterfuzz._internal.google_cloud_utils import pubsub
3538
from clusterfuzz._internal.google_cloud_utils import storage
3639
from clusterfuzz._internal.metrics import logs
@@ -509,6 +512,7 @@ def __init__(self, pubsub_message):
509512
}
510513

511514
self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta')))
515+
self._should_ack = True
512516

513517
def attribute(self, key):
514518
"""Return attribute value."""
@@ -534,9 +538,14 @@ def defer(self):
534538
min(pubsub.MAX_ACK_DEADLINE, time_until_eta))
535539
return True
536540

541+
def cancel_lease_ack(self):
  """Disables the acknowledgement that `lease` sends when it finishes.

  `lease` only acks the Pub/Sub message while `_should_ack` is true, and it
  resets the flag to True every time it is entered. This therefore has to be
  called while the lease is being held to have any effect.

  NOTE(review): presumably the un-acked message is later redelivered by
  Pub/Sub so the task can be retried; confirm against the subscription setup.
  """
  self._should_ack = False
544+
537545
@contextlib.contextmanager
538546
def lease(self, _event=None): # pylint: disable=arguments-differ
539547
"""Maintain a lease for the task."""
548+
self._should_ack = True
540549
task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
541550
self.command, get_task_lease_timeout())
542551

@@ -556,7 +565,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
556565
leaser_thread.join()
557566

558567
# If we get here the task succeeded in running. Acknowledge the message
# unless the acknowledgement was cancelled via cancel_lease_ack().
559-
self._pubsub_message.ack()
568+
if self._should_ack:
569+
self._pubsub_message.ack()
560570
track_task_end()
561571

562572
def dont_retry(self):
@@ -678,6 +688,46 @@ def get_utask_mains() -> List[PubSubTask]:
678688
return handle_multiple_utask_main_messages(messages, UTASK_MAIN_QUEUE)
679689

680690

691+
def get_utask_main_queue_size():
  """Looks up the undelivered-message count of the utask_main subscription.

  The subscription id is UTASK_MAIN_QUEUE, suffixed with `-<BASE_OS_VERSION>`
  when that environment value is set. The count is read from the
  `num_undelivered_messages` metric over the last five minutes.

  Returns:
    The int64 value of the first point of the first time series that has any
    points. Returns 0 when no series has points or when the lookup raises, so
    a failed lookup is indistinguishable from an empty queue.
  """
  queue_name = UTASK_MAIN_QUEUE
  os_version = environment.get_value('BASE_OS_VERSION')
  if os_version:
    queue_name = f'{queue_name}-{os_version}'

  application_id = utils.get_application_id()
  metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
  query_filter = (f'metric.type="{metric}" AND '
                  f'resource.labels.subscription_id="{queue_name}"')

  try:
    client = monitoring_v3.MetricServiceClient(
        credentials=credentials.get_default()[0])

    # Query window: the five minutes ending now.
    end_seconds = time.time()
    window = monitoring_v3.TimeInterval(
        end_time={'seconds': int(end_seconds)},
        start_time={'seconds': int(end_seconds - 5 * 60)},
    )
    request = {
        'name': f'projects/{application_id}',
        'filter': query_filter,
        'interval': window,
        'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }

    for series in client.list_time_series(request=request):
      if series.points:
        # NOTE(review): Cloud Monitoring is documented to list points in
        # reverse time order, so points[0] is presumably the newest sample;
        # confirm.
        return series.points[0].value.int64_value
  except Exception:
    # Best effort: any failure is logged and reported as an empty queue.
    logs.error('Failed to get utask_main queue size.')

  return 0
729+
730+
681731
def handle_multiple_utask_main_messages(messages, queue) -> List[PubSubTask]:
682732
"""Merges tasks specified in |messages| into a list for processing on this
683733
bot."""

src/clusterfuzz/_internal/batch/service.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,8 @@ def create_utask_main_job(self, module: str, job_type: str,
376376
remote_task_types.RemoteTask(command, job_type, input_download_url)
377377
]
378378
result = self.create_utask_main_jobs(batch_tasks)
379-
if result is None:
380-
return result
379+
if not result:
380+
return None
381381
return result[0]
382382

383383
def create_utask_main_jobs(self,
@@ -396,12 +396,10 @@ def create_utask_main_jobs(self,
396396
job_specs[spec].append(remote_task.input_download_url)
397397

398398
logs.info('Creating batch jobs.')
399-
jobs = []
400-
401399
logs.info('Batching utask_mains.')
402400
for spec, input_urls in job_specs.items():
403401
for input_urls_portion in utils.batched(input_urls,
404402
MAX_CONCURRENT_VMS_PER_JOB - 1):
405-
jobs.append(self.create_job(spec, input_urls_portion).name)
403+
self.create_job(spec, input_urls_portion).name
406404

407-
return jobs
405+
return []

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
on base/tasks.py (i.e. avoiding circular imports)."""
1717

1818
from clusterfuzz._internal import swarming
19+
from clusterfuzz._internal.base import errors
20+
from clusterfuzz._internal.base import feature_flags
1921
from clusterfuzz._internal.base import tasks
2022
from clusterfuzz._internal.base.tasks import task_utils
2123
from clusterfuzz._internal.batch import service as batch_service
@@ -24,6 +26,8 @@
2426
from clusterfuzz._internal.metrics import logs
2527
from clusterfuzz._internal.system import environment
2628

29+
UTASK_MAIN_QUEUE_LIMIT_DEFAULT = 10000
30+
2731

2832
class BaseTask:
2933
"""Base module for tasks."""
@@ -154,6 +158,15 @@ def execute(self, task_argument, job_type, uworker_env):
154158
self.execute_locally(task_argument, job_type, uworker_env)
155159
return
156160

161+
utask_main_queue_size = tasks.get_utask_main_queue_size()
162+
163+
utask_main_queue_limit = UTASK_MAIN_QUEUE_LIMIT_DEFAULT
164+
utask_flag = feature_flags.FeatureFlags.UTASK_MAIN_QUEUE_LIMIT.flag
165+
if utask_flag and utask_flag.enabled:
166+
utask_main_queue_limit = utask_flag.content
167+
if utask_main_queue_size > utask_main_queue_limit:
168+
raise errors.QueueLimitReachedError(utask_main_queue_size)
169+
157170
logs.info('Preprocessing utask.')
158171
download_url = self.preprocess(task_argument, job_type, uworker_env)
159172
if download_url is None:

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import collections
1717
import ipaddress
1818
import os
19-
import typing
2019
import uuid
2120

2221
import google.auth
@@ -26,6 +25,7 @@
2625
from kubernetes import client as k8s_client
2726
import yaml
2827

28+
from clusterfuzz._internal.base import feature_flags
2929
from clusterfuzz._internal.base import tasks
3030
from clusterfuzz._internal.base import utils
3131
from clusterfuzz._internal.base.tasks import task_utils
@@ -39,6 +39,8 @@
3939
CLUSTER_NAME = project_config = local_config.ProjectConfig().get(
4040
'cluster_name', 'clusterfuzz-cronjobs-gke')
4141

42+
K8S_JOBS_PENDING_LIMIT_DEFAULT = 1000
43+
4244
KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [
4345
'job_type',
4446
'docker_image',
@@ -50,7 +52,7 @@
5052
])
5153

5254

53-
def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
55+
def _get_config_names(remote_tasks: list[remote_task_types.RemoteTask]):
5456
"""Gets the name of the configs for each batch_task. Returns a dict
5557
5658
that is indexed by command and job_type for efficient lookup."""
@@ -87,9 +89,8 @@ def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
8789
return config_map
8890

8991

90-
def _get_k8s_job_configs(
91-
remote_tasks: typing.List[remote_task_types.RemoteTask]
92-
) -> typing.Dict[typing.Tuple[str, str], KubernetesJobConfig]:
92+
def _get_k8s_job_configs(remote_tasks: list[remote_task_types.RemoteTask]
93+
) -> dict[tuple[str, str], KubernetesJobConfig]:
9394
"""Gets the configured specifications for a batch workload."""
9495

9596
if not remote_tasks:
@@ -283,37 +284,46 @@ def _get_pending_jobs_count(self) -> int:
283284

284285
def create_utask_main_job(self, module: str, job_type: str,
285286
input_download_url: str):
286-
"""Creates a single batch job for a uworker main task."""
287+
"""Creates a single Kubernetes job for a uworker main task."""
287288

288289
command = task_utils.get_command_from_module(module)
289290
batch_tasks = [
290291
remote_task_types.RemoteTask(command, job_type, input_download_url)
291292
]
292-
result = self.create_utask_main_jobs(batch_tasks)
293-
294-
if result is None:
295-
return result
296-
return result[0]
293+
uncreated_tasks = self.create_utask_main_jobs(batch_tasks)
294+
return uncreated_tasks
297295

298-
def create_utask_main_jobs(
299-
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
300-
"""Creates a batch job for a list of uworker main tasks.
296+
def create_utask_main_jobs(self,
297+
remote_tasks: list[remote_task_types.RemoteTask]):
298+
"""Creates Kubernetes jobs for a list of uworker main tasks.
301299
302-
This method groups the tasks by their workload specification and creates a
303-
separate batch job for each group. This allows tasks with similar
304-
requirements to be processed together, which can improve efficiency.
300+
This method groups the tasks by their workload specification to efficiently
301+
schedule them. It then iterates through the groups and creates a separate
302+
Kubernetes job for each task.
305303
"""
304+
305+
k8s_limit_flag = feature_flags.FeatureFlags.K8S_JOBS_PENDING_LIMIT
306+
if k8s_limit_flag.content and k8s_limit_flag.enabled:
307+
limit = int(k8s_limit_flag.content)
308+
else:
309+
limit = K8S_JOBS_PENDING_LIMIT_DEFAULT
310+
311+
pending_jobs_count = self._get_pending_jobs_count()
312+
if pending_jobs_count >= limit:
313+
logs.warning(
314+
f'Pending jobs count {pending_jobs_count} reached limit {limit} '
315+
f'for k8s.')
316+
return remote_tasks
317+
306318
job_specs = collections.defaultdict(list)
307319
configs = _get_k8s_job_configs(remote_tasks)
308320
for remote_task in remote_tasks:
309-
logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
321+
logs.info(
322+
f'Scheduling {remote_task.command}, {remote_task.job_type} in K8s.')
310323
config = configs[(remote_task.command, remote_task.job_type)]
311324
job_specs[config].append(remote_task.input_download_url)
312-
logs.info('Creating batch jobs.')
313-
jobs = []
314-
logs.info('Batching utask_mains.')
325+
logs.info('Creating Kubernetes jobs.')
315326
for config, input_urls in job_specs.items():
316327
for input_url in input_urls:
317-
jobs.append(self.create_job(config, input_url))
318-
319-
return jobs
328+
self.create_job(config, input_url)
329+
return []

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import collections
2323
import random
24-
from typing import List
2524

2625
from clusterfuzz._internal.metrics import logs
2726
from clusterfuzz._internal.remote_task import remote_task_adapters
@@ -106,12 +105,13 @@ def get_job_frequency(self):
106105
return frequencies
107106

108107
def create_utask_main_job(self, module, job_type, input_download_url):
108+
"""Creates a single remote task, selecting a backend dynamically."""
109109
adapter_id = self._get_adapter()
110110
service = self._service_map[adapter_id]
111111
return service.create_utask_main_job(module, job_type, input_download_url)
112112

113113
def create_utask_main_jobs(self,
114-
remote_tasks: List[remote_task_types.RemoteTask]):
114+
remote_tasks: list[remote_task_types.RemoteTask]):
115115
"""Creates a batch of remote tasks, distributing them across backends.
116116
117117
This method handles two cases:
@@ -128,6 +128,7 @@ def create_utask_main_jobs(self,
128128
# For a single task, use a random distribution.
129129
adapter_id = self._get_adapter()
130130
tasks_by_adapter[adapter_id].extend(remote_tasks)
131+
unscheduled_tasks = []
131132
else:
132133
# For multiple tasks, use deterministic slicing to ensure the
133134
# distribution precisely matches the frequency configuration.
@@ -139,20 +140,25 @@ def create_utask_main_jobs(self,
139140
remote_tasks[start_index:start_index + count])
140141
start_index += count
141142

142-
# Distribute any remainder tasks (due to rounding) one by one. This
143-
# ensures that all tasks are assigned to a backend.
144143
remaining_tasks = remote_tasks[start_index:]
145-
for i, task in enumerate(remaining_tasks):
146-
adapter_id = list(frequencies.keys())[i % len(frequencies)]
147-
tasks_by_adapter[adapter_id].append(task)
144+
if sum(frequencies.values()) >= 0.999:
145+
# Distribute any remainder tasks (due to rounding) one by one. This
146+
# ensures that all tasks are assigned to a backend.
147+
for i, task in enumerate(remaining_tasks):
148+
adapter_id = list(frequencies.keys())[i % len(frequencies)]
149+
tasks_by_adapter[adapter_id].append(task)
150+
unscheduled_tasks = []
151+
else:
152+
unscheduled_tasks = list(remaining_tasks)
148153

149-
results = []
150154
for adapter_id, tasks in tasks_by_adapter.items():
151155
if tasks:
152156
try:
153157
logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.')
154158
service = self._service_map[adapter_id]
155-
results.extend(service.create_utask_main_jobs(tasks))
159+
unscheduled_tasks.extend(service.create_utask_main_jobs(tasks))
156160
except Exception: # pylint: disable=broad-except
157161
logs.error(f'Failed to send {len(tasks)} tasks to {adapter_id}.')
158-
return results
162+
unscheduled_tasks.extend(tasks)
163+
164+
return unscheduled_tasks

src/clusterfuzz/_internal/remote_task/remote_task_types.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
import abc
1717

18+
from clusterfuzz._internal.base import tasks
1819

19-
class RemoteTask:
20+
21+
class RemoteTask(tasks.Task):
2022
"""Represents a single ClusterFuzz task to be executed on a remote worker.
2123
2224
This class holds the necessary information to execute a ClusterFuzz command,
@@ -25,6 +27,7 @@ class RemoteTask:
2527
"""
2628

2729
def __init__(self, command, job_type, input_download_url, pubsub_task=None):
30+
super().__init__(command, input_download_url, job_type)
2831
self.command = command
2932
self.job_type = job_type
3033
self.input_download_url = input_download_url
@@ -42,10 +45,15 @@ class RemoteTaskInterface(abc.ABC):
4245
@abc.abstractmethod
4346
def create_utask_main_job(self, module: str, job_type: str,
4447
input_download_url: str):
45-
"""Creates a single remote task for a uworker main task."""
48+
"""Creates a single remote task for a uworker main task.
49+
Returns the task that couldn't be created.
50+
"""
4651
raise NotImplementedError
4752

4853
@abc.abstractmethod
49-
def create_utask_main_jobs(self, remote_tasks: list[RemoteTask]):
50-
"""Creates many remote tasks for uworker main tasks."""
54+
def create_utask_main_jobs(
55+
self, remote_tasks: list[RemoteTask]) -> list[RemoteTask]:
56+
"""Creates many remote tasks for uworker main tasks.
57+
Returns the tasks that couldn't be created.
58+
"""
5159
raise NotImplementedError

0 commit comments

Comments
 (0)