Skip to content

Commit 11bf695

Browse files
authored
fix: correct the settings for OTEL and add logging in the scheduler (#904)
* fix: correct OTEL settings * feat: add some logging in the task scheduler to keep track of its activity
1 parent 346e522 commit 11bf695

2 files changed

Lines changed: 124 additions & 2 deletions

File tree

  • diracx-routers/src/diracx/routers
  • diracx-tasks/src/diracx/tasks/plumbing/scheduler

diracx-routers/src/diracx/routers/otel.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,10 @@ def instrument_otel(app: FastAPI) -> None:
124124
# # override logger format with trace id and span id
125125
# https://github.com/mhausenblas/ref.otel.help/blob/main/how-to/logs-collection/yoda/main.py
126126

127-
LoggingInstrumentor().instrument(set_logging_format=True)
127+
# When set to True, the logs are too noisy to be read from the pods.
128+
# Moreover, the data can't be ingested by CERN OpenTelemetry
129+
# setup. To be investigated
130+
LoggingInstrumentor().instrument(set_logging_format=False)
128131

129132
logger_provider = LoggerProvider(resource=resource)
130133
_logs.set_logger_provider(logger_provider)
@@ -136,7 +139,15 @@ def instrument_otel(app: FastAPI) -> None:
136139
)
137140
logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_exporter))
138141
handler = LoggingHandler(level=logging.DEBUG, logger_provider=logger_provider)
139-
handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
142+
# We need to give some default values for these keys, otherwise the service crashes
143+
# https://github.com/DIRACGrid/diracx/pull/847/
144+
# handler.setFormatter(logging.Formatter(DEFAULT_LOGGING_FORMAT))
145+
default_format = dict.fromkeys(
146+
["otelTraceID", "otelSpanID", "otelServiceName", "otelTraceSampled"], "Default"
147+
)
148+
handler.setFormatter(
149+
logging.Formatter(DEFAULT_LOGGING_FORMAT, defaults=default_format)
150+
)
140151
# Add the handler to diracx and all uvicorn loggers
141152
# Note adding it to just 'uvicorn' or the root logger
142153
# is not enough because uvicorn sets propagate=False

