Skip to content

Commit 7c57a49

Browse files
committed
deal with circular import
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent fcc67fe commit 7c57a49

9 files changed

Lines changed: 190 additions & 181 deletions

File tree

src/clusterfuzz/_internal/remote_task/__init__.py

Lines changed: 1 addition & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -11,151 +11,4 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
"""Remote task interface.
15-
16-
This module defines the interface for a remote task execution client. This
17-
abstraction allows ClusterFuzz to support multiple remote execution
18-
environments, such as GCP Batch and Kubernetes, without tightly coupling
19-
the task creation logic to a specific implementation.
20-
"""
21-
import abc
22-
import collections
23-
from enum import Enum
24-
import random
25-
from typing import List
26-
27-
from clusterfuzz._internal.datastore import feature_flags
28-
from clusterfuzz._internal.metrics import logs
29-
from clusterfuzz._internal.remote_task import remote_task_types
30-
31-
32-
class RemoteTaskGate(remote_task_types.RemoteTaskInterface):
33-
"""A generic dispatcher for remote task execution.
34-
35-
This class acts as a high-level manager that abstracts away the specific
36-
details of the underlying remote execution backends. It uses the frequencies
37-
defined in this module to dynamically choose a backend for each task,
38-
allowing for flexible distribution and A/B testing.
39-
"""
40-
41-
def __init__(self):
42-
# Avoiding circular import
43-
from clusterfuzz._internal.remote_task import remote_task_adapters
44-
45-
# Instantiate and cache the service clients for each defined adapter.
46-
self._service_map = {
47-
adapter.id: adapter.service()
48-
for adapter in remote_task_adapters.RemoteTaskAdapters
49-
}
50-
self._adapters = remote_task_adapters.RemoteTaskAdapters
51-
52-
def _get_adapter(self) -> str:
53-
"""Performs a weighted random choice to select a remote backend.
54-
55-
This method is used when creating a single task, ensuring that the
56-
distribution of tasks over time aligns with the configured frequencies.
57-
"""
58-
frequencies = self.get_job_frequency()
59-
population = list(frequencies.keys())
60-
weights = list(frequencies.values())
61-
return random.choices(population, weights)[0]
62-
63-
def get_job_frequency(self):
64-
"""Returns the frequency distribution for all remote task adapters.
65-
66-
This function calculates the proportion of tasks that should be sent to each
67-
remote backend defined in the `RemoteTaskAdapters` enum. The calculation
68-
is based on feature flags, default weights, and ensures the total
69-
distribution sums to 1.0.
70-
71-
The order of adapters in the enum matters, as this function processes them
72-
sequentially, and any remaining weight to sum to 1.0 is assigned to the
73-
last adapter.
74-
75-
Returns:
76-
A dictionary mapping each adapter's ID (e.g., 'gcp_batch') to its
77-
calculated frequency (a float between 0.0 and 1.0).
78-
"""
79-
frequencies = {adapter.id: 0.0 for adapter in self._adapters}
80-
total_weight = 0.0
81-
82-
for adapter in self._adapters:
83-
default_weight = adapter.default_weight
84-
feature_flag = adapter.feature_flag
85-
weight = default_weight
86-
87-
# A feature flag can override the default weight for an adapter, allowing
88-
# for dynamic adjustments to task distribution.
89-
if (feature_flag and feature_flag.enabled and
90-
isinstance(feature_flag.content, float)):
91-
feature_flag_weight = feature_flag.content
92-
if 0 <= feature_flag_weight <= 1:
93-
weight = feature_flag_weight
94-
95-
if total_weight >= 1.0 and weight > 0.0:
96-
logs.warning(
97-
'Total weight for jobs frequency bigger than 1.0. Adapter starving',
98-
adapter=adapter.id)
99-
break
100-
101-
# Ensure the cumulative weight does not exceed 1.0. If adding the
102-
# current weight would push the total over, we cap it.
103-
if weight + total_weight > 1.0:
104-
weight = 1.0 - total_weight
105-
106-
total_weight += weight
107-
frequencies[adapter.id] = weight if weight >= 0 else 0.0
108-
109-
logs.info('Job frequencies', frequencies=frequencies)
110-
return frequencies
111-
112-
def create_utask_main_job(self, module: str, job_type: str,
113-
input_download_url: str):
114-
"""Creates a single remote task on a dynamically chosen backend."""
115-
adapter_id = self._get_adapter()
116-
service = self._service_map[adapter_id]
117-
return service.create_utask_main_job(module, job_type, input_download_url)
118-
119-
def create_utask_main_jobs(self,
120-
remote_tasks: List[remote_task_types.RemoteTask]):
121-
"""Creates a batch of remote tasks, distributing them across backends.
122-
123-
This method handles two cases:
124-
1. If there is only one task, it uses a weighted random choice to select
125-
a backend, similar to `create_utask_main_job`.
126-
2. If there are multiple tasks, it distributes them deterministically
127-
across the available backends based on their configured frequencies.
128-
This ensures that a batch of 100 tasks with a 70/30 split sends
129-
exactly 70 tasks to one backend and 30 to the other.
130-
"""
131-
tasks_by_adapter = collections.defaultdict(list)
132-
133-
if len(remote_tasks) == 1:
134-
# For a single task, use a random distribution.
135-
adapter_id = self._get_adapter()
136-
tasks_by_adapter[adapter_id].extend(remote_tasks)
137-
else:
138-
# For multiple tasks, use deterministic slicing to ensure the
139-
# distribution precisely matches the frequency configuration.
140-
frequencies = self.get_job_frequency()
141-
start_index = 0
142-
for adapter_id, frequency in frequencies.items():
143-
count = int(len(remote_tasks) * frequency)
144-
tasks_by_adapter[adapter_id].extend(
145-
remote_tasks[start_index:start_index + count])
146-
start_index += count
147-
148-
# Distribute any remainder tasks (due to rounding) one by one. This
149-
# ensures that all tasks are assigned to a backend.
150-
remaining_tasks = remote_tasks[start_index:]
151-
for i, task in enumerate(remaining_tasks):
152-
adapter_id = list(frequencies.keys())[i % len(frequencies)]
153-
tasks_by_adapter[adapter_id].append(task)
154-
155-
results = []
156-
for adapter_id, tasks in tasks_by_adapter.items():
157-
if tasks:
158-
logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.')
159-
service = self._service_map[adapter_id]
160-
results.extend(service.create_utask_main_jobs(tasks))
161-
return results
14+
"""Remote task package."""

