Implement Kubernetes job limiter #5150
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,14 +16,19 @@ | |
| on base/tasks.py (i.e. avoiding circular imports).""" | ||
|
|
||
| from clusterfuzz._internal import swarming | ||
| from clusterfuzz._internal.base import errors | ||
| from clusterfuzz._internal.base import feature_flags | ||
| from clusterfuzz._internal.base import tasks | ||
| from clusterfuzz._internal.base.tasks import task_utils | ||
| from clusterfuzz._internal.base.tasks import UTASK_MAIN_QUEUE | ||
| from clusterfuzz._internal.batch import service as batch_service | ||
| from clusterfuzz._internal.bot.tasks import utasks | ||
| from clusterfuzz._internal.metrics import events | ||
| from clusterfuzz._internal.metrics import logs | ||
| from clusterfuzz._internal.system import environment | ||
|
|
||
| UTASK_MAIN_QUEUE_LIMIT_DEFAULT = 10000 | ||
|
|
||
|
|
||
| class BaseTask: | ||
| """Base module for tasks.""" | ||
|
|
@@ -154,6 +159,18 @@ def execute(self, task_argument, job_type, uworker_env): | |
| self.execute_locally(task_argument, job_type, uworker_env) | ||
| return | ||
|
|
||
| utask_main_queue_size = tasks.get_utask_main_queue_size() | ||
|
Collaborator
Not sure if this is in the correct place. Shouldn't this be in utask main? Here the task would be doing preprocess.
Collaborator
Correcting myself here: I think I get it. The idea is to not even execute preprocess if the utask main queue is full, right?
Collaborator
Author
Exactly! |
||
|
|
||
| utask_main_queue_limit = UTASK_MAIN_QUEUE_LIMIT_DEFAULT | ||
| utask_flag = feature_flags.FeatureFlags.UTASK_MAIN_QUEUE_LIMIT.flag | ||
| if utask_flag and utask_flag.enabled: | ||
| utask_main_queue_limit = utask_flag.content | ||
| if utask_main_queue_size > utask_main_queue_limit: | ||
| base_os_version = environment.get_value('BASE_OS_VERSION') | ||
| queue_name = UTASK_MAIN_QUEUE if not base_os_version else \ | ||
| f'{UTASK_MAIN_QUEUE}-{base_os_version}' | ||
| raise errors.QueueLimitReachedError(utask_main_queue_size, queue_name) | ||
|
|
||
| logs.info('Preprocessing utask.') | ||
| download_url = self.preprocess(task_argument, job_type, uworker_env) | ||
| if download_url is None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,6 @@ | |
|
|
||
| import collections | ||
| import random | ||
| from typing import List | ||
|
|
||
| from clusterfuzz._internal.metrics import logs | ||
| from clusterfuzz._internal.remote_task import remote_task_adapters | ||
|
|
@@ -106,12 +105,13 @@ def get_job_frequency(self): | |
| return frequencies | ||
|
|
||
| def create_utask_main_job(self, module, job_type, input_download_url): | ||
| """Creates a single remote task, selecting a backend dynamically.""" | ||
| adapter_id = self._get_adapter() | ||
| service = self._service_map[adapter_id] | ||
| return service.create_utask_main_job(module, job_type, input_download_url) | ||
|
|
||
| def create_utask_main_jobs(self, | ||
| remote_tasks: List[remote_task_types.RemoteTask]): | ||
| remote_tasks: list[remote_task_types.RemoteTask]): | ||
| """Creates a batch of remote tasks, distributing them across backends. | ||
|
|
||
| This method handles two cases: | ||
|
|
@@ -128,6 +128,7 @@ def create_utask_main_jobs(self, | |
| # For a single task, use a random distribution. | ||
| adapter_id = self._get_adapter() | ||
| tasks_by_adapter[adapter_id].extend(remote_tasks) | ||
| unscheduled_tasks = [] | ||
| else: | ||
| # For multiple tasks, use deterministic slicing to ensure the | ||
| # distribution precisely matches the frequency configuration. | ||
|
|
@@ -139,20 +140,25 @@ def create_utask_main_jobs(self, | |
| remote_tasks[start_index:start_index + count]) | ||
| start_index += count | ||
|
|
||
| # Distribute any remainder tasks (due to rounding) one by one. This | ||
| # ensures that all tasks are assigned to a backend. | ||
| remaining_tasks = remote_tasks[start_index:] | ||
| for i, task in enumerate(remaining_tasks): | ||
| adapter_id = list(frequencies.keys())[i % len(frequencies)] | ||
| tasks_by_adapter[adapter_id].append(task) | ||
| if sum(frequencies.values()) >= 0.999: | ||
|
Collaborator
I'm not sure I understand this logic. If the frequencies don't add up to 100%, are we simply not scheduling all tasks?
Collaborator
Author
Yes, that's the idea. Because task scheduling has a fixed throughput, the gate can be used not only to set the proportion of tasks each adapter receives, but also to create a bottleneck for any adapter. For instance, if the fleet of utask-main-schedulers can schedule 1000 tasks per minute, we can set 10% for Batch and 5% for K8s, and then we know that 100 tasks per minute will be scheduled to Batch and 50 to K8s. |
||
| # Distribute any remainder tasks (due to rounding) one by one. This | ||
| # ensures that all tasks are assigned to a backend. | ||
| for i, task in enumerate(remaining_tasks): | ||
| adapter_id = list(frequencies.keys())[i % len(frequencies)] | ||
| tasks_by_adapter[adapter_id].append(task) | ||
| unscheduled_tasks = [] | ||
| else: | ||
| unscheduled_tasks = list(remaining_tasks) | ||
|
|
||
| results = [] | ||
| for adapter_id, tasks in tasks_by_adapter.items(): | ||
| if tasks: | ||
| try: | ||
| logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.') | ||
| service = self._service_map[adapter_id] | ||
| results.extend(service.create_utask_main_jobs(tasks)) | ||
| unscheduled_tasks.extend(service.create_utask_main_jobs(tasks)) | ||
| except Exception: # pylint: disable=broad-except | ||
| logs.error(f'Failed to send {len(tasks)} tasks to {adapter_id}.') | ||
| return results | ||
| unscheduled_tasks.extend(tasks) | ||
|
|
||
| return unscheduled_tasks | ||
Would be nice to have a method that returns the queue name with the optional base_os_version, to centralize this.
I agree, but this is the pattern currently used in the code: the queue name is built in the same place that uses it.
Ideally we should create this centralized function and refactor the code to use it.
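A centralized helper along those lines could be as small as the sketch below. It is only an illustration: the function name `get_utask_main_queue_name` is hypothetical, and the value assigned to `UTASK_MAIN_QUEUE` here is a placeholder, not the real constant from `base/tasks`.

```python
# Placeholder value for illustration; the real constant lives in
# clusterfuzz._internal.base.tasks.
UTASK_MAIN_QUEUE = 'utask_main'


def get_utask_main_queue_name(base_os_version=None):
  """Returns the utask main queue name, suffixed with the OS version if set."""
  if not base_os_version:
    return UTASK_MAIN_QUEUE
  return f'{UTASK_MAIN_QUEUE}-{base_os_version}'
```

Callers would pass `environment.get_value('BASE_OS_VERSION')` as the argument, which keeps the suffix logic in one place.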