Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ def main():
'clean_indexes', help='Clean up undefined indexes (in index.yaml).')
parser_clean_indexes.add_argument(
'-c', '--config-dir', required=True, help='Path to application config.')

parser_create_config = subparsers.add_parser(
'create_config', help='Create a new deployment config.')
parser_create_config.add_argument(
Expand Down
13 changes: 11 additions & 2 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@
'regression': 24 * 60 * 60,
}


def get_task_duration(command):
"""Gets the duration of a task."""
return TASK_LEASE_SECONDS_BY_COMMAND.get(command, TASK_LEASE_SECONDS)


TASK_QUEUE_DISPLAY_NAMES = {
'LINUX': 'Linux',
'LINUX_WITH_GPU': 'Linux (with GPU)',
Expand Down Expand Up @@ -503,6 +509,7 @@ def __init__(self, pubsub_message):
}

self.eta = datetime.datetime.utcfromtimestamp(float(self.attribute('eta')))
self.do_not_ack = False

def attribute(self, key):
"""Return attribute value."""
Expand Down Expand Up @@ -550,7 +557,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
leaser_thread.join()

# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
if not self.do_not_ack:
self._pubsub_message.ack()
track_task_end()

def dont_retry(self):
Expand Down Expand Up @@ -587,7 +595,8 @@ def lease(self, _event=None): # pylint: disable=arguments-differ
leaser_thread.join()

# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
if not self.do_not_ack:
self._pubsub_message.ack()
track_task_end()


Expand Down
7 changes: 1 addition & 6 deletions src/clusterfuzz/_internal/batch/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,6 @@ def _get_config_names(batch_tasks: List[RemoteTask]):
return config_map


def _get_task_duration(command):
return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command,
tasks.TASK_LEASE_SECONDS)


WeightedSubconfig = collections.namedtuple('WeightedSubconfig',
['name', 'weight'])

Expand Down Expand Up @@ -293,7 +288,7 @@ def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict:
# Lower numbers are a lower priority, meaning less likely to run From:
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
priority = 0 if task.command == 'fuzz' else 1
max_run_duration = f'{_get_task_duration(task.command)}s'
max_run_duration = f'{tasks.get_task_duration(task.command)}s'
# This saves us time and reduces fragementation, e.g. every linux fuzz task
# run in this call will run in the same zone.
if config_name not in subconfig_map:
Expand Down
2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/bot/tasks/task_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def execute(self, task_argument, job_type, uworker_env):
'analyze': UTask,
'blame': TrustedTask,
'corpus_pruning': UTask,
'fuzz': UTaskLocalExecutor,
'fuzz': UTask,
'impact': TrustedTask,
'minimize': UTask,
'progression': UTask,
Expand Down
26 changes: 17 additions & 9 deletions src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def clamp(val, minimum, maximum):
'return_code': return_code,
'platform': environment.platform(),
'job': job_type,
'runtime': environment.get_runtime().value
})


Expand All @@ -480,20 +481,26 @@ def _track_testcase_run_result(fuzzer, job_type, new_crash_count,
known_crash_count, {
'fuzzer': fuzzer,
'platform': environment.platform(),
'runtime': environment.get_runtime().value
})
monitoring_metrics.FUZZER_NEW_CRASH_COUNT.increment_by(
new_crash_count, {
'fuzzer': fuzzer,
'platform': environment.platform(),
'runtime': environment.get_runtime().value
})
monitoring_metrics.JOB_KNOWN_CRASH_COUNT.increment_by(
known_crash_count, {
'job': job_type,
'platform': environment.platform(),
'runtime': environment.get_runtime().value
})
monitoring_metrics.JOB_NEW_CRASH_COUNT.increment_by(
new_crash_count, {
'job': job_type,
'platform': environment.platform(),
'runtime': environment.get_runtime().value
})
monitoring_metrics.JOB_KNOWN_CRASH_COUNT.increment_by(known_crash_count, {
'job': job_type,
'platform': environment.platform(),
})
monitoring_metrics.JOB_NEW_CRASH_COUNT.increment_by(new_crash_count, {
'job': job_type,
'platform': environment.platform()
})