src/clusterfuzz/_internal/remote_task/remote_task_adapters.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
from enum import Enum
1717

18-
from clusterfuzz._internal.batch.service import GcpBatchService
18+
from clusterfuzz._internal.batch import service as batch_service
1919
from clusterfuzz._internal.datastore import feature_flags
20-
from clusterfuzz._internal.k8s.service import KubernetesService
20+
from clusterfuzz._internal.k8s import service as k8s_service
2121

2222

2323
class RemoteTaskAdapters(Enum):
@@ -33,9 +33,9 @@ class RemoteTaskAdapters(Enum):
3333
feature_flag: The feature flag that controls the frequency of this backend.
3434
default_weight: The default frequency if the feature flag is not set.
3535
"""
36-
KUBERNETES = ('kubernetes', KubernetesService,
36+
KUBERNETES = ('kubernetes', k8s_service.KubernetesService,
3737
feature_flags.FeatureFlags.K8S_JOBS_FREQUENCY, 0.0)
38-
GCP_BATCH = ('gcp_batch', GcpBatchService, None, 1.0)
38+
GCP_BATCH = ('gcp_batch', batch_service.GcpBatchService, None, 1.0)
3939

4040
def __init__(self, adapter_id, service, feature_flag, default_weight):
4141
self.id = adapter_id
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Remote task interface.
15+
16+
This module defines the interface for a remote task execution client. This
17+
abstraction allows ClusterFuzz to support multiple remote execution
18+
environments, such as GCP Batch and Kubernetes, without tightly coupling
19+
the task creation logic to a specific implementation.
20+
"""
21+
22+
import collections
23+
import random
24+
from typing import List
25+
26+
from clusterfuzz._internal.metrics import logs
27+
from clusterfuzz._internal.remote_task import remote_task_types
28+
from clusterfuzz._internal.remote_task import remote_task_adapters
29+
30+
31+
class RemoteTaskGate(remote_task_types.RemoteTaskInterface):
32+
"""A generic dispatcher for remote task execution.
33+
34+
This class acts as a high-level manager that abstracts away the specific
35+
details of the underlying remote execution backends. It uses the frequencies
36+
defined in this module to dynamically choose a backend for each task,
37+
allowing for flexible distribution and A/B testing.
38+
"""
39+
40+
def __init__(self):
41+
# Instantiate and cache the service clients for each defined adapter.
42+
self._service_map = {
43+
adapter.id: adapter.service()
44+
for adapter in remote_task_adapters.RemoteTaskAdapters
45+
}
46+
self._adapters = remote_task_adapters.RemoteTaskAdapters
47+
48+
def _get_adapter(self) -> str:
49+
"""Performs a weighted random choice to select a remote backend.
50+
51+
This method is used when creating a single task, ensuring that the
52+
distribution of tasks over time aligns with the configured frequencies.
53+
"""
54+
frequencies = self.get_job_frequency()
55+
population = list(frequencies.keys())
56+
weights = list(frequencies.values())
57+
return random.choices(population, weights)[0]
58+
59+
def get_job_frequency(self):
60+
"""Returns the frequency distribution for all remote task adapters.
61+
62+
This function calculates the proportion of tasks that should be sent to each
63+
remote backend defined in the `RemoteTaskAdapters` enum. The calculation
64+
is based on feature flags, default weights, and ensures the total
65+
distribution sums to 1.0.
66+
67+
The order of adapters in the enum matters, as this function processes them
68+
sequentially, and any remaining weight to sum to 1.0 is assigned to the
69+
last adapter.
70+
71+
Returns:
72+
A dictionary mapping each adapter's ID (e.g., 'gcp_batch') to its
73+
calculated frequency (a float between 0.0 and 1.0).
74+
"""
75+
frequencies = {adapter.id: 0.0 for adapter in self._adapters}
76+
total_weight = 0.0
77+
78+
for adapter in self._adapters:
79+
default_weight = adapter.default_weight
80+
feature_flag = adapter.feature_flag
81+
weight = default_weight
82+
83+
# A feature flag can override the default weight for an adapter, allowing
84+
# for dynamic adjustments to task distribution.
85+
if (feature_flag and feature_flag.enabled and
86+
isinstance(feature_flag.content, float)):
87+
feature_flag_weight = feature_flag.content
88+
if 0 <= feature_flag_weight <= 1:
89+
weight = feature_flag_weight
90+
91+
if total_weight >= 1.0 and weight > 0.0:
92+
logs.warning(
93+
'Total weight for jobs frequency bigger than 1.0. Adapter starving',
94+
adapter=adapter.id)
95+
break
96+
97+
# Ensure the cumulative weight does not exceed 1.0. If adding the
98+
# current weight would push the total over, we cap it.
99+
if weight + total_weight > 1.0:
100+
weight = 1.0 - total_weight
101+
102+
total_weight += weight
103+
frequencies[adapter.id] = weight if weight >= 0 else 0.0
104+
105+
logs.info('Job frequencies', frequencies=frequencies)
106+
return frequencies
107+
108+
def create_utask_main_job(self, module, job_type, input_download_url):
109+
adapter_id = self._get_adapter()
110+
service = self._service_map[adapter_id]
111+
return service.create_utask_main_job(module, job_type, input_download_url)
112+
113+
def create_utask_main_jobs(
114+
self, remote_tasks: List[remote_task_types.RemoteTask]):
115+
"""Creates a batch of remote tasks, distributing them across backends.
116+
117+
This method handles two cases:
118+
1. If there is only one task, it uses a weighted random choice to select
119+
a backend, similar to `create_utask_main_job`.
120+
2. If there are multiple tasks, it distributes them deterministically
121+
across the available backends based on their configured frequencies.
122+
This ensures that a batch of 100 tasks with a 70/30 split sends
123+
exactly 70 tasks to one backend and 30 to the other.
124+
"""
125+
tasks_by_adapter = collections.defaultdict(list)
126+
127+
if len(remote_tasks) == 1:
128+
# For a single task, use a random distribution.
129+
adapter_id = self._get_adapter()
130+
tasks_by_adapter[adapter_id].extend(remote_tasks)
131+
else:
132+
# For multiple tasks, use deterministic slicing to ensure the
133+
# distribution precisely matches the frequency configuration.
134+
frequencies = self.get_job_frequency()
135+
start_index = 0
136+
for adapter_id, frequency in frequencies.items():
137+
count = int(len(remote_tasks) * frequency)
138+
tasks_by_adapter[adapter_id].extend(
139+
remote_tasks[start_index:start_index + count])
140+
start_index += count
141+
142+
# Distribute any remainder tasks (due to rounding) one by one. This
143+
# ensures that all tasks are assigned to a backend.
144+
remaining_tasks = remote_tasks[start_index:]
145+
for i, task in enumerate(remaining_tasks):
146+
adapter_id = list(frequencies.keys())[i % len(frequencies)]
147+
tasks_by_adapter[adapter_id].append(task)
148+
149+
results = []
150+
for adapter_id, tasks in tasks_by_adapter.items():
151+
if tasks:
152+
logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.')
153+
service = self._service_map[adapter_id]
154+
results.extend(service.create_utask_main_jobs(tasks))
155+
return results

