Skip to content

Commit 75ece20

Browse files
committed
Pr/metrics logging (#5115)
Signed-off-by: Javan Lacerda <javanlacerda@google.com>
1 parent d00c0c4 commit 75ece20

13 files changed

Lines changed: 466 additions & 110 deletions

File tree

butler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,6 @@ def main():
435435
'clean_indexes', help='Clean up undefined indexes (in index.yaml).')
436436
parser_clean_indexes.add_argument(
437437
'-c', '--config-dir', required=True, help='Path to application config.')
438-
439438
parser_create_config = subparsers.add_parser(
440439
'create_config', help='Create a new deployment config.')
441440
parser_create_config.add_argument(

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@
6464
'regression': 24 * 60 * 60,
6565
}
6666

67+
68+
def get_task_duration(command):
69+
"""Gets the duration of a task."""
70+
return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS)
71+
72+
6773
TASK_QUEUE_DISPLAY_NAMES = {
6874
'LINUX': 'Linux',
6975
'LINUX_WITH_GPU': 'Linux (with GPU)',
@@ -503,6 +509,7 @@ def __init__(self, pubsub_message):
503509
}
504510

505511
self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta')))
512+
self.do_not_ack = False
506513

507514
def attribute(self, key):
508515
"""Return attribute value."""
@@ -550,7 +557,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
550557
leaser_thread.join()
551558

552559
# If we get here the task succeeded in running. Acknowledge the message.
553-
self._pubsub_message.ack()
560+
if not self.do_not_ack:
561+
self._pubsub_message.ack()
554562
track_task_end()
555563

556564
def dont_retry(self):
@@ -587,7 +595,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
587595
leaser_thread.join()
588596

589597
# If we get here the task succeeded in running. Acknowledge the message.
590-
self._pubsub_message.ack()
598+
if not self.do_not_ack:
599+
self._pubsub_message.ack()
591600
track_task_end()
592601

593602

src/clusterfuzz/_internal/batch/service.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]):
238238
return config_map
239239

240240

241-
def _get_task_duration(command):
242-
return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command,
243-
tasks.TASK_LEASE_SECONDS)
244-
245-
246241
WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
247242
['name', 'weight'])
248243

@@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict:
293288
# Lower numbers are a lower priority, meaning less likely to run. From:
294289
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
295290
priority = 0 if task.command == 'fuzz' else 1
296-
max_run_duration = f'{_get_task_duration(task.command)}s'
291+
max_run_duration = f'{tasks.get_task_duration(task.command)}s'
297292
# This saves us time and reduces fragmentation, e.g. every linux fuzz task
298293
# run in this call will run in the same zone.
299294
if config_name not in subconfig_map:

src/clusterfuzz/_internal/bot/tasks/task_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def execute(self, task_argument, job_type, uworker_env):
225225
'analyze': UTask,
226226
'blame': TrustedTask,
227227
'corpus_pruning': UTask,
228-
'fuzz': UTaskLocalExecutor,
228+
'fuzz': UTask,
229229
'impact': TrustedTask,
230230
'minimize': UTask,
231231
'progression': UTask,

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
from google.auth.transport import requests as google_requests
2525
from googleapiclient import discovery
2626
from kubernetes import client as k8s_client
27+
from kubernetes import config as k8s_config
2728

29+
from clusterfuzz._internal.base import tasks
2830
from clusterfuzz._internal.base import utils
2931
from clusterfuzz._internal.base.tasks import task_utils
3032
from clusterfuzz._internal.config import local_config
@@ -35,9 +37,7 @@
3537
from clusterfuzz._internal.remote_task import RemoteTaskInterface
3638
from clusterfuzz._internal.system import environment
3739

38-
# See https://cloud.google.com/batch/quotas#job_limits
39-
MAX_CONCURRENT_VMS_PER_JOB = 1000
40-
# TODO(javanlacerda): Use feature flag for setting it.
40+
MAX_PENDING_JOBS = 1000
4141
CLUSTER_NAME = 'clusterfuzz-cronjobs-gke'
4242