diracx-tasks/src/diracx/tasks/plumbing/scheduler/scheduler.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
"""
6767

6868
DELAYED_ZSET_KEY = "diracx:tasks:delayed"
69+
SCHEDULE_DUMP_INTERVAL_SECONDS = 600
70+
SCHEDULE_DUMP_MAX_ENTRIES = 20
6971

7072

7173
async def schedule_delayed(
@@ -124,6 +126,8 @@ def __init__(
124126
)
125127
# Mapping of (task_class_name, vo_or_empty) -> next_scheduled_time
126128
self._next_runs: dict[tuple[str, str], datetime] = {}
129+
self._schedule_dump_interval_seconds = SCHEDULE_DUMP_INTERVAL_SECONDS
130+
self._last_schedule_dump_at: datetime | None = None
127131
# Cached ZSET size for OTel observable gauge
128132
self._delayed_zset_size: int = 0
129133
_meter.create_observable_gauge(
@@ -134,6 +138,7 @@ def __init__(
134138

135139
async def startup(self) -> None:
136140
await self.broker.startup()
141+
self._log_task_registry_awareness()
137142
logger.info("Scheduler started")
138143

139144
async def shutdown(self) -> None:
@@ -201,6 +206,9 @@ async def _periodic_loop(self, finish_event: asyncio.Event) -> None:
201206
await asyncio.gather(*coros)
202207

203208
self._next_runs.update(due_updates)
209+
if self._should_dump_schedule_snapshot(now):
210+
self._log_next_schedules_snapshot("periodic")
211+
self._last_schedule_dump_at = now
204212

205213
try:
206214
await asyncio.wait_for(finish_event.wait(), timeout=self.check_interval)
@@ -256,15 +264,24 @@ def load_vos(self) -> list[str]:
256264
def _compute_initial_schedules(self) -> None:
257265
"""Compute the initial next-run times for all periodic tasks."""
258266
vos = self.load_vos()
267+
non_periodic_count = 0
268+
disabled_count = 0
269+
periodic_count = 0
270+
vo_aware_count = 0
271+
scheduled_entries = 0
259272

260273
for task_name, task_cls in self.task_registry.items():
261274
if not issubclass(task_cls, PeriodicBaseTask):
275+
non_periodic_count += 1
262276
continue
277+
periodic_count += 1
263278
if not getattr(task_cls, "_enabled", True):
279+
disabled_count += 1
264280
continue
265281
schedule = task_cls.default_schedule
266282

267283
if issubclass(task_cls, PeriodicVoAwareBaseTask):
284+
vo_aware_count += 1
268285
if not vos:
269286
logger.warning(
270287
"No VOs configured, skipping VO-aware task %s",
@@ -273,8 +290,22 @@ def _compute_initial_schedules(self) -> None:
273290
continue
274291
for vo in vos:
275292
self.add_vo_schedule(task_name, vo, schedule.next_occurrence())
293+
scheduled_entries += 1
276294
else:
277295
self._next_runs[(task_name, "")] = schedule.next_occurrence()
296+
scheduled_entries += 1
297+
298+
logger.info(
299+
"Initial periodic schedules computed: entries=%d periodic=%d "
300+
"vo_aware=%d disabled=%d non_periodic=%d vos=%d",
301+
scheduled_entries,
302+
periodic_count,
303+
vo_aware_count,
304+
disabled_count,
305+
non_periodic_count,
306+
len(vos),
307+
)
308+
self._log_next_schedules_snapshot("initial")
278309

279310
def add_vo_schedule(self, task_name: str, vo: str, next_run: datetime) -> None:
280311
"""Register a VO-specific periodic task schedule."""
@@ -414,6 +445,86 @@ async def _config_watch_loop(self, finish_event: asyncio.Event) -> None:
414445
if key[1] in removed:
415446
del self._next_runs[key]
416447

448+
logger.info(
449+
"Reconciled VO schedules: tracked_entries=%d added_vos=%d removed_vos=%d",
450+
len(self._next_runs),
451+
len(added),
452+
len(removed),
453+
)
454+
self._log_next_schedules_snapshot("config_reconcile")
455+
456+
def _log_task_registry_awareness(self) -> None:
457+
"""Log which tasks are known to the scheduler."""
458+
periodic_enabled: list[str] = []
459+
vo_aware_enabled: list[str] = []
460+
disabled_periodic: list[str] = []
461+
non_periodic: list[str] = []
462+
463+
for task_name, task_cls in self.task_registry.items():
464+
if not issubclass(task_cls, PeriodicBaseTask):
465+
non_periodic.append(task_name)
466+
continue
467+
if not getattr(task_cls, "_enabled", True):
468+
disabled_periodic.append(task_name)
469+
continue
470+
periodic_enabled.append(task_name)
471+
if issubclass(task_cls, PeriodicVoAwareBaseTask):
472+
vo_aware_enabled.append(task_name)
473+
474+
periodic_enabled.sort()
475+
vo_aware_enabled.sort()
476+
disabled_periodic.sort()
477+
non_periodic.sort()
478+
479+
logger.info(
480+
"Scheduler task registry: total=%d periodic_enabled=%d "
481+
"vo_aware_enabled=%d periodic_disabled=%d non_periodic=%d",
482+
len(self.task_registry),
483+
len(periodic_enabled),
484+
len(vo_aware_enabled),
485+
len(disabled_periodic),
486+
len(non_periodic),
487+
)
488+
if periodic_enabled:
489+
logger.info("Scheduler periodic tasks: %s", periodic_enabled)
490+
if vo_aware_enabled:
491+
logger.info("Scheduler VO-aware periodic tasks: %s", vo_aware_enabled)
492+
if disabled_periodic:
493+
logger.info("Scheduler disabled periodic tasks: %s", disabled_periodic)
494+
if non_periodic:
495+
logger.info("Scheduler non-periodic tasks in registry: %s", non_periodic)
496+
497+
def _should_dump_schedule_snapshot(self, now: datetime) -> bool:
498+
if self._last_schedule_dump_at is None:
499+
return True
500+
elapsed = (now - self._last_schedule_dump_at).total_seconds()
501+
return elapsed >= self._schedule_dump_interval_seconds
502+
503+
def _log_next_schedules_snapshot(self, source: str) -> None:
504+
"""Log a bounded, sorted dump of upcoming schedules."""
505+
if not self._next_runs:
506+
logger.info("Next schedule snapshot (%s): no tracked schedules", source)
507+
return
508+
509+
upcoming = sorted(self._next_runs.items(), key=lambda item: item[1])
510+
shown = upcoming[:SCHEDULE_DUMP_MAX_ENTRIES]
511+
rendered = [
512+
{
513+
"task": task_name,
514+
"vo": vo or "N/A",
515+
"next_run": next_run.isoformat(),
516+
}
517+
for (task_name, vo), next_run in shown
518+
]
519+
520+
logger.info(
521+
"Next schedule snapshot (%s): tracked=%d shown=%d entries=%s",
522+
source,
523+
len(upcoming),
524+
len(rendered),
525+
rendered,
526+
)
527+
417528
# ------------------------------------------------------------------
418529
# OTel observable gauge callback
419530
# ------------------------------------------------------------------

0 commit comments

Comments
 (0)