Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/clusterfuzz/_internal/base/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,10 @@ def __init__(self, key_name):

def __str__(self):
return 'Invalid config key %s.' % self.key_name


class QueueLimitReachedError(Error):
  """Error thrown when a queue's size exceeds its configured limit.

  Attributes:
    size: The observed size of the queue when the limit check failed.
    queue: The name of the queue whose limit was reached.
  """

  def __init__(self, size, queue):
    # Keep the raw values so handlers can inspect them without parsing the
    # message.
    self.size = size
    self.queue = queue
    # |size| is the observed queue size, not the limit itself, so the message
    # must not present it as the limit.
    super().__init__(
        f'Queue {queue} has reached its limit (current size: {size}).')
5 changes: 5 additions & 0 deletions src/clusterfuzz/_internal/base/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class FeatureFlags(Enum):
TEST_FLOAT_FLAG = 'test_float_flag'

K8S_JOBS_FREQUENCY = 'k8s_jobs_frequency'
K8S_JOBS_PENDING_LIMIT = 'k8s_jobs_pending_limit'

UTASK_MAIN_QUEUE_LIMIT = 'utask_main_queue_limit'

GCP_BATCH_JOBS_FREQUENCY = 'gcp_batch_jobs_frequency'

@property
def flag(self):
Expand Down
58 changes: 57 additions & 1 deletion src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from typing import List
from typing import Optional

from google.cloud import monitoring_v3

from clusterfuzz._internal.base import external_tasks
from clusterfuzz._internal.base import persistent_cache
from clusterfuzz._internal.base import utils
Expand All @@ -31,6 +33,7 @@
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.fuzzing import fuzzer_selection
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.google_cloud_utils import pubsub
from clusterfuzz._internal.google_cloud_utils import storage
from clusterfuzz._internal.metrics import logs
Expand Down Expand Up @@ -106,6 +109,12 @@ def get_task_duration(command):
# scheduling on batch.
MAX_UTASKS = 3000

# Time window (in seconds) used when querying the queue size metric.
# Metrics are looked up in the interval:
#   start = now - _QUEUE_LIMIT_INTERVAL
#   end = now
_QUEUE_LIMIT_INTERVAL = 5 * 60 # 5 minutes.


class Error(Exception):
"""Base exception class."""
Expand Down Expand Up @@ -509,6 +518,7 @@ def __init__(self, pubsub_message):
}

self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta')))
self._should_ack = True

def attribute(self, key):
"""Return attribute value."""
Expand All @@ -534,9 +544,14 @@ def defer(self):
min(pubsub.MAX_ACK_DEADLINE, time_until_eta))
return True

def cancel_lease_ack(self):
  """Prevent `lease()` from acknowledging the Pub/Sub message on exit.

  `lease()` resets this flag when it starts, so the cancellation only affects
  the lease that is currently active.
  """
  self._should_ack = False

@contextlib.contextmanager
def lease(self, _event=None): # pylint: disable=arguments-differ
"""Maintain a lease for the task."""
self._should_ack = True
task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
self.command, get_task_lease_timeout())

Expand All @@ -556,7 +571,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
leaser_thread.join()

# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
if self._should_ack:
self._pubsub_message.ack()
track_task_end()

def dont_retry(self):
Expand Down Expand Up @@ -678,6 +694,46 @@ def get_utask_mains() -> List[PubSubTask]:
return handle_multiple_utask_main_messages(messages, UTASK_MAIN_QUEUE)


def get_utask_main_queue_size():
"""Returns the size of the utask main queue."""
queue_name = UTASK_MAIN_QUEUE
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a method to get the queue_name with the optional base_os_version, to centralize this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but this is the pattern that is being used in the code, building the queue name in the same place that uses it.

Ideally, we should create this centralized function and refactor the code to use it.

base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version:
queue_name = f'{queue_name}-{base_os_version}'

application_id = utils.get_application_id()
metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
query_filter = (f'metric.type="{metric}" AND '
f'resource.labels.subscription_id="{queue_name}"')

try:
creds = credentials.get_default()[0]
client = monitoring_v3.MetricServiceClient(credentials=creds)

now = time.time()
interval = monitoring_v3.TimeInterval(
end_time={'seconds': int(now)},
start_time={'seconds': int(now - _QUEUE_LIMIT_INTERVAL)},
)

results = client.list_time_series(
request={
'filter': query_filter,
'interval': interval,
'name': f'projects/{application_id}',
'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
})

for result in results:
if not result.points:
continue
return result.points[0].value.int64_value
except Exception:
logs.error('Failed to get utask_main queue size.')

return 0


def handle_multiple_utask_main_messages(messages, queue) -> List[PubSubTask]:
"""Merges tasks specified in |messages| into a list for processing on this
bot."""
Expand Down
10 changes: 4 additions & 6 deletions src/clusterfuzz/_internal/batch/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ def create_utask_main_job(self, module: str, job_type: str,
remote_task_types.RemoteTask(command, job_type, input_download_url)
]
result = self.create_utask_main_jobs(batch_tasks)
if result is None:
return result
if not result:
return None
return result[0]