4343
KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [
@@ -93,6 +93,7 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict:
9393

9494
if not remote_tasks:
9595
return {}
96+
# TODO(javanlacerda): Create remote task config
9697
batch_config = local_config.BatchConfig()
9798
config_map = _get_config_names(remote_tasks)
9899
configs = {}
@@ -130,7 +131,8 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict:
130131
return configs
131132

132133

133-
def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
134+
def _create_job_body(config: KubernetesJobConfig, input_url: str,
135+
service_account_name: str) -> dict:
134136
"""Creates the body of a Kubernetes job."""
135137

136138
job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
@@ -144,10 +146,11 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
144146
'name': job_name
145147
},
146148
'spec': {
149+
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
147150
'template': {
148151
'spec': {
149-
'hostNetwork':
150-
True,
152+
'serviceAccountName':
153+
service_account_name,
151154
'containers': [{
152155
'name':
153156
job_name,
@@ -182,12 +185,16 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
182185
},
183186
{
184187
'name': 'IS_K8S_ENV',
185-
'value': 'True'
188+
'value': 'true'
186189
},
187190
{
188191
'name': 'DISABLE_MOUNTS',
189192
'value': 'true'
190193
},
194+
{
195+
'name': 'UPDATE_WEB_TESTS',
196+
'value': 'False'
197+
},
191198
],
192199
'securityContext': {
193200
'privileged': True,
@@ -229,31 +236,31 @@ def __init__(self, k8s_config_loaded: bool = False):
229236

230237
def _load_gke_credentials(self):
231238
"""Loads GKE credentials and configures the Kubernetes client."""
232-
credentials, project = google.auth.default()
239+
credentials, _ = google.auth.default()
240+
project = utils.get_application_id()
233241
service = discovery.build('container', 'v1', credentials=credentials)
234-
parent = f'projects/{project}/locations/-'
235-
# pylint: disable=no-member
236-
response = service.projects().locations().clusters().list(
237-
parent=parent).execute()
238-
239-
cluster = None
240-
for c in response.get('clusters', []):
241-
if c['name'] == CLUSTER_NAME:
242-
cluster = c
243-
break
244-
245-
if not cluster:
246-
logs.error(f'Cluster {CLUSTER_NAME} not found.')
242+
parent = f"projects/{project}/locations/-"
243+
244+
try:
245+
response = service.projects().locations().clusters().list(
246+
parent=parent).execute()
247+
clusters = response.get('clusters', [])
248+
cluster = next((c for c in clusters if c['name'] == CLUSTER_NAME), None)
249+
250+
if not cluster:
251+
logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.")
252+
print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.")
253+
return
254+
255+
except Exception as e:
256+
logs.error(f"Failed to list clusters in {project}: {e}")
247257
return
248258

249259
endpoint = cluster['endpoint']
250260
# ca_cert is base64 encoded.
251261
ca_cert = base64.b64decode(cluster['masterAuth']['clusterCaCertificate'])
252262

253263
# Write CA cert to a temporary file.
254-
# Note: This file persists for the lifetime of the process/container.
255-
# Ideally it should be cleaned up, but standard k8s client usage involves
256-
# file paths.
257264
fd, ca_cert_path = tempfile.mkstemp()
258265
with os.fdopen(fd, 'wb') as f:
259266
f.write(ca_cert)
@@ -264,17 +271,38 @@ def _load_gke_credentials(self):
264271
configuration.verify_ssl = True
265272

266273
def get_token(creds):
267-
if not creds.valid:
268-
request = google_requests.Request()
274+
request = google_requests.Request()
275+
if not creds.valid or creds.expired:
269276
creds.refresh(request)
270-
return creds.token
277+
return {"authorization": "Bearer " + creds.token}
271278

272-
# Hook to refresh token on API calls.
273-
configuration.refresh_api_key_hook = lambda _: {
274-
"authorization": "Bearer " + get_token(credentials)
275-
}
279+
configuration.refresh_api_key_hook = lambda _: get_token(credentials)
280+
configuration.api_key = get_token(credentials)
276281

277282
k8s_client.Configuration.set_default(configuration)
283+
logs.info("GKE credentials loaded successfully.")
284+
285+
def _create_service_account_if_needed(self,
286+
service_account_email: str) -> str:
287+
"""Creates a Kubernetes Service Account if it doesn't exist."""
288+
service_account_name = service_account_email.split('@')[0]
289+
namespace = 'default'
290+
try:
291+
self._core_api.read_namespaced_service_account(service_account_name,
292+
namespace)
293+
return service_account_name
294+
except k8s_client.rest.ApiException as e:
295+
if e.status != 404:
296+
raise
297+
298+
logs.info(f'Creating Service Account {service_account_name} for '
299+
f'{service_account_email}.')
300+
metadata = k8s_client.V1ObjectMeta(
301+
name=service_account_name,
302+
annotations={'iam.gke.io/gcp-service-account': service_account_email})
303+
body = k8s_client.V1ServiceAccount(metadata=metadata)
304+
self._core_api.create_namespaced_service_account(namespace, body)
305+
return service_account_name
278306

279307
def create_job(self, config: KubernetesJobConfig, input_url: str) -> str:
280308
"""Creates a Kubernetes job.
@@ -285,11 +313,25 @@ def create_job(self, config: KubernetesJobConfig, input_url: str) -> str:
285313
Returns:
286314
The name of the created Kubernetes job.
287315
"""
288-
289-
job_body = _create_job_body(config, input_url)
316+
service_account_name = self._create_service_account_if_needed(
317+
config.service_account_email)
318+
job_body = _create_job_body(config, input_url, service_account_name)
290319
self._batch_api.create_namespaced_job(body=job_body, namespace='default')
291320
return job_body['metadata']['name']
292321

322+
def _get_pending_jobs_count(self) -> int:
323+
"""Returns the number of pending jobs."""
324+
try:
325+
pods = self._core_api.list_namespaced_pod(
326+
namespace='default',
327+
label_selector='app.kubernetes.io/name=clusterfuzz-kata-job',
328+
field_selector='status.phase=Pending')
329+
logs.info(f"Found {len(pods.items)} pending jobs.")
330+
return len(pods.items)
331+
except Exception as e:
332+
logs.error(f"Failed to list pods: {e}")
333+
return 0
334+
293335
def create_uworker_main_batch_job(self, module: str, job_type: str,
294336
input_download_url: str):
295337
"""Creates a single batch job for a uworker main task."""
@@ -309,6 +351,15 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
309351
separate batch job for each group. This allows tasks with similar
310352
requirements to be processed together, which can improve efficiency.
311353
"""
354+
if self._get_pending_jobs_count() >= MAX_PENDING_JOBS:
355+
logs.warning(
356+
f'Kubernetes job limit reached. Not acking {len(remote_tasks)} tasks.'
357+
)
358+
for task in remote_tasks:
359+
if task.pubsub_task:
360+
task.pubsub_task.do_not_ack = True
361+
return []
362+
312363
job_specs = collections.defaultdict(list)
313364
configs = _get_k8s_job_configs(remote_tasks)
314365
for remote_task in remote_tasks:
@@ -331,6 +382,8 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
331382
def create_kata_container_job(self, config: KubernetesJobConfig,
332383
input_url: str) -> str:
333384
"""Creates a Kubernetes job that runs in a Kata container."""
385+
service_account_name = self._create_service_account_if_needed(
386+
config.service_account_email)
334387
job_name = 'clusterfuzz-kata-job-' + str(uuid.uuid4()).split(
335388
'-', maxsplit=1)[0]
336389

@@ -341,6 +394,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
341394
'name': job_name
342395
},
343396
'spec': {
397+
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
344398
'template': {
345399
'metadata': {
346400
'labels': {
@@ -350,8 +404,8 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
350404
'spec': {
351405
'runtimeClassName':
352406
'kata',
353-
'hostNetwork':
354-
True,
407+
'serviceAccountName':
408+
service_account_name,
355409
'dnsPolicy':
356410
'ClusterFirstWithHostNet',
357411
'containers': [{
@@ -415,12 +469,16 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
415469
},
416470
{
417471
'name': 'IS_K8S_ENV',
418-
'value': 'True'
472+
'value': 'true'
419473
},
420474
{
421475
'name': 'DISABLE_MOUNTS',
422476
'value': 'true'
423477
},
478+
{
479+
'name': 'UPDATE_WEB_TESTS',
480+
'value': 'False'
481+
},
424482
]
425483
}],
426484
'restartPolicy':

0 commit comments

Comments
 (0)