Skip to content

Commit b97b50d

Browse files
IvanBM18jardondiegojavanlacerda
authored
Swarming: Removes Unnecessary DB calls (#5223)
We are adding back the last batch of the [changes that were rolled back](c964710), which make sure that we weren't making a lot of unnecessary calls into the DB. As a consequence, the signatures of multiple methods were changed and simplified. I also added new tests to prevent the previous issue (the one that caused the rollback) from ever appearing again. These tests are in their own test class and their purpose is to check that the BadConfigError is correctly handled when no config is found (which is the expected behaviour in the external and Google envs). ## Tests performed in `dev` environment These changes have been present since March 25th. It's been over one week since then and metrics seem consistent. <img width="957" height="337" alt="image" src="https://github.com/user-attachments/assets/1ffcdba5-9adc-434b-831e-905c4653357e" /> Note: Downtimes in fuzzing hours match when we deployed a change into the env. Also, no new error groups seem to be appearing in swarming; the only notable error is one related to missing permissions for uploading a test case, which seems unrelated. These changes have been in `dev` for quite some time and we have successfully tested them without compromising any functionality. For example, [this Task](https://chrome-swarming.appspot.com/task?id=7743c89f08030410) was scheduled with these changes. --------- Co-authored-by: Diego Jardon <37823380+jardondiego@users.noreply.github.com> Co-authored-by: Javan Lacerda <javanlacerda@google.com>
1 parent 870e910 commit b97b50d

10 files changed

Lines changed: 218 additions & 86 deletions

File tree

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ def is_remote_utask(command, job):
115115
# Return True even if we can't query the db.
116116
return True
117117

118-
return batch_service.is_remote_task(
119-
command, job) or swarming.is_swarming_task(command, job)
118+
return batch_service.is_remote_task(command,
119+
job) or swarming.is_swarming_task(job)
120120

121121

122122
def task_main_runs_on_uworker():
@@ -178,11 +178,8 @@ def execute(self, task_argument, job_type, uworker_env):
178178
return
179179

180180
logs.info('Queueing utask for remote execution.', download_url=download_url)
181-
if batch_service.is_remote_task(command, job_type):
182-
tasks.add_utask_main(command, download_url, job_type)
183-
else:
184-
assert swarming.is_swarming_task(command, job_type)
185-
swarming.push_swarming_task(command, download_url, job_type)
181+
assert batch_service.is_remote_task(command, job_type)
182+
tasks.add_utask_main(command, download_url, job_type)
186183

187184
@logs.task_stage_context(logs.Stage.PREPROCESS)
188185
def preprocess(self, task_argument, job_type, uworker_env):

src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,9 @@ def _timestamp_now() -> Timestamp:
6363
return ts
6464

6565

66-
def _get_execution_mode(utask_module, job_type):
66+
def _get_execution_mode(job_type):
6767
"""Determines whether this task in executed on swarming or batch."""
68-
command = task_utils.get_command_from_module(utask_module.__name__)
69-
if swarming.is_swarming_task(command, job_type):
68+
if swarming.is_swarming_task(job_type):
7069
return Mode.SWARMING
7170
return Mode.BATCH
7271

@@ -410,7 +409,7 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
410409
signed download URL for the uworker's input and the (unsigned) download URL
411410
for its output."""
412411
with _MetricRecorder(_Subtask.PREPROCESS) as recorder:
413-
execution_mode = _get_execution_mode(utask_module, job_type)
412+
execution_mode = _get_execution_mode(job_type)
414413
uworker_input = _preprocess(utask_module, task_argument, job_type,
415414
uworker_env, recorder, execution_mode)
416415
if not uworker_input:
@@ -501,8 +500,7 @@ def tworker_postprocess(output_download_url) -> None:
501500
task_utils.reset_task_stage_env()
502501

503502
utask_module = get_utask_module(uworker_output.uworker_input.module_name)
504-
execution_mode = _get_execution_mode(utask_module,
505-
uworker_output.uworker_input.job_type)
503+
execution_mode = _get_execution_mode(uworker_output.uworker_input.job_type)
506504
recorder.set_task_details(
507505
utask_module,
508506
uworker_output.uworker_input.job_type,

src/clusterfuzz/_internal/metrics/logs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,7 @@ class Stage(enum.Enum):
10131013
MAIN = 'main'
10141014
POSTPROCESS = 'postprocess'
10151015
UNKNOWN = 'unknown'
1016+
SCHEDULER = 'scheduler'
10161017
NA = 'n/a'
10171018

10181019

src/clusterfuzz/_internal/remote_task/remote_task_gate.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
from clusterfuzz._internal import swarming
2626
from clusterfuzz._internal.base import feature_flags
27-
from clusterfuzz._internal.base.tasks import task_utils
2827
from clusterfuzz._internal.metrics import logs
2928
from clusterfuzz._internal.remote_task import remote_task_adapters
3029
from clusterfuzz._internal.remote_task import remote_task_types
@@ -61,9 +60,8 @@ def _get_adapter(self) -> str:
6160
def _is_swarming_applicable(self):
6261
return feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled
6362

64-
def _is_swarming_task(self, module, job_type):
65-
return swarming.is_swarming_task(
66-
task_utils.get_command_from_module(module), job_type)
63+
def _is_swarming_task(self, job_type):
64+
return swarming.is_swarming_task(job_type)
6765

6866
def _handle_swarming_job(self, module, job_type, input_download_url):
6967
return self._service_map['swarming'].create_utask_main_job(
@@ -124,8 +122,7 @@ def get_job_frequency(self):
124122

125123
def create_utask_main_job(self, module, job_type, input_download_url):
126124
"""Creates a single remote task, selecting a backend dynamically."""
127-
if self._is_swarming_applicable() and self._is_swarming_task(
128-
module, job_type):
125+
if self._is_swarming_applicable() and self._is_swarming_task(job_type):
129126
return self._handle_swarming_job(module, job_type, input_download_url)
130127

131128
adapter_id = self._get_adapter()
@@ -156,6 +153,7 @@ def create_utask_main_jobs(self,
156153
unscheduled_tasks = []
157154

158155
if self._is_swarming_applicable():
156+
logs.info(f'[Swarming] enabled, pushing {len(remote_tasks)} tasks.')
159157
remote_tasks = self._handle_swarming_jobs(remote_tasks)
160158

161159
if not remote_tasks:

src/clusterfuzz/_internal/swarming/__init__.py

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from google.protobuf import json_format
2222

2323
from clusterfuzz._internal.base import utils
24+
from clusterfuzz._internal.base.errors import BadConfigError
2425
from clusterfuzz._internal.base.feature_flags import FeatureFlags
2526
from clusterfuzz._internal.config import local_config
2627
from clusterfuzz._internal.datastore import data_types
@@ -35,38 +36,62 @@
3536
]
3637

3738

38-
def is_swarming_task(command: str, job_name: str):
39+
def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
3940
"""Returns True if the task is supposed to run on swarming."""
4041
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
42+
logs.info('[DEBUG] Flag is disabled', job_name=job_name)
4143
return False
42-
job = data_types.Job.query(data_types.Job.name == job_name).get()
43-
if not job:
44-
return False
44+
if job is None:
45+
job = data_types.Job.query(data_types.Job.name == job_name).get()
46+
if not job:
47+
logs.info('[Swarming DEBUG] Job not found', job_name=job_name)
48+
return False
4549

4650
job_environment = job.get_environment()
47-
if not utils.string_is_true(job_environment.get('IS_SWARMING_JOB')):
51+
if not utils.string_is_true(job_environment.get(
52+
'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'):
53+
logs.info('[Swarming DEBUG] No swarming env var', job_name=job_name)
4854
return False
4955

50-
try:
51-
_get_new_task_spec(command, job_name, '')
52-
return True
53-
except ValueError:
56+
swarming_config = _get_swarming_config()
57+
if swarming_config is None:
58+
logs.warning(
59+
"""[Swarming DEBUG] current task is not suitable for swarming.
60+
'Reason: failed to retrieve config.""",
61+
job_name=job_name)
5462
return False
5563

64+
return _get_instance_spec(swarming_config, job) is not None
65+
5666

57-
def _get_task_name():
58-
return 't-' + str(uuid.uuid4()).lower()
67+
def _get_instance_spec(swarming_config: local_config.SwarmingConfig,
68+
job: data_types.Job) -> dict | None:
69+
return swarming_config.get('mapping').get(job.platform, None)
5970

6071

61-
def _get_swarming_config():
72+
def _get_task_name(job_name: str):
73+
return f't-{str(uuid.uuid4()).lower()}-{job_name}'
74+
75+
76+
def _get_swarming_config() -> local_config.SwarmingConfig | None:
6277
"""Returns the swarming config."""
63-
return local_config.SwarmingConfig()
78+
try:
79+
return local_config.SwarmingConfig()
80+
except (BadConfigError, ValueError) as e:
81+
logs.error(f'[Swarming] Failed to retrieve config: {e}')
82+
return None
6483

6584

6685
def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list
6786
) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member
6887
""" Gets all swarming dimensions for a task.
6988
Job dimensions have more precedence than static dimensions"""
89+
swarming_config = _get_swarming_config()
90+
if not swarming_config:
91+
logs.error(
92+
'[Swarming] No dimensions set. Reason: failed to retrieve config')
93+
return []
94+
7095
unique_dimensions = {}
7196
unique_dimensions['os'] = str(job.platform).capitalize()
7297
unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool')
@@ -100,16 +125,27 @@ def _env_vars_to_json(
100125
value=json.dumps(env_vars_dict))
101126