def _last_sync_time(sync_file_path):
Expand Down Expand Up @@ -2020,7 +2027,8 @@ def run(self):
fuzzing_session_duration, {
'fuzzer': self.fuzzer_name,
'job': self.job_type,
'platform': environment.platform()
'platform': environment.platform(),
'runtime': environment.get_runtime().value,
})

return uworker_msg_pb2.Output(fuzz_task_output=self.fuzz_task_output) # pylint: disable=no-member
Expand Down
130 changes: 94 additions & 36 deletions src/clusterfuzz/_internal/k8s/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from google.auth.transport import requests as google_requests
from googleapiclient import discovery
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config

from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.config import local_config
Expand All @@ -35,9 +37,7 @@
from clusterfuzz._internal.remote_task import RemoteTaskInterface
from clusterfuzz._internal.system import environment

# See https://cloud.google.com/batch/quotas#job_limits
MAX_CONCURRENT_VMS_PER_JOB = 1000
# TODO(javanlacerda): Use feature flag for setting it.
MAX_PENDING_JOBS = 1000
CLUSTER_NAME = 'clusterfuzz-cronjobs-gke'

KubernetesJobConfig = collections.namedtuple('KubernetesJobConfig', [
Expand Down Expand Up @@ -93,6 +93,7 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict:

if not remote_tasks:
return {}
#TODO(javanlacerda): Create remote task config
batch_config = local_config.BatchConfig()
config_map = _get_config_names(remote_tasks)
configs = {}
Expand Down Expand Up @@ -130,7 +131,8 @@ def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict:
return configs


def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
def _create_job_body(config: KubernetesJobConfig, input_url: str,
service_account_name: str) -> dict:
"""Creates the body of a Kubernetes job."""

job_name = config.job_type.replace('_', '-') + '-' + str(uuid.uuid4()).split(
Expand All @@ -144,10 +146,11 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
'name': job_name
},
'spec': {
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
'template': {
'spec': {
'hostNetwork':
True,
'serviceAccountName':
service_account_name,
'containers': [{
'name':
job_name,
Expand Down Expand Up @@ -182,12 +185,16 @@ def _create_job_body(config: KubernetesJobConfig, input_url: str) -> dict:
},
{
'name': 'IS_K8S_ENV',
'value': 'True'
'value': 'true'
},
{
'name': 'DISABLE_MOUNTS',
'value': 'true'
},
{
'name': 'UPDATE_WEB_TESTS',
'value': 'False'
},
],
'securityContext': {
'privileged': True,
Expand Down Expand Up @@ -229,31 +236,31 @@ def __init__(self, k8s_config_loaded: bool = False):

def _load_gke_credentials(self):
"""Loads GKE credentials and configures the Kubernetes client."""
credentials, project = google.auth.default()
credentials, _ = google.auth.default()
project = utils.get_application_id()
service = discovery.build('container', 'v1', credentials=credentials)
parent = f'projects/{project}/locations/-'
# pylint: disable=no-member
response = service.projects().locations().clusters().list(
parent=parent).execute()

cluster = None
for c in response.get('clusters', []):
if c['name'] == CLUSTER_NAME:
cluster = c
break

if not cluster:
logs.error(f'Cluster {CLUSTER_NAME} not found.')
parent = f"projects/{project}/locations/-"

try:
response = service.projects().locations().clusters().list(
parent=parent).execute()
clusters = response.get('clusters', [])
cluster = next((c for c in clusters if c['name'] == CLUSTER_NAME), None)

if not cluster:
logs.error(f"Cluster {CLUSTER_NAME} not found in project {project}.")
print(f"DEBUG: Cluster {CLUSTER_NAME} not found in project {project}.")
return

except Exception as e:
logs.error(f"Failed to list clusters in {project}: {e}")
return

endpoint = cluster['endpoint']
# ca_cert is base64 encoded.
ca_cert = base64.b64decode(cluster['masterAuth']['clusterCaCertificate'])

# Write CA cert to a temporary file.
# Note: This file persists for the lifetime of the process/container.
# Ideally it should be cleaned up, but standard k8s client usage involves
# file paths.
fd, ca_cert_path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
f.write(ca_cert)
Expand All @@ -264,17 +271,38 @@ def _load_gke_credentials(self):
configuration.verify_ssl = True

def get_token(creds):
if not creds.valid:
request = google_requests.Request()
request = google_requests.Request()
if not creds.valid or creds.expired:
creds.refresh(request)
return creds.token
return {"authorization": "Bearer " + creds.token}

# Hook to refresh token on API calls.
configuration.refresh_api_key_hook = lambda _: {
"authorization": "Bearer " + get_token(credentials)
}
configuration.refresh_api_key_hook = lambda _: get_token(credentials)
configuration.api_key = get_token(credentials)

k8s_client.Configuration.set_default(configuration)
logs.info("GKE credentials loaded successfully.")

def _create_service_account_if_needed(self,
service_account_email: str) -> str:
"""Creates a Kubernetes Service Account if it doesn't exist."""
service_account_name = service_account_email.split('@')[0]
namespace = 'default'
try:
self._core_api.read_namespaced_service_account(service_account_name,
namespace)
return service_account_name
except k8s_client.rest.ApiException as e:
if e.status != 404:
raise

logs.info(f'Creating Service Account {service_account_name} for '
f'{service_account_email}.')
metadata = k8s_client.V1ObjectMeta(
name=service_account_name,
annotations={'iam.gke.io/gcp-service-account': service_account_email})
body = k8s_client.V1ServiceAccount(metadata=metadata)
self._core_api.create_namespaced_service_account(namespace, body)
return service_account_name

def create_job(self, config: KubernetesJobConfig, input_url: str) -> str:
"""Creates a Kubernetes job.
Expand All @@ -285,11 +313,25 @@ def create_job(self, config: KubernetesJobConfig, input_url: str) -> str:
Returns:
The name of the created Kubernetes job.
"""

job_body = _create_job_body(config, input_url)
service_account_name = self._create_service_account_if_needed(
config.service_account_email)
job_body = _create_job_body(config, input_url, service_account_name)
self._batch_api.create_namespaced_job(body=job_body, namespace='default')
return job_body['metadata']['name']

def _get_pending_jobs_count(self) -> int:
"""Returns the number of pending jobs."""
try:
pods = self._core_api.list_namespaced_pod(
namespace='default',
label_selector='app.kubernetes.io/name=clusterfuzz-kata-job',
field_selector='status.phase=Pending')
logs.info(f"Found {len(pods.items)} pending jobs.")
return len(pods.items)
except Exception as e:
logs.error(f"Failed to list pods: {e}")
return 0

def create_uworker_main_batch_job(self, module: str, job_type: str,
input_download_url: str):
"""Creates a single batch job for a uworker main task."""
Expand All @@ -309,6 +351,15 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
separate batch job for each group. This allows tasks with similar
requirements to be processed together, which can improve efficiency.
"""
if self._get_pending_jobs_count() >= MAX_PENDING_JOBS:
logs.warning(
f'Kubernetes job limit reached. Not acking {len(remote_tasks)} tasks.'
)
for task in remote_tasks:
if task.pubsub_task:
task.pubsub_task.do_not_ack = True
return []

job_specs = collections.defaultdict(list)
configs = _get_k8s_job_configs(remote_tasks)
for remote_task in remote_tasks:
Expand All @@ -331,6 +382,8 @@ def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
def create_kata_container_job(self, config: KubernetesJobConfig,
input_url: str) -> str:
"""Creates a Kubernetes job that runs in a Kata container."""
service_account_name = self._create_service_account_if_needed(
config.service_account_email)
job_name = 'clusterfuzz-kata-job-' + str(uuid.uuid4()).split(
'-', maxsplit=1)[0]

Expand All @@ -341,6 +394,7 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
'name': job_name
},
'spec': {
'activeDeadlineSeconds': tasks.get_task_duration(config.command),
'template': {
'metadata': {
'labels': {
Expand All @@ -350,8 +404,8 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
'spec': {
'runtimeClassName':
'kata',
'hostNetwork':
True,
'serviceAccountName':
service_account_name,
'dnsPolicy':
'ClusterFirstWithHostNet',
'containers': [{
Expand Down Expand Up @@ -415,12 +469,16 @@ def create_kata_container_job(self, config: KubernetesJobConfig,
},
{
'name': 'IS_K8S_ENV',
'value': 'True'
'value': 'true'
},
{
'name': 'DISABLE_MOUNTS',
'value': 'true'
},
{
'name': 'UPDATE_WEB_TESTS',
'value': 'False'
},
]
}],
'restartPolicy':
Expand Down
Loading
Loading