def create_utask_main_jobs(self,
Expand All @@ -396,12 +396,10 @@ def create_utask_main_jobs(self,
job_specs[spec].append(remote_task.input_download_url)

logs.info('Creating batch jobs.')
jobs = []

logs.info('Batching utask_mains.')
for spec, input_urls in job_specs.items():
for input_urls_portion in utils.batched(input_urls,
MAX_CONCURRENT_VMS_PER_JOB - 1):
jobs.append(self.create_job(spec, input_urls_portion).name)
self.create_job(spec, input_urls_portion).name

return jobs
return []
Comment thread
ViniciustCosta marked this conversation as resolved.
17 changes: 17 additions & 0 deletions src/clusterfuzz/_internal/bot/tasks/task_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
on base/tasks.py (i.e. avoiding circular imports)."""

from clusterfuzz._internal import swarming
from clusterfuzz._internal.base import errors
from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.base.tasks import UTASK_MAIN_QUEUE
from clusterfuzz._internal.batch import service as batch_service
from clusterfuzz._internal.bot.tasks import utasks
from clusterfuzz._internal.metrics import events
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.system import environment

UTASK_MAIN_QUEUE_LIMIT_DEFAULT = 10000


class BaseTask:
"""Base module for tasks."""
Expand Down Expand Up @@ -154,6 +159,18 @@ def execute(self, task_argument, job_type, uworker_env):
self.execute_locally(task_argument, job_type, uworker_env)
return

utask_main_queue_size = tasks.get_utask_main_queue_size()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is in the correct place, shouldn't this be on utask main? Here the task would be doing preprocess.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correcting myself here: I think I got it. The idea is to not even execute preprocess if the utask main queue is full, right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!


utask_main_queue_limit = UTASK_MAIN_QUEUE_LIMIT_DEFAULT
utask_flag = feature_flags.FeatureFlags.UTASK_MAIN_QUEUE_LIMIT.flag
if utask_flag and utask_flag.enabled:
utask_main_queue_limit = utask_flag.content
if utask_main_queue_size > utask_main_queue_limit:
base_os_version = environment.get_value('BASE_OS_VERSION')
queue_name = UTASK_MAIN_QUEUE if not base_os_version else \
f'{UTASK_MAIN_QUEUE}-{base_os_version}'
raise errors.QueueLimitReachedError(utask_main_queue_size, queue_name)

logs.info('Preprocessing utask.')
download_url = self.preprocess(task_argument, job_type, uworker_env)
if download_url is None:
Expand Down
58 changes: 34 additions & 24 deletions src/clusterfuzz/_internal/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import collections
import ipaddress
import os
import typing
import uuid

import google.auth
Expand All @@ -26,6 +25,7 @@
from kubernetes import client as k8s_client
import yaml

from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
Expand All @@ -39,6 +39,8 @@
CLUSTER_NAME = project_config = local_config.ProjectConfig().get(
'cluster_name', 'clusterfuzz-cronjobs-gke')

K8S_JOBS_PENDING_LIMIT_DEFAULT = 1000

KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [
'job_type',
'docker_image',
Expand All @@ -50,7 +52,7 @@
])


def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
def _get_config_names(remote_tasks: list[remote_task_types.RemoteTask]):
"""Gets the name of the configs for each batch_task. Returns a dict

that is indexed by command and job_type for efficient lookup."""
Expand Down Expand Up @@ -87,9 +89,8 @@ def _get_config_names(remote_tasks: typing.List[remote_task_types.RemoteTask]):
return config_map


def _get_k8s_job_configs(
remote_tasks: typing.List[remote_task_types.RemoteTask]
) -> typing.Dict[typing.Tuple[str, str], KubernetesJobConfig]:
def _get_k8s_job_configs(remote_tasks: list[remote_task_types.RemoteTask]
) -> dict[tuple[str, str], KubernetesJobConfig]:
"""Gets the configured specifications for a batch workload."""

if not remote_tasks:
Expand Down Expand Up @@ -283,37 +284,46 @@ def _get_pending_jobs_count(self) -> int:

def create_utask_main_job(self, module: str, job_type: str,
input_download_url: str):
"""Creates a single batch job for a uworker main task."""
"""Creates a single Kubernetes job for a uworker main task."""

command = task_utils.get_command_from_module(module)
batch_tasks = [
remote_task_types.RemoteTask(command, job_type, input_download_url)
]
result = self.create_utask_main_jobs(batch_tasks)

if result is None:
return result
return result[0]
uncreated_tasks = self.create_utask_main_jobs(batch_tasks)
return uncreated_tasks

def create_utask_main_jobs(
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
"""Creates a batch job for a list of uworker main tasks.
def create_utask_main_jobs(self,
remote_tasks: list[remote_task_types.RemoteTask]):
"""Creates Kubernetes jobs for a list of uworker main tasks.

This method groups the tasks by their workload specification and creates a
separate batch job for each group. This allows tasks with similar
requirements to be processed together, which can improve efficiency.
This method groups the tasks by their workload specification to efficiently
schedule them. It then iterates through the groups and creates a separate
Kubernetes job for each task.
"""

k8s_limit_flag = feature_flags.FeatureFlags.K8S_JOBS_PENDING_LIMIT
if k8s_limit_flag.content and k8s_limit_flag.enabled:
limit = int(k8s_limit_flag.content)
else:
limit = K8S_JOBS_PENDING_LIMIT_DEFAULT

pending_jobs_count = self._get_pending_jobs_count()
if pending_jobs_count >= limit:
logs.warning(
f'Pending jobs count {pending_jobs_count} reached limit {limit} '
f'for k8s.')
return remote_tasks

job_specs = collections.defaultdict(list)
configs = _get_k8s_job_configs(remote_tasks)
for remote_task in remote_tasks:
logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
logs.info(
f'Scheduling {remote_task.command}, {remote_task.job_type} in K8s.')
config = configs[(remote_task.command, remote_task.job_type)]
job_specs[config].append(remote_task.input_download_url)
logs.info('Creating batch jobs.')
jobs = []
logs.info('Batching utask_mains.')
logs.info('Creating Kubernetes jobs.')
for config, input_urls in job_specs.items():
for input_url in input_urls:
jobs.append(self.create_job(config, input_url))

return jobs
self.create_job(config, input_url)
return []
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class RemoteTaskAdapters(Enum):
"""
KUBERNETES = ('kubernetes', k8s_service.KubernetesService,
feature_flags.FeatureFlags.K8S_JOBS_FREQUENCY, 0.0)
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService, None, 1.0)
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService,
feature_flags.FeatureFlags.GCP_BATCH_JOBS_FREQUENCY, 1.0)

def __init__(self, adapter_id, service, feature_flag, default_weight):
self.id = adapter_id
Expand Down
26 changes: 16 additions & 10 deletions src/clusterfuzz/_internal/remote_task/remote_task_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import collections
import random
from typing import List

from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.remote_task import remote_task_adapters
Expand Down Expand Up @@ -106,12 +105,13 @@ def get_job_frequency(self):
return frequencies

def create_utask_main_job(self, module, job_type, input_download_url):
  """Create one remote utask_main job on a dynamically selected backend.

  The backend id comes from `_get_adapter()`; the call is then forwarded to
  the matching service in `_service_map` and that service's result is
  returned unchanged.
  """
  backend_id = self._get_adapter()
  backend_service = self._service_map[backend_id]
  return backend_service.create_utask_main_job(module, job_type,
                                               input_download_url)

def create_utask_main_jobs(self,
remote_tasks: List[remote_task_types.RemoteTask]):
remote_tasks: list[remote_task_types.RemoteTask]):
"""Creates a batch of remote tasks, distributing them across backends.

This method handles two cases:
Expand All @@ -128,6 +128,7 @@ def create_utask_main_jobs(self,
# For a single task, use a random distribution.
adapter_id = self._get_adapter()
tasks_by_adapter[adapter_id].extend(remote_tasks)
unscheduled_tasks = []
else:
# For multiple tasks, use deterministic slicing to ensure the
# distribution precisely matches the frequency configuration.
Expand All @@ -139,20 +140,25 @@ def create_utask_main_jobs(self,
remote_tasks[start_index:start_index + count])
start_index += count

# Distribute any remainder tasks (due to rounding) one by one. This
# ensures that all tasks are assigned to a backend.
remaining_tasks = remote_tasks[start_index:]
for i, task in enumerate(remaining_tasks):
adapter_id = list(frequencies.keys())[i % len(frequencies)]
tasks_by_adapter[adapter_id].append(task)
if sum(frequencies.values()) >= 0.999:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this logic: if the frequencies don't add up to 100%, are we simply not scheduling all tasks?

Copy link
Copy Markdown
Collaborator Author

@javanlacerda javanlacerda Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the idea. Because the scheduling of tasks has a fixed flow, we should be able to use the gate not only to set the proportion for scheduling all tasks, but also to create a bottleneck for any adapter.

For instance, if the fleet of utask-main-schedulers is able to schedule 1000 tasks per minute, we should be able to set 10% for Batch and 5% for K8s, and then we know that 100 tasks per minute will be scheduled to Batch and 50 to K8s.

# Distribute any remainder tasks (due to rounding) one by one. This
# ensures that all tasks are assigned to a backend.
for i, task in enumerate(remaining_tasks):
adapter_id = list(frequencies.keys())[i % len(frequencies)]
tasks_by_adapter[adapter_id].append(task)
unscheduled_tasks = []
else:
unscheduled_tasks = list(remaining_tasks)

results = []
for adapter_id, tasks in tasks_by_adapter.items():
if tasks:
try:
logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.')
service = self._service_map[adapter_id]
results.extend(service.create_utask_main_jobs(tasks))
unscheduled_tasks.extend(service.create_utask_main_jobs(tasks))
except Exception: # pylint: disable=broad-except
logs.error(f'Failed to send {len(tasks)} tasks to {adapter_id}.')
return results
unscheduled_tasks.extend(tasks)

return unscheduled_tasks
Loading
Loading