Skip to content

Commit 6d17163

Browse files
IvanBM18letitz
andauthored
Custom logger for Swarming & Removes circular dependency at logs.py (#5247)
Currently swarming bots use the K8s default logging configurations(which work fine :D), but when we try to find logs specific to a given swarming bot host or a swarming task, things start to get complicated, its hard to pinpoint the execution workflow. In this PR we changed the configuration so the logger can displays different information if its running on swarming. To achieve this: - Added a logger config if running in swarming and a new method in the `environment` module - Removed a circular dependency between `environment` and `logs` module - Causing a method to be moved from one module to the other - Simplified a bunch of conditionals in the `logs` module Note: This PR is complimentary to this [other pr](https://clusterfuzz-config-472119376969.us-central1.sourcemanager.dev/clusterfuzz-testing/clusterfuzz-config/pulls/291) ## Tests performed I launched some fuzzing jobs that executed in swarming, thanks to the new fields i was able to easily find the logs for a given bot or a given task. [See example](https://cloudlogging.app.goo.gl/kNdC9cXeGEmF5hdRA) --------- Co-authored-by: Titouan Rigoudy <titouan@chromium.org>
1 parent acf7691 commit 6d17163

6 files changed

Lines changed: 109 additions & 68 deletions

File tree

src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ def uworker_main(input_download_url) -> None:
454454
_start_web_server_if_needed(uworker_input.job_type)
455455

456456
utask_module = get_utask_module(uworker_input.module_name)
457-
execution_mode = Mode.SWARMING if environment.is_swarming_bot(
457+
execution_mode = Mode.SWARMING if environment.is_running_on_swarming(
458458
) else Mode.BATCH
459459
recorder.set_task_details(
460460
utask_module, uworker_input.job_type, execution_mode,

src/clusterfuzz/_internal/metrics/logs.py

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
from typing import NamedTuple
3333
from typing import TYPE_CHECKING
3434

35+
from clusterfuzz._internal.system import environment
36+
3537
# This is needed to avoid circular import
3638
if TYPE_CHECKING:
3739
from clusterfuzz._internal.cron.grouper import TestcaseAttributes
@@ -93,26 +95,17 @@ def _console_logging_enabled():
9395

9496
# TODO(pmeuleman) Revert the changeset that added these once
9597
# https://github.com/google/clusterfuzz/pull/3422 lands.
96-
def _file_logging_enabled():
98+
def _file_logging_enabled() -> bool:
9799
"""Return bool True when logging to files (bot/logs/*.log) is enabled.
98-
This is enabled by default.
99-
This is disabled if we are running in app engine or kubernetes as these have
100-
their dedicated loggers, see configure_appengine() and configure_k8s().
101-
"""
102-
return bool(os.getenv(
103-
'LOG_TO_FILE',
104-
'True')) and not _is_running_on_app_engine() and not _is_running_on_k8s()
100+
This is enabled by default."""
101+
return environment.get_value('LOG_TO_FILE', True)
105102

106103

107-
def _cloud_logging_enabled():
104+
def _cloud_logging_enabled() -> bool:
108105
"""Return bool True where Google Cloud Logging is enabled.
109-
This is enabled by default.
110-
This is disabled for local development and if we are running in a app engine
111-
or kubernetes as these have their dedicated loggers, see
112-
configure_appengine() and configure_k8s()."""
113-
return (bool(os.getenv('LOG_TO_GCP', 'True')) and
114-
not os.getenv("PY_UNITTESTS") and not _is_local() and
115-
not _is_running_on_app_engine() and not _is_running_on_k8s())
106+
This is enabled by default but disabled for local development."""
107+
return (environment.get_value('LOG_TO_GCP', True) and
108+
not environment.is_running_unit_tests() and not _is_local())
116109

117110

118111
def suppress_unwanted_warnings():
@@ -418,7 +411,7 @@ def configure_appengine():
418411
"""Configure logging for App Engine."""
419412
logging.getLogger().setLevel(logging.INFO)
420413

421-
if os.getenv('LOCAL_DEVELOPMENT') or os.getenv('PY_UNITTESTS'):
414+
if os.getenv('LOCAL_DEVELOPMENT') or environment.is_running_unit_tests():
422415
return
423416

424417
import google.cloud.logging
@@ -554,12 +547,38 @@ def cloud_label_filter(record):
554547
logging.getLogger().addHandler(handler)
555548

556549

550+
def configure_swarming(name: str, extras: dict[str, str] | None = None) -> None:
551+
"""Configure logging for swarming bots."""
552+
if extras is None:
553+
extras = {}
554+
extras['task_id'] = os.getenv('TASK_ID')
555+
extras['instance_id'] = os.getenv('BOT_NAME')
556+
extras['platform'] = 'swarming'
557+
558+
global _default_extras
559+
_default_extras = extras
560+
561+
logging.basicConfig(level=logging.INFO)
562+
if _cloud_logging_enabled():
563+
configure_cloud_logging()
564+
565+
logger = logging.getLogger(name)
566+
logger.setLevel(logging.INFO)
567+
set_logger(logger)
568+
569+
sys.excepthook = uncaught_exception_handler
570+
571+
557572
def configure(name, extras=None):
558573
"""Set logger. See the list of loggers in bot/config/logging.yaml.
559574
Also configures the process to log any uncaught exceptions as an error.
560575
|extras| will be included by emit() in log messages."""
561576
suppress_unwanted_warnings()
562577

578+
if environment.is_running_on_swarming():
579+
configure_swarming(name, extras)
580+
return
581+
563582
if _is_running_on_k8s():
564583
configure_k8s()
565584
return
@@ -792,7 +811,6 @@ def get_common_log_context() -> dict[str, str]:
792811
"""Return common context to be propagated by logs."""
793812
# Avoid circular imports on the top level.
794813
from clusterfuzz._internal.base import utils
795-
from clusterfuzz._internal.system import environment
796814

797815
try:
798816
os_type = environment.platform()

src/clusterfuzz/_internal/system/environment.py

Lines changed: 5 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@
2323
import sys
2424
import uuid
2525

26-
import requests
2726
import yaml
2827

2928
from clusterfuzz._internal import fuzzing
30-
from clusterfuzz._internal.metrics import logs
3129

3230
# Tools supporting customization of options via ADDITIONAL_{TOOL_NAME}_OPTIONS.
3331
# FIXME: Support ADDITIONAL_UBSAN_OPTIONS and ADDITIONAL_LSAN_OPTIONS in an
@@ -749,9 +747,9 @@ def get_runtime() -> UtaskMainRuntime:
749747
return UtaskMainRuntime.INSTANCE_GROUP
750748

751749

752-
def is_swarming_bot():
750+
def is_running_on_swarming() -> bool:
753751
"""Return whether or not the current bot is a swarming bot."""
754-
return get_value('SWARMING_BOT')
752+
return get_value('SWARMING_BOT') is True
755753

756754

757755
def is_running_on_app_engine():
@@ -1240,42 +1238,6 @@ def is_tworker():
12401238
return get_value('TWORKER', False)
12411239

12421240

1243-
def update_task_enabled() -> bool:
1244-
""" It uses the GCE VM metadata server `update_task_enabled` flag.
1245-
1246-
This flag will be used to rollout the update_task deprecation
1247-
by disabling it progressively for each instance group through
1248-
the instance template metadata
1249-
"""
1250-
metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" +
1251-
"instance/attributes/")
1252-
metadata_header = {"Metadata-Flavor": "Google"}
1253-
metadata_key = "update_task_enabled"
1254-
1255-
running_on_batch = bool(is_uworker())
1256-
1257-
try:
1258-
# Construct the full URL for your specific metadata key
1259-
response = requests.get(
1260-
f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10)
1261-
1262-
# Raise an exception for bad status codes (4xx or 5xx)
1263-
response.raise_for_status()
1264-
1265-
# The metadata value is in the response text
1266-
metadata_value = response.text
1267-
logs.info(f"The value for '{metadata_key}' is: {metadata_value}")
1268-
is_update_task_enabled = metadata_value.lower() != 'false'
1269-
1270-
# The flag is_uworker is true for Batch environment
1271-
# The update task should run if it's not a Batch environment
1272-
# and the flag is enabled on the VM template metadata
1273-
return not running_on_batch and is_update_task_enabled
1274-
1275-
except requests.exceptions.HTTPError as http_error:
1276-
logs.warning(f"Http error fetching metadata: {http_error}")
1277-
1278-
except Exception as ex:
1279-
logs.error(f"Unknown exception fetching metadata: {ex}")
1280-
1281-
return not running_on_batch
1241+
def is_running_unit_tests() -> bool:
1242+
"""Returns whether or not we're running unit tests."""
1243+
return get_value('PY_UNITTESTS', False)

src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ def setUp(self):
199199
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_input',
200200
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.serialize_and_upload_uworker_output',
201201
'clusterfuzz._internal.bot.tasks.utasks.get_utask_module',
202-
'clusterfuzz._internal.system.environment.is_swarming_bot',
202+
'clusterfuzz._internal.system.environment.is_running_on_swarming',
203203
'clusterfuzz._internal.metrics.events.emit',
204204
])
205205
self.module = mock.MagicMock(__name__='tasks.analyze_task')
@@ -210,10 +210,8 @@ def test_uworker_main(self, execution_mode: utasks.Mode):
210210
"""Tests that uworker_main works as intended."""
211211
start_time_ns = time.time_ns()
212212

213-
if execution_mode == utasks.Mode.SWARMING:
214-
self.mock.is_swarming_bot.return_value = True # pylint: disable=protected-access
215-
else:
216-
self.mock.is_swarming_bot.return_value = False
213+
self.mock.is_running_on_swarming.return_value = (
214+
execution_mode == utasks.Mode.SWARMING)
217215

218216
preprocess_start_time_ns = start_time_ns - 42 * 10**9 # In the past.
219217
preprocess_start_timestamp = timestamp_pb2.Timestamp()

src/clusterfuzz/_internal/tests/core/metrics/logs_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,26 @@ def test_configure_appengine(self):
521521
logs.configure('test')
522522
self.assertEqual(0, self.mock.dictConfig.call_count)
523523

524+
def test_configure_swarming(self):
525+
"""Test configure for swarming bot."""
526+
# pylint: disable=protected-access
527+
os.environ['SWARMING_BOT'] = 'True'
528+
os.environ['TASK_ID'] = 'task-123'
529+
os.environ['BOT_NAME'] = 'bot-123'
530+
531+
helpers.patch(
532+
self,
533+
['clusterfuzz._internal.system.environment.is_running_on_swarming'])
534+
535+
logger = mock.MagicMock()
536+
self.mock.getLogger.return_value = logger
537+
538+
logs.configure('test')
539+
540+
self.assertEqual(logs._default_extras['task_id'], 'task-123')
541+
self.assertEqual(logs._default_extras['instance_id'], 'bot-123')
542+
self.assertEqual(logs._default_extras['platform'], 'swarming')
543+
524544

525545
@test_utils.with_cloud_emulators('datastore')
526546
class EmitTest(unittest.TestCase):

src/python/bot/startup/run_bot.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import time
2828
import traceback
2929

30+
import requests
31+
3032
from clusterfuzz._internal.base import dates
3133
from clusterfuzz._internal.base import errors
3234
from clusterfuzz._internal.base import tasks
@@ -125,7 +127,7 @@ def task_loop():
125127
# This caches the current environment on first run. Don't move this.
126128
environment.reset_environment()
127129
try:
128-
if environment.update_task_enabled():
130+
if update_task_enabled():
129131
logs.info("Running update task.")
130132
# Run regular updates.
131133
# TODO(metzman): Move this after utask_main execution
@@ -193,6 +195,47 @@ def task_loop():
193195
return stacktrace, clean_exit, task_payload
194196

195197

198+
def update_task_enabled() -> bool:
199+
""" It uses the GCE VM metadata server `update_task_enabled` flag.
200+
201+
This flag will be used to rollout the update_task deprecation
202+
by disabling it progressively for each instance group through
203+
the instance template metadata
204+
"""
205+
metadata_url = ("http://metadata.google.internal/computeMetadata/v1/" +
206+
"instance/attributes/")
207+
metadata_header = {"Metadata-Flavor": "Google"}
208+
metadata_key = "update_task_enabled"
209+
210+
running_on_batch = bool(environment.is_uworker())
211+
212+
try:
213+
# Construct the full URL for your specific metadata key
214+
response = requests.get(
215+
f"{metadata_url}{metadata_key}", headers=metadata_header, timeout=10)
216+
217+
# Raise an exception for bad status codes (4xx or 5xx)
218+
response.raise_for_status()
219+
220+
# The metadata value is in the response text
221+
metadata_value = response.text
222+
logs.info(f"The value for '{metadata_key}' is: {metadata_value}")
223+
is_update_task_enabled = metadata_value.lower() != 'false'
224+
225+
# The flag is_uworker is true for Batch environment
226+
# The update task should run if it's not a Batch environment
227+
# and the flag is enabled on the VM template metadata
228+
return not running_on_batch and is_update_task_enabled
229+
230+
except requests.exceptions.HTTPError as http_error:
231+
logs.warning(f"Http error fetching metadata: {http_error}")
232+
233+
except Exception as ex:
234+
logs.error(f"Unknown exception fetching metadata: {ex}")
235+
236+
return not running_on_batch
237+
238+
196239
def main():
197240
"""Prepare the configuration options and start requesting tasks."""
198241
logs.configure('run_bot')

0 commit comments

Comments
 (0)