Skip to content

Commit da47d7b

Browse files
committed
prepare unscheduled_tasks
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent 279404c commit da47d7b

5 files changed

Lines changed: 90 additions & 25 deletions

File tree

src/clusterfuzz/_internal/batch/service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,10 @@ def create_utask_main_job(self, module: str, job_type: str,
375375
batch_tasks = [
376376
remote_task_types.RemoteTask(command, job_type, input_download_url)
377377
]
378-
uncreated_tasks = self.create_utask_main_jobs(batch_tasks)
379-
return uncreated_tasks
378+
result = self.create_utask_main_jobs(batch_tasks)
379+
if not result:
380+
return None
381+
return result[0]
380382

381383
def create_utask_main_jobs(self,
382384
remote_tasks: List[remote_task_types.RemoteTask]):

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import collections
2323
import random
24-
from typing import List
24+
import typing
2525

26+
from clusterfuzz._internal.datastore import data_types
27+
from clusterfuzz._internal.datastore import ndb_utils
2628
from clusterfuzz._internal.metrics import logs
2729
from clusterfuzz._internal.remote_task import remote_task_adapters
2830
from clusterfuzz._internal.remote_task import remote_task_types
@@ -105,13 +107,39 @@ def get_job_frequency(self):
105107
logs.info('Job frequencies', frequencies=frequencies)
106108
return frequencies
107109

110+
def prepare_unscheduled_tasks(
111+
self, unscheduled_remote_tasks: typing.List[remote_task_types.RemoteTask]
112+
) -> typing.List[remote_task_types.RemoteTask]:
113+
"""Prepares the unscheduled remote tasks to be sent back to the
114+
preprocess queue.
115+
116+
The messages in the preprocess queue expects tasks with
117+
the properties followng task_name, fuzzer, job, eta.
118+
The utasks has the signed url as argument instead of the fuzzer,
119+
then for sending it back to the preprocess, it recovers its fuzzer
120+
and override the argument.
121+
"""
122+
123+
job_names = {task.job_type for task in unscheduled_remote_tasks}
124+
query = data_types.FuzzerJob.query(
125+
data_types.FuzzerJob.job.IN(list(job_names)))
126+
fuzzer_jobs = ndb_utils.get_all_from_query(query)
127+
fuzzer_jobs_name_mapped = {job.name: job for job in fuzzer_jobs}
128+
for task in unscheduled_remote_tasks:
129+
if task.job_type not in fuzzer_jobs_name_mapped:
130+
logs.error(f'{task.job_type} not found.')
131+
continue
132+
task.argument = fuzzer_jobs_name_mapped[task.job_type].argument
133+
134+
return unscheduled_remote_tasks
135+
108136
def create_utask_main_job(self, module, job_type, input_download_url):
109137
adapter_id = self._get_adapter()
110138
service = self._service_map[adapter_id]
111139
return service.create_utask_main_job(module, job_type, input_download_url)
112140

113-
def create_utask_main_jobs(self,
114-
remote_tasks: List[remote_task_types.RemoteTask]):
141+
def create_utask_main_jobs(
142+
self, remote_tasks: typing.List[remote_task_types.RemoteTask]):
115143
"""Creates a batch of remote tasks, distributing them across backends.
116144
117145
This method handles two cases:
@@ -146,13 +174,16 @@ def create_utask_main_jobs(self,
146174
adapter_id = list(frequencies.keys())[i % len(frequencies)]
147175
tasks_by_adapter[adapter_id].append(task)
148176

149-
results = []
177+
unscheduled_tasks = []
150178
for adapter_id, tasks in tasks_by_adapter.items():
151179
if tasks:
152180
try:
153181
logs.info(f'Sending {len(tasks)} tasks to {adapter_id}.')
154182
service = self._service_map[adapter_id]
155-
results.extend(service.create_utask_main_jobs(tasks))
183+
unscheduled_tasks.extend(service.create_utask_main_jobs(tasks))
156184
except Exception: # pylint: disable=broad-except
157185
logs.error(f'Failed to send {len(tasks)} tasks to {adapter_id}.')
158-
return results
186+
187+
prepared_unscheduled_tasks = self.prepare_unscheduled_tasks(
188+
unscheduled_tasks)
189+
return prepared_unscheduled_tasks

src/clusterfuzz/_internal/tests/core/batch/batch_service_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def test_create_uworker_main_batch_job(self):
254254
UUIDS[0], spec1, ['url1'])
255255
self.mock_batch_client_instance.create_job.assert_called_with(
256256
expected_create_request)
257-
self.assertEqual(result, [])
257+
self.assertIsNone(result)
258258

259259

260260
@test_utils.with_cloud_emulators('datastore')

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ def setUp(self):
5555
self.patcher.start()
5656
self.addCleanup(self.patcher.stop)
5757

58+
self.mock_prepare_unscheduled_tasks = mock.patch.object(
59+
remote_task_gate.RemoteTaskGate,
60+
'prepare_unscheduled_tasks',
61+
side_effect=lambda x: x).start()
62+
self.addCleanup(mock.patch.stopall)
63+
5864
def test_init(self):
5965
"""Tests that the RemoteTaskGate initializes correctly and creates
6066
service map."""
@@ -197,3 +203,33 @@ def test_create_utask_main_jobs_full_gcp_batch(self, mock_get_job_frequency):
197203
self.mock_gcp_batch_service.create_utask_main_jobs.assert_called_once_with(
198204
tasks)
199205
self.mock_k8s_service.create_utask_main_jobs.assert_not_called()
206+
207+
@mock.patch.object(remote_task_gate.RemoteTaskGate, 'get_job_frequency')
208+
def test_create_utask_main_jobs_calls_prepare_unscheduled_tasks(
209+
self, mock_get_job_frequency):
210+
"""Tests that create_utask_main_jobs calls prepare_unscheduled_tasks with
211+
unscheduled tasks."""
212+
tasks = [
213+
remote_task_types.RemoteTask('c', 'j', 'u1'),
214+
]
215+
unscheduled_tasks = [
216+
remote_task_types.RemoteTask('c', 'j', 'u1'),
217+
]
218+
prepared_tasks = [
219+
remote_task_types.RemoteTask('c', 'j', 'u1'),
220+
]
221+
prepared_tasks[0].argument = 'arg'
222+
223+
mock_get_job_frequency.return_value = {'kubernetes': 1.0, 'gcp_batch': 0.0}
224+
self.mock_k8s_service.create_utask_main_jobs.return_value = unscheduled_tasks
225+
226+
self.mock_prepare_unscheduled_tasks.side_effect = None
227+
self.mock_prepare_unscheduled_tasks.return_value = prepared_tasks
228+
229+
gate = remote_task_gate.RemoteTaskGate()
230+
result = gate.create_utask_main_jobs(tasks)
231+
232+
self.mock_k8s_service.create_utask_main_jobs.assert_called_once_with(tasks)
233+
self.mock_prepare_unscheduled_tasks.assert_called_once_with(
234+
unscheduled_tasks)
235+
self.assertEqual(result, prepared_tasks)

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_test.py

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,19 @@ def setUp(self):
4141
mock_creds.token = 'fake-token'
4242
self.mock_auth_default.return_value = (mock_creds, 'test-project')
4343

44-
# Mock discovery.build to avoid network calls during KubernetesService init
45-
patcher = mock.patch('googleapiclient.discovery.build')
44+
# Mock _load_gke_credentials to avoid BOT_DIR dependency
45+
patcher = mock.patch.object(k8s_service.KubernetesService,
46+
'_load_gke_credentials')
4647
self.addCleanup(patcher.stop)
47-
self.mock_discovery_build = patcher.start()
48-
mock_service = mock.Mock()
49-
self.mock_discovery_build.return_value = mock_service
50-
mock_service.projects().locations().clusters().list(
51-
).execute.return_value = {
52-
'clusters': [{
53-
'name': 'clusterfuzz-cronjobs-gke',
54-
'endpoint': '1.2.3.4',
55-
'masterAuth': {
56-
'clusterCaCertificate':
57-
'ZmFrZS1jZXJ0' # base64 encoded 'fake-cert'
58-
}
59-
}]
60-
}
48+
patcher.start()
49+
50+
# Mock prepare_unscheduled_tasks to avoid NDB context issues
51+
patcher = mock.patch.object(
52+
remote_task_gate.RemoteTaskGate,
53+
'prepare_unscheduled_tasks',
54+
side_effect=lambda x: x)
55+
self.addCleanup(patcher.stop)
56+
patcher.start()
6157

6258
self.gate = remote_task_gate.RemoteTaskGate()
6359

0 commit comments

Comments
 (0)