102127

103-
def _get_new_task_spec(command: str, job_name: str,
104-
download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member
105-
"""Gets the configured specifications for a swarming task."""
128+
def create_new_task_request(command: str, job_name: str, download_url: str
129+
) -> swarming_pb2.NewTaskRequest | None: # pylint: disable=no-member
130+
"""Gets the configured specifications for a swarming task.
131+
Returns None if the task shouldn't be executed on swarming
132+
or if the SWARMING_REMOTE_EXECUTION flag is disabled."""
133+
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
134+
return None
135+
106136
job = data_types.Job.query(data_types.Job.name == job_name).get()
107-
config_name = job.platform
137+
if job is None:
138+
return None
139+
108140
swarming_config = _get_swarming_config()
109-
instance_spec = swarming_config.get('mapping').get(config_name, None)
141+
if not swarming_config:
142+
return None
143+
144+
instance_spec = _get_instance_spec(swarming_config, job)
110145
if instance_spec is None:
111-
raise ValueError(f'No mapping for {config_name}')
112-
swarming_realm = swarming_config.get('swarming_realm')
146+
return None
147+
148+
swarming_realm = swarming_config.get('swarming_realm',)
113149
logs_project_id = swarming_config.get('logs_project_id')
114150
priority = instance_spec['priority']
115151
startup_command = instance_spec['command']
@@ -155,7 +191,7 @@ def _get_new_task_spec(command: str, job_name: str,
155191
cas_input_root = instance_spec.get('cas_input_root', {})
156192

157193
new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member
158-
name=_get_task_name(),
194+
name=_get_task_name(job_name),
159195
priority=priority,
160196
realm=swarming_realm,
161197
service_account=service_account,
@@ -176,13 +212,13 @@ def _get_new_task_spec(command: str, job_name: str,
176212
return new_task_request
177213

178214

179-
def push_swarming_task(command, download_url, job_type):
215+
def push_swarming_task(task_request: swarming_pb2.NewTaskRequest): # pylint: disable=no-member
180216
"""Schedules a task on swarming."""
181-
job = data_types.Job.query(data_types.Job.name == job_type).get()
182-
if not job:
183-
raise ValueError('invalid job_name')
184-
185-
task_spec = _get_new_task_spec(command, job_type, download_url)
217+
swarming_config = _get_swarming_config()
218+
if not swarming_config:
219+
logs.error(
220+
'[Swarming] Failed to push task into swarming. Reason: No config.')
221+
return
186222
creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES)
187223
if not creds:
188224
logs.error(
@@ -199,11 +235,11 @@ def push_swarming_task(command, download_url, job_type):
199235
}
200236
swarming_server = _get_swarming_config().get('swarming_server')
201237
url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask'
202-
message_body = json_format.MessageToJson(task_spec)
238+
message_body = json_format.MessageToJson(task_request)
203239
logs.info(
204-
f"""[Swarming] Pushing task for {job_type}
240+
f"""[Swarming] Pushing task {task_request.name}
205241
as {creds.service_account_email}""",
206242
url=url,
207243
body=message_body)
208244
response = utils.post_url(url=url, data=message_body, headers=headers)
209-
logs.info(f'[Swarming] Response from {job_type}', response=response)
245+
logs.info(f'[Swarming] Response from {task_request.name}', response=response)

src/clusterfuzz/_internal/swarming/service.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,23 @@ def create_utask_main_job(self, module: str, job_type: str,
3535

3636
return result[0]
3737

38+
@logs.task_stage_context(logs.Stage.SCHEDULER)
3839
def create_utask_main_jobs(self,
3940
remote_tasks: list[remote_task_types.RemoteTask]
4041
) -> list[remote_task_types.RemoteTask]:
4142
"""Creates many remote tasks for uworker main tasks.
4243
Returns the tasks that couldn't be created.
4344
"""
4445
unscheduled_tasks = []
46+
logs.info(f'[Swarming] Pushing {len(remote_tasks)} tasks trough service.')
4547
for task in remote_tasks:
4648
try:
47-
if not swarming.is_swarming_task(task.command, task.job_type):
49+
if not swarming.is_swarming_task(task.job_type):
4850
unscheduled_tasks.append(task)
4951
continue
50-
51-
swarming.push_swarming_task(task.command, task.input_download_url,
52-
task.job_type)
52+
if request := swarming.create_new_task_request(
53+
task.command, task.job_type, task.argument):
54+
swarming.push_swarming_task(request)
5355
except Exception: # pylint: disable=broad-except
5456
logs.error(
5557
f'Failed to push task to Swarming: {task.command}, {task.job_type}.'

src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -518,15 +518,13 @@ def test_is_swarming_applicable(self, mock_swarming_flag):
518518
mock_swarming_flag.return_value = False
519519
self.assertFalse(self.gate._is_swarming_applicable())
520520

521-
@mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.task_utils')
522521
@mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.swarming')
523-
def test_is_swarming_task(self, mock_swarming, mock_task_utils):
522+
def test_is_swarming_task(self, mock_swarming):
524523
"""Tests _is_swarming_task."""
525-
mock_task_utils.get_command_from_module.return_value = 'fuzz'
526524
mock_swarming.is_swarming_task.return_value = True
527525

528-
self.assertTrue(self.gate._is_swarming_task('module', 'job'))
529-
mock_swarming.is_swarming_task.assert_called_once_with('fuzz', 'job')
526+
self.assertTrue(self.gate._is_swarming_task('job'))
527+
mock_swarming.is_swarming_task.assert_called_once_with('job')
530528

531529
def test_handle_swarming_job(self):
532530
"""Tests _handle_swarming_job."""

src/clusterfuzz/_internal/tests/core/swarming/service_test.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ def setUp(self):
2828
helpers.patch(self, [
2929
'clusterfuzz._internal.swarming.is_swarming_task',
3030
'clusterfuzz._internal.swarming.push_swarming_task',
31+
'clusterfuzz._internal.swarming.create_new_task_request',
3132
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module',
3233
'clusterfuzz._internal.metrics.logs.error',
3334
])
3435
self.service = service.SwarmingService()
36+
self.mock.create_new_task_request.return_value = 'fake_request'
3537

3638
def test_create_utask_main_job_success(self):
3739
"""Test creating a single task successfully."""
@@ -44,8 +46,7 @@ def test_create_utask_main_job_success(self):
4446
# Success returns None in this interface (consistent with GcpBatchService)
4547
self.assertIsNone(result)
4648

47-
self.mock.push_swarming_task.assert_called_once_with(
48-
'fuzz', 'http://url', 'job_type')
49+
self.mock.push_swarming_task.assert_called_once_with('fake_request')
4950

5051
def test_create_utask_main_job_failure(self):
5152
"""Test creating a single task that is not a swarming task."""
@@ -78,8 +79,8 @@ def test_create_utask_main_jobs_mixed_results(self):
7879

7980
self.assertEqual(self.mock.push_swarming_task.call_count, 2)
8081
self.mock.push_swarming_task.assert_has_calls([
81-
mock.call('fuzz', 'url1', 'job1'),
82-
mock.call('fuzz', 'url3', 'job3'),
82+
mock.call('fake_request'),
83+
mock.call('fake_request'),
8384
])
8485

8586
def test_create_utask_main_jobs_all_success(self):

0 commit comments

Comments
 (0)