Skip to content

Commit d4c309d

Browse files
mobuchowskiclaude
andauthored
postgres data-observability - use cron to precisely schedule queries (DataDog#23529)
* postgres data-observability - use cron to precisely schedule queries Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * review Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * fix: apply ddev formatter Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * Skip invalid queries at construction time Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com> * remove lookback as configurable param Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * Replace croniter with datadog_checks_base CronScheduler utility Drop the third-party croniter==6.2.2 dependency and rewrite the cron scheduling in PostgresDataObservability on top of the CronExpression / CronScheduler utilities added to datadog_checks_base in DataDog#23741. Changes: - _filter_valid_queries: build a CronScheduler per monitor_id inside a try/except(ValueError, TypeError) instead of calling croniter.is_valid. Scheduler captures startup_lookback at construction time. - _get_due_queries: collapse the entire cron branch to a single due_ticks(now) call; state registration, lookback recovery, tick detection, and advancement are all handled by CronScheduler. - run_job: remove the post-fire croniter re-advance; due_ticks() already advanced the scheduler at poll time. - Tests: replace _next_run[mid] accesses with _schedulers[mid].next_tick; update boundary test to reflect CronScheduler's inclusive (<=) lookback semantics vs the old strict (<) comparison. - Remove croniter from agent_requirements.in, postgres/pyproject.toml, and LICENSE-3rdparty.csv; bump datadog-checks-base pin to >=37.39.0 (first release shipping cron.py). Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * Address code review: type hints, explicit state returns, lateness comment, error guard - _filter_valid_queries: add Iterable[Query] parameter type; return (tuple[Query,...], dict[int,CronScheduler]) so callers own the assignment — removes the hidden self._schedulers side-effect. __init__ now assigns both fields explicitly. - _get_due_queries: remove setdefault seed for _last_execution; use .get() and treat None as first-sight. Scheduling-state mutations (seed and advance) are now consolidated in run_job. - run_job: add comment distinguishing lateness (scheduling delay) from execution time; wrap emit_failures count in a nested try/except so a concurrent _shutdown() cannot turn the error handler into a job crash. - Tests: merge _make_cron_lookback_check into _make_cron_check via optional window_seconds/monkeypatch params; collapse three identical lookback-window tests into one parametrized test_cron_startup_lookback_window_behavior. Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * fmt Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * code review fixes Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * postgres DO: fix cron boundary miss and improve schedule error message - due_ticks now probes now+0.001 so a first poll landing exactly on a cron tick boundary fires instead of skipping the cycle - invalid cron schedule warning now includes the exception message and the monitor_id to help customers identify and fix the bad config - regression test added for the exact-boundary case Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * revert cron.py boundary fix (moved to postgres data_observability) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * fix cron exact-boundary detection without inflating lookback window The previous +0.001 epsilon was passed directly to due_ticks(), which caused CronScheduler to compare (now+0.001) - prev <= lookback, breaking the inclusive-boundary test where now - prev == lookback exactly. Instead, call due_ticks(now) normally and only on the first poll (before next_tick is cached) probe previous_tick(before=now+0.001) to detect whether now itself is a tick. This isolates the epsilon to the boundary detection and leaves the lookback window comparison untouched. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> * simplify cron boundary fix; update over-precise test Use due_ticks(now + 0.001) — the simpler approach. The previous complex wrapper was added to avoid breaking test_cron_startup_lookback_boundary_inclusive, which was asserting now - prev == window exactly. That exact-equality check belongs in test_cron.py (CronScheduler unit tests), not here. Updated the test to use a clock 55s after the tick (clearly inside the 60s window), keeping the meaningful assertion that recovery fires inside the window and does not fire outside it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> --------- Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com> Signed-off-by: Maciej Obuchowski <maciej.obuchowski@datadoghq.com> Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent cb57d7c commit d4c309d

6 files changed

Lines changed: 731 additions & 46 deletions

File tree

postgres/assets/configuration/spec.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,6 @@ files:
643643
- monitor_id
644644
- dbname
645645
- query
646-
- interval_seconds
647646
- entity
648647
properties:
649648
- name: monitor_id
@@ -655,7 +654,17 @@ files:
655654
- name: query
656655
type: string
657656
- name: interval_seconds
657+
description: |
658+
How often (in seconds) to run this query. Ignored when schedule is set
659+
(see schedule for the precedence rule).
658660
type: integer
661+
- name: schedule
662+
description: |
663+
A standard 5-field cron expression (minute hour dom month dow) specifying
664+
when to run this query. When both schedule and interval_seconds are set,
665+
schedule wins and interval_seconds is ignored. If neither is set, the
666+
query is skipped at runtime with a warning.
667+
type: string
659668
- name: entity
660669
type: object
661670
required:

postgres/changelog.d/23529.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support cron `schedule` field for Data Observability queries.

postgres/datadog_checks/postgres/config_models/instance.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,16 @@ class Query(BaseModel):
172172
custom_sql_select_fields: Optional[CustomSqlSelectFields] = None
173173
dbname: str
174174
entity: Entity
175-
interval_seconds: int
175+
interval_seconds: Optional[int] = Field(
176+
None,
177+
description='How often (in seconds) to run this query. Ignored when schedule is set\n(see schedule for the precedence rule).\n',
178+
)
176179
monitor_id: int
177180
query: str
181+
schedule: Optional[str] = Field(
182+
None,
183+
description='A standard 5-field cron expression (minute hour dom month dow) specifying\nwhen to run this query. When both schedule and interval_seconds are set,\nschedule wins and interval_seconds is ignored. If neither is set, the\nquery is skipped at runtime with a warning.\n',
184+
)
178185
type: Optional[str] = None
179186

180187

postgres/datadog_checks/postgres/data_observability.py

Lines changed: 97 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55

66
import json
77
import time
8-
from typing import TYPE_CHECKING, Any
8+
from collections.abc import Iterable
9+
from dataclasses import dataclass
10+
from typing import TYPE_CHECKING, Any, Literal
911

1012
import psycopg
1113

14+
from datadog_checks.base.utils.cron import CronScheduler
1215
from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding
1316

1417
if TYPE_CHECKING:
@@ -17,15 +20,29 @@
1720

1821
EVENT_TRACK_TYPE = 'do-query-results'
1922

20-
# Cap the number of rows fetched per query to prevent unbounded memory usage.
2123
MAX_RESULT_ROWS = 10_000
2224

25+
# After an agent start, run cron-scheduled queries whose scheduled time of execution
26+
# fell within this many seconds in the past. Recovers missed runs across short check
27+
# restarts (deploys, crashes, Remote Configuration redeliveries). Set to 0 to skip
28+
# catch-up.
29+
CRON_STARTUP_LOOKBACK_SECONDS = 300
30+
31+
Mode = Literal["cron", "interval"]
32+
33+
34+
@dataclass(frozen=True)
35+
class DueQuery:
36+
query: Query
37+
scheduled_time: float
38+
mode: Mode
39+
2340

2441
class PostgresDataObservability(DBMAsyncJob):
2542
def __init__(self, check: PostgreSql, config: InstanceConfig):
2643
self._check = check
2744
self._config = config
28-
self._last_execution: dict[int, float] = {}
45+
self._last_execution: dict[int, float] = {} # interval mode: last fire timestamp
2946
collection_interval = config.data_observability.collection_interval or 10
3047
super(PostgresDataObservability, self).__init__(
3148
check,
@@ -37,6 +54,8 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
3754
expected_db_exceptions=(psycopg.errors.DatabaseError,),
3855
job_name="data-observability",
3956
)
57+
# Filter bad queries on check construction.
58+
self._queries, self._schedulers = self._filter_valid_queries(self._do_config.queries or ())
4059

4160
def _shutdown(self):
4261
self._check = None
@@ -45,14 +64,51 @@ def _shutdown(self):
4564
def _do_config(self):
4665
return self._config.data_observability
4766

48-
def _get_due_queries(self) -> list[Query]:
49-
queries = self._do_config.queries or ()
50-
now = time.time()
51-
due = []
67+
def _filter_valid_queries(self, queries: Iterable[Query]) -> tuple[tuple[Query, ...], dict[int, CronScheduler]]:
68+
valid: list[Query] = []
69+
schedulers: dict[int, CronScheduler] = {}
5270
for q in queries:
53-
last_run = self._last_execution.get(q.monitor_id, 0.0)
54-
if now - last_run >= q.interval_seconds:
55-
due.append(q)
71+
if q.schedule:
72+
try:
73+
schedulers[q.monitor_id] = CronScheduler(q.schedule, startup_lookback=CRON_STARTUP_LOOKBACK_SECONDS)
74+
except (ValueError, TypeError) as e:
75+
self._log.warning(
76+
"Skipping DO query monitor_id=%d: invalid cron schedule %r (%s). "
77+
"Check the schedule of Data Observability monitor %d.",
78+
q.monitor_id,
79+
q.schedule,
80+
e,
81+
q.monitor_id,
82+
)
83+
continue
84+
elif not (q.interval_seconds and q.interval_seconds > 0):
85+
self._log.warning(
86+
"Skipping DO query monitor_id=%d: neither schedule nor positive interval_seconds set",
87+
q.monitor_id,
88+
)
89+
continue
90+
valid.append(q)
91+
return tuple(valid), schedulers
92+
93+
def _get_due_queries(self) -> list[DueQuery]:
94+
now = time.time()
95+
due: list[DueQuery] = []
96+
for q in self._queries:
97+
if q.schedule:
98+
# +0.001 so a poll landing exactly on a tick boundary is treated
99+
# as due (CronScheduler.previous_tick uses strict less-than).
100+
ticks = self._schedulers[q.monitor_id].due_ticks(now + 0.001)
101+
if ticks:
102+
# Take the latest elapsed tick; earlier ones are already in the past
103+
# and do not need separate execution.
104+
due.append(DueQuery(q, ticks[-1], "cron"))
105+
else:
106+
last = self._last_execution.get(q.monitor_id)
107+
if last is None or now - last >= q.interval_seconds:
108+
# Seed: treat first sight as if the previous interval just completed,
109+
# so the scheduled_time for DueQuery is now and lateness is 0.
110+
scheduled = (last + q.interval_seconds) if last is not None else now
111+
due.append(DueQuery(q, scheduled, "interval"))
56112
return due
57113

58114
def _build_base_tags(self) -> list[str]:
@@ -147,16 +203,20 @@ def run_job(self):
147203

148204
base_tags = self._build_base_tags()
149205

150-
for q in due_queries:
206+
for due in due_queries:
207+
q = due.query
151208
tags = base_tags + [f'monitor_id:{q.monitor_id}']
152209

210+
now_at_fire_start = time.time()
153211
with self._check.db_pool.get_connection(q.dbname) as conn:
154212
result = self._execute_single_query(conn, q)
155213

156-
# Update scheduling timestamp immediately after execution, before
157-
# metric/event emission, so a serialization failure in the event
158-
# path cannot cause infinite re-execution of the same query.
159-
self._last_execution[q.monitor_id] = time.time()
214+
# Advance scheduling state before emission so an emit-side error cannot
215+
# leave the query stuck re-firing the same tick.
216+
# For cron mode, due_ticks() already advanced the scheduler's internal state.
217+
now_at_fire_end = time.time()
218+
if due.mode == "interval":
219+
self._last_execution[q.monitor_id] = now_at_fire_end
160220

161221
try:
162222
self._check.gauge(
@@ -174,6 +234,17 @@ def run_job(self):
174234
raw=True,
175235
)
176236

237+
# Lateness measures scheduling delay only (time from tick to query start),
238+
# not end-to-end result latency — query execution time is reported separately.
239+
lateness = max(0.0, now_at_fire_start - due.scheduled_time)
240+
self._check.gauge(
241+
'dd.postgres.data_observability.query_fire_lateness_seconds',
242+
lateness,
243+
tags=tags + [f'mode:{due.mode}'],
244+
hostname=self._check.reported_hostname,
245+
raw=True,
246+
)
247+
177248
payload = self._build_event_payload(q, result)
178249
raw_event = json.dumps(payload, default=default_json_event_encoding)
179250
self._log.debug(
@@ -183,8 +254,18 @@ def run_job(self):
183254
result['row_count'],
184255
)
185256
self._check.event_platform_event(raw_event, EVENT_TRACK_TYPE)
186-
except Exception:
257+
except Exception as e:
187258
self._log.exception(
188259
"Failed to emit metrics/event for monitor_id=%d",
189260
q.monitor_id,
190261
)
262+
try:
263+
self._check.count(
264+
'dd.postgres.data_observability.emit_failures',
265+
1,
266+
tags=tags + [f'exc_class:{type(e).__name__}'],
267+
hostname=self._check.reported_hostname,
268+
raw=True,
269+
)
270+
except Exception:
271+
pass

postgres/tests/test_config.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,3 +610,99 @@ def test_autodiscovery_exclude_none_does_not_error(mock_check):
610610
config, result = build_config(check=mock_check)
611611
assert result.valid
612612
assert config.dbname == 'main'
613+
614+
615+
# ---------------------------------------------------------------------------
616+
# Data Observability — schedule field config tests
617+
# ---------------------------------------------------------------------------
618+
619+
620+
def test_do_query_schedule_field_defaults_to_none(mock_check, minimal_instance):
621+
"""A DO query without schedule field has schedule=None by default."""
622+
minimal_instance['data_observability'] = {
623+
'enabled': True,
624+
'run_sync': True,
625+
'queries': [
626+
{
627+
'monitor_id': 1,
628+
'dbname': 'mydb',
629+
'query': 'SELECT 1',
630+
'interval_seconds': 60,
631+
'entity': {
632+
'platform': 'aws',
633+
'account': '123',
634+
'database': 'mydb',
635+
'schema': 'public',
636+
'table': 'foo',
637+
},
638+
}
639+
],
640+
}
641+
mock_check.instance = minimal_instance
642+
mock_check.init_config = {}
643+
config, result = build_config(check=mock_check)
644+
assert result.valid
645+
query = config.data_observability.queries[0]
646+
assert query.schedule is None
647+
assert query.interval_seconds == 60
648+
649+
650+
def test_do_query_schedule_field_parsed(mock_check, minimal_instance):
651+
"""A DO query with schedule field is parsed correctly; interval_seconds may be absent."""
652+
minimal_instance['data_observability'] = {
653+
'enabled': True,
654+
'run_sync': True,
655+
'queries': [
656+
{
657+
'monitor_id': 2,
658+
'dbname': 'mydb',
659+
'query': 'SELECT 1',
660+
'schedule': '20 * * * *',
661+
'entity': {
662+
'platform': 'aws',
663+
'account': '123',
664+
'database': 'mydb',
665+
'schema': 'public',
666+
'table': 'bar',
667+
},
668+
}
669+
],
670+
}
671+
mock_check.instance = minimal_instance
672+
mock_check.init_config = {}
673+
config, result = build_config(check=mock_check)
674+
assert result.valid
675+
query = config.data_observability.queries[0]
676+
assert query.schedule == '20 * * * *'
677+
assert query.interval_seconds is None
678+
679+
680+
def test_do_query_both_schedule_and_interval_parsed(mock_check, minimal_instance):
681+
"""A DO query with both schedule and interval_seconds is accepted; both fields present."""
682+
minimal_instance['data_observability'] = {
683+
'enabled': True,
684+
'run_sync': True,
685+
'queries': [
686+
{
687+
'monitor_id': 3,
688+
'dbname': 'mydb',
689+
'query': 'SELECT 1',
690+
'schedule': '0 * * * *',
691+
'interval_seconds': 3600,
692+
'entity': {
693+
'platform': 'aws',
694+
'account': '123',
695+
'database': 'mydb',
696+
'schema': 'public',
697+
'table': 'baz',
698+
},
699+
}
700+
],
701+
}
702+
mock_check.instance = minimal_instance
703+
mock_check.init_config = {}
704+
config, result = build_config(check=mock_check)
705+
assert result.valid
706+
query = config.data_observability.queries[0]
707+
assert query.schedule == '0 * * * *'
708+
assert query.interval_seconds == 3600

0 commit comments

Comments
 (0)