src/clusterfuzz/_internal/remote_task/remote_task_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,5 @@ def create_utask_main_job(self, module: str, job_type: str,
4747

4848
@abc.abstractmethod
4949
def create_utask_main_jobs(self, remote_tasks: list[RemoteTask]):
50-
"""Creates a many remote tasks for uworker main tasks."""
50+
"""Creates many remote tasks for uworker main tasks."""
5151
raise NotImplementedError

src/clusterfuzz/_internal/tests/core/remote_task/job_frequency_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import unittest
1616
from unittest import mock
1717

18-
from clusterfuzz._internal.remote_task import RemoteTaskGate
18+
from clusterfuzz._internal.remote_task import remote_task_gate
1919
from clusterfuzz._internal.tests.core.datastore import ds_test_utils
2020
from clusterfuzz._internal.tests.test_libs import test_utils
2121

@@ -28,7 +28,7 @@ def setUp(self):
2828
mock.patch(
2929
'clusterfuzz._internal.k8s.service.KubernetesService._load_gke_credentials'
3030
).start()
31-
self.gate = RemoteTaskGate()
31+
self.gate = remote_task_gate.RemoteTaskGate()
3232

3333
def test_get_job_frequency_defaults(self):
3434
"""Tests that the default frequencies are returned when no feature flags

0 commit comments

Comments
 (0)