
Commit 55a989b

mobuchowski and claude authored
postgres: data observability queries respect rc-passed transaction timeout (DataDog#23896)
* data observability postgres: respect rc-passed transaction timeout

Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

* timeout, yaml parse

Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

* remove unrelated tests

Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

* postgres/do: rename timeout_seconds to query_timeout, fix SET LOCAL binding

Rename `timeout_seconds` → `query_timeout` in the DO query spec to align with the existing `query_timeout` instance config.

Replace `cursor.execute("SET LOCAL statement_timeout = %s", ...)` with `cursor.execute("SELECT set_config('statement_timeout', %s, true)", ...)` to avoid psycopg3 server-side binding rejection: psycopg3 uses the extended query protocol for parameterized execute() calls, and PostgreSQL rejects bound parameters in SET statements under that protocol. set_config() accepts parameters normally; is_local=true gives identical scope to SET LOCAL.

Also remove the now-unnecessary _loaded_config_id tracking: the whole check is rescheduled when the RC payload changes, so _last_execution is always fresh on a new instance.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

* postgres/do: fix test execute_side_effect filter for set_config

The test was filtering on 'SET LOCAL' to skip the timeout-setting execute, but after replacing SET LOCAL with set_config() the filter no longer matched, causing query_count to increment on the set_config call and shifting description assignments off by one.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

* query_timeout in milliseconds

Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>

---------

Signed-off-by: mobuchowski <maciej.obuchowski@datadoghq.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b593bed commit 55a989b

6 files changed

Lines changed: 206 additions & 16 deletions


postgres/assets/configuration/spec.yaml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -643,6 +643,7 @@ files:
643643
- monitor_id
644644
- dbname
645645
- query
646+
- query_timeout
646647
- entity
647648
properties:
648649
- name: monitor_id
@@ -665,6 +666,11 @@ files:
665666
schedule wins and interval_seconds is ignored. If neither is set, the
666667
query is skipped at runtime with a warning.
667668
type: string
669+
- name: query_timeout
670+
description: |
671+
Statement timeout for this query in milliseconds. Overrides the instance-level
672+
query_timeout for this query only.
673+
type: integer
668674
- name: entity
669675
type: object
670676
required:

postgres/changelog.d/23896.added

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Honor the per-query ``query_timeout`` for Data Observability queries.

postgres/datadog_checks/postgres/config_models/instance.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -178,6 +178,10 @@ class Query(BaseModel):
178178
)
179179
monitor_id: int
180180
query: str
181+
query_timeout: int = Field(
182+
...,
183+
description='Statement timeout for this query in milliseconds. Overrides the instance-level\nquery_timeout for this query only.\n',
184+
)
181185
schedule: Optional[str] = Field(
182186
None,
183187
description='A standard 5-field cron expression (minute hour dom month dow) specifying\nwhen to run this query. When both schedule and interval_seconds are set,\nschedule wins and interval_seconds is ignored. If neither is set, the\nquery is skipped at runtime with a warning.\n',
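Reviewer note: `Field(...)` with no default makes `query_timeout` a required field, so an RC payload that omits it fails model validation instead of silently running without a timeout. The sketch below illustrates the same "no default means required" behaviour with a stdlib dataclass. `QuerySketch` is a hypothetical stand-in, not the generated pydantic model, and it raises `TypeError` where pydantic would raise its own validation error.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class QuerySketch:
    """Hypothetical stdlib stand-in for the generated Query model (illustration only)."""

    monitor_id: int
    query: str
    query_timeout: int  # milliseconds; no default, so callers must supply it
    schedule: Optional[str] = None  # optional, mirrors the Optional[str] field in the diff


# A payload that carries the timeout builds normally.
ok = QuerySketch(monitor_id=1, query="SELECT 1", query_timeout=30_000)

# A payload without query_timeout is rejected at construction time.
try:
    QuerySketch(monitor_id=1, query="SELECT 1")
    missing_rejected = False
except TypeError:
    missing_rejected = True
```

This is also why every existing test fixture in this commit gains a `'query_timeout': 30_000` entry.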

postgres/datadog_checks/postgres/data_observability.py

Lines changed: 31 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@
2828
# catch-up.
2929
CRON_STARTUP_LOOKBACK_SECONDS = 300
3030

31+
# Fallback per-query statement timeout.
32+
DEFAULT_DO_QUERY_TIMEOUT_S = 60
33+
3134
Mode = Literal["cron", "interval"]
3235

3336

@@ -42,7 +45,7 @@ class PostgresDataObservability(DBMAsyncJob):
4245
def __init__(self, check: PostgreSql, config: InstanceConfig):
4346
self._check = check
4447
self._config = config
45-
self._last_execution: dict[int, float] = {} # interval mode: last fire timestamp
48+
self._last_execution: dict[int, float] = {}
4649
collection_interval = config.data_observability.collection_interval or 10
4750
super(PostgresDataObservability, self).__init__(
4851
check,
@@ -128,18 +131,34 @@ def _execute_single_query(self, conn: Any, query_spec: Query) -> dict[str, Any]:
128131
try:
129132
if self._cancel_event.is_set():
130133
raise Exception("Job loop cancelled. Aborting query.")
131-
with conn.cursor() as cursor:
132-
cursor.execute(query_spec.query)
133-
# cursor.description is None when the query produced no result set
134-
# (e.g. INSERT, UPDATE, DELETE, or a syntax error that executed without
135-
# raising). RC-delivered queries must be SELECTs; treat this as a
136-
# per-query error so subsequent queries in the list still run.
137-
if cursor.description is None:
138-
raise psycopg.errors.ProgrammingError(
139-
"Query returned no result set — only SELECT statements are supported"
134+
# query_timeout is in milliseconds, matching the instance-level query_timeout unit.
135+
timeout_ms = query_spec.query_timeout
136+
# Pool connections run with autocommit=True, so the timeout must be
137+
# applied inside an explicit transaction and reverts on commit,
138+
# avoiding timeout leakage onto the shared connection.
139+
# set_config() is used instead of "SET LOCAL statement_timeout = %s"
140+
# because psycopg3 uses server-side binding (extended query protocol)
141+
# for parameterized execute() calls, and PostgreSQL rejects bound
142+
# parameters in SET statements under the extended protocol. set_config()
143+
# is a regular function that accepts parameters normally; is_local=true
144+
# gives the same scope as SET LOCAL (current transaction only).
145+
with conn.transaction():
146+
with conn.cursor() as cursor:
147+
cursor.execute(
148+
"SELECT set_config('statement_timeout', %s, true)",
149+
(str(int(timeout_ms)),),
140150
)
141-
columns = [desc[0] for desc in cursor.description]
142-
rows = [list(row) for row in cursor.fetchmany(MAX_RESULT_ROWS)]
151+
cursor.execute(query_spec.query)
152+
# cursor.description is None when the query produced no result set
153+
# (e.g. INSERT, UPDATE, DELETE, or a syntax error that executed without
154+
# raising). RC-delivered queries must be SELECTs; treat this as a
155+
# per-query error so subsequent queries in the list still run.
156+
if cursor.description is None:
157+
raise psycopg.errors.ProgrammingError(
158+
"Query returned no result set — only SELECT statements are supported"
159+
)
160+
columns = [desc[0] for desc in cursor.description]
161+
rows = [list(row) for row in cursor.fetchmany(MAX_RESULT_ROWS)]
143162
duration = time.time() - start
144163
return {
145164
'status': 'success',
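Reviewer note: the hunk above relies on the order of operations inside one transaction: begin, set the timeout with `set_config(..., true)`, run the query, commit. The sketch below replays that order against a recording fake so the sequence is visible without a database. `RecordingConnection`, `RecordingCursor`, and `run_with_timeout` are hypothetical names introduced for illustration; they only mimic the `transaction()`, `cursor()`, `execute()`, and `fetchmany()` calls used in the diff and say nothing about how PostgreSQL enforces the timeout.

```python
from contextlib import contextmanager


class RecordingCursor:
    """Hypothetical stand-in for a driver cursor; it only records execute() calls."""

    def __init__(self, log):
        self._log = log
        self.description = [("count",)]

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False

    def execute(self, sql, params=None):
        self._log.append(("execute", sql, params))

    def fetchmany(self, size):
        return [(1,)]


class RecordingConnection:
    """Hypothetical stand-in for an autocommit connection with explicit transactions."""

    def __init__(self):
        self.log = []

    @contextmanager
    def transaction(self):
        self.log.append(("begin",))
        try:
            yield
        except Exception:
            self.log.append(("rollback",))
            raise
        else:
            self.log.append(("commit",))

    def cursor(self):
        return RecordingCursor(self.log)


def run_with_timeout(conn, sql, timeout_ms, max_rows=100):
    """Run sql with a statement timeout that is scoped to a single transaction."""
    with conn.transaction():
        with conn.cursor() as cursor:
            # The value is passed as text; is_local=true limits it to this transaction.
            cursor.execute(
                "SELECT set_config('statement_timeout', %s, true)",
                (str(int(timeout_ms)),),
            )
            cursor.execute(sql)
            return [list(row) for row in cursor.fetchmany(max_rows)]


conn = RecordingConnection()
rows = run_with_timeout(conn, "SELECT count(*) FROM orders", 30_000)
```

After the call, `conn.log` holds the begin marker, the `set_config` call with `("30000",)`, the user query, and the commit marker, in that order.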

postgres/tests/test_config.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -628,6 +628,7 @@ def test_do_query_schedule_field_defaults_to_none(mock_check, minimal_instance):
628628
'dbname': 'mydb',
629629
'query': 'SELECT 1',
630630
'interval_seconds': 60,
631+
'query_timeout': 30_000,
631632
'entity': {
632633
'platform': 'aws',
633634
'account': '123',
@@ -658,6 +659,7 @@ def test_do_query_schedule_field_parsed(mock_check, minimal_instance):
658659
'dbname': 'mydb',
659660
'query': 'SELECT 1',
660661
'schedule': '20 * * * *',
662+
'query_timeout': 30_000,
661663
'entity': {
662664
'platform': 'aws',
663665
'account': '123',
@@ -689,6 +691,7 @@ def test_do_query_both_schedule_and_interval_parsed(mock_check, minimal_instance
689691
'query': 'SELECT 1',
690692
'schedule': '0 * * * *',
691693
'interval_seconds': 3600,
694+
'query_timeout': 30_000,
692695
'entity': {
693696
'platform': 'aws',
694697
'account': '123',

postgres/tests/test_data_observability.py

Lines changed: 161 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212

1313
import psycopg
1414
import pytest
15+
import yaml
1516

1617
from datadog_checks.postgres import PostgreSql
1718
from datadog_checks.postgres.data_observability import EVENT_TRACK_TYPE
@@ -26,6 +27,7 @@
2627
'dbname': 'test_db',
2728
'query': 'SELECT count(*) FROM orders',
2829
'interval_seconds': 60,
30+
'query_timeout': 30_000,
2931
'type': 'freshness',
3032
'entity': {
3133
'platform': 'aws',
@@ -43,6 +45,7 @@
4345
'dbname': 'test_db',
4446
'query': 'SELECT count(*) FROM users',
4547
'interval_seconds': 120,
48+
'query_timeout': 30_000,
4649
'type': 'freshness',
4750
'entity': {
4851
'platform': 'aws',
@@ -279,15 +282,47 @@ def execute_side_effect(sql, *args, **kwargs):
279282
assert len(status_metrics) == 2
280283

281284

285+
def _get_local_timeout_ms(mock_cursor):
286+
"""Return the statement_timeout (ms) passed to the set_config execute, or None."""
287+
for call in mock_cursor.execute.call_args_list:
288+
sql = call.args[0]
289+
if "set_config('statement_timeout'" in sql.lower():
290+
return int(call.args[1][0])
291+
return None
292+
293+
294+
def test_query_timeout_applied_in_transaction(pg_instance):
295+
"""The query's query_timeout is applied via set_config inside a transaction."""
296+
mock_conn, mock_cursor = _make_mock_conn()
297+
298+
_setup_and_run(pg_instance, mock_conn=mock_conn, mock_cursor=mock_cursor)
299+
300+
mock_conn.transaction.assert_called_once()
301+
assert _get_local_timeout_ms(mock_cursor) == 30_000
302+
303+
304+
def test_query_timeout_passed_directly_to_set_config(pg_instance):
305+
"""query_timeout (milliseconds) is forwarded as-is to SET LOCAL statement_timeout."""
306+
query = deepcopy(BASE_QUERY)
307+
query['query_timeout'] = 180_000
308+
mock_conn, mock_cursor = _make_mock_conn()
309+
310+
_setup_and_run(pg_instance, queries=[query], mock_conn=mock_conn, mock_cursor=mock_cursor)
311+
312+
assert _get_local_timeout_ms(mock_cursor) == 180_000
313+
314+
282315
def test_no_description_does_not_block_subsequent(aggregator, pg_instance):
283316
"""First query returns None description (non-SELECT), second query still runs."""
284317
mock_conn, mock_cursor = _make_mock_conn()
285-
call_count = 0
318+
query_count = 0
286319

287320
def execute_side_effect(sql, *args, **kwargs):
288-
nonlocal call_count
289-
call_count += 1
290-
mock_cursor.description = None if call_count == 1 else [('count',)]
321+
nonlocal query_count
322+
if "set_config('statement_timeout'" in sql.lower():
323+
return
324+
query_count += 1
325+
mock_cursor.description = None if query_count == 1 else [('count',)]
291326

292327
mock_cursor.execute = MagicMock(side_effect=execute_side_effect)
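Reviewer note: each real query is now preceded by a `set_config` execute, so a side effect that counts every `execute()` call would assign descriptions to the wrong statement. The sketch below reproduces the filtering idea in isolation. `make_cursor` and the sample SQL strings are hypothetical; only the substring filter is taken from the diff.

```python
from unittest.mock import MagicMock


def make_cursor():
    """Build a mock cursor whose description depends on the Nth real query."""
    cursor = MagicMock()
    state = {"queries": 0}

    def execute_side_effect(sql, *args, **kwargs):
        # Skip the timeout-setting call so it does not advance the query counter.
        if "set_config('statement_timeout'" in sql.lower():
            return None
        state["queries"] += 1
        # First real query: no result set. Later queries: a one-column result.
        cursor.description = None if state["queries"] == 1 else [("count",)]

    cursor.execute = MagicMock(side_effect=execute_side_effect)
    return cursor, state


cursor, state = make_cursor()
timeout_call = ("SELECT set_config('statement_timeout', %s, true)", ("30000",))

cursor.execute(*timeout_call)
cursor.execute("DELETE FROM orders")
first_description = cursor.description

cursor.execute(*timeout_call)
cursor.execute("SELECT count(*) FROM users")
second_description = cursor.description
```

Four `execute()` calls are recorded, but only two count as queries, which keeps the "first query has no description" assertion aligned with the first user query.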
293328

@@ -513,6 +548,8 @@ def test_multi_query_different_dbnames(aggregator, pg_instance):
513548
'dbname': 'test_db',
514549
'query': 'SELECT 1',
515550
'schedule': '50 * * * *', # every hour at :50
551+
'interval_seconds': 3600,
552+
'query_timeout': 30_000,
516553
'type': 'freshness',
517554
'entity': {
518555
'platform': 'aws',
@@ -670,6 +707,7 @@ def test_query_without_schedule_or_positive_interval_filtered_at_init(pg_instanc
670707
'monitor_id': 30,
671708
'dbname': 'test_db',
672709
'query': 'SELECT 1',
710+
'query_timeout': 30_000,
673711
'type': 'freshness',
674712
'entity': {
675713
'platform': 'aws',
@@ -986,3 +1024,122 @@ def boom(*args, **kwargs):
9861024
assert len(failures) == 1
9871025
assert any(t.startswith('exc_class:JSONDecodeError') for t in failures[0].tags)
9881026
assert 'monitor_id:1' in failures[0].tags
1027+
1028+
1029+
# --- Agent YAML-delivery round-trip tests ---
1030+
#
1031+
# The DO queries originate from a Remote Configuration payload handled by the Datadog Agent's
1032+
# Go RC handler (comp/dataobs/queryactions/impl/handler.go). The agent injects them into the
1033+
# postgres instance config, serializes the instance to YAML, and hands that YAML *string* to
1034+
# this check; datadog_checks.base parses it with yaml.safe_load before the dict ever reaches
1035+
# PostgreSql.__init__.
1036+
#
1037+
# DO query strings are multi-line SQL that routinely mixes indented lines with a trailing
1038+
# column-0 "-- Datadog {...}" annotation. yaml.v3 used to serialize such a string as a literal
1039+
# block scalar ("|") whose later, less-indented line escaped the block and produced YAML that
1040+
# neither go-yaml nor PyYAML can parse (a "did not find expected key" / ParserError). The agent
1041+
# now forces a double-quoted scalar for the query so the round-trip is exact. The tests above
1042+
# all inject a Python dict directly and therefore never exercise this serialize→safe_load
1043+
# boundary; these tests close that gap from the consumer side.
1044+
1045+
# Indented SELECT lines followed by a column-0 "-- Datadog" comment — the canonical failing shape.
1046+
MULTILINE_QUERY = (
1047+
" SELECT count(*) AS dd_value\n"
1048+
" FROM events.clicks c\n"
1049+
" LEFT JOIN events.page_views pv\n"
1050+
" ON c.user_id = pv.user_id AND c.page_url = pv.url\n"
1051+
" WHERE pv.id IS NULL\n"
1052+
'-- Datadog {"monitor_ids":[26724188]}\n'
1053+
)
1054+
1055+
1056+
def _instance_yaml_as_agent_delivers(pg_instance, query):
1057+
"""Render the DO instance to a YAML string the way the agent delivers it to the check.
1058+
1059+
The query is emitted as a double-quoted scalar (matching the agent's yaml.Node fix); the
1060+
rest of the instance is dumped normally. Returns the YAML text, which mirrors exactly what
1061+
datadog_checks.base feeds to yaml.safe_load.
1062+
"""
1063+
instance = _make_do_instance(pg_instance, queries=[{**deepcopy(BASE_QUERY), 'query': query}])
1064+
do = instance.pop('data_observability')
1065+
queries_block = do.pop('queries')
1066+
q = queries_block[0]
1067+
# Build the query entry by hand so the SQL is a double-quoted scalar, as the agent emits it.
1068+
# json.dumps produces a valid YAML double-quoted flow scalar for these characters.
1069+
query_entry_lines = [f" - query: {json.dumps(query)}"]
1070+
for key, value in q.items():
1071+
if key == 'query':
1072+
continue
1073+
query_entry_lines.append(f" {key}: {json.dumps(value)}")
1074+
do_yaml = ["data_observability:"]
1075+
for key, value in do.items():
1076+
do_yaml.append(f" {key}: {json.dumps(value)}")
1077+
do_yaml.append(" queries:")
1078+
do_yaml.extend(query_entry_lines)
1079+
return yaml.safe_dump(instance, default_flow_style=False) + "\n".join(do_yaml) + "\n"
1080+
1081+
1082+
@pytest.mark.parametrize(
1083+
'query',
1084+
[
1085+
pytest.param(MULTILINE_QUERY, id='indented_then_col0_comment'),
1086+
pytest.param('SELECT 23 as dd_value;\n-- Datadog {"monitor_ids":[26386160]}\n', id='trailing_comment'),
1087+
pytest.param(
1088+
'SELECT COUNT(1) AS dd_a_1, COUNT(DISTINCT "customer_id") AS dd_b_2 FROM "testdb"."shop"."orders"\n'
1089+
'-- Datadog {"monitor_ids":[26358412,26386112]}\n',
1090+
id='embedded_quotes',
1091+
),
1092+
pytest.param('SELECT 1', id='simple_single_line'),
1093+
],
1094+
)
1095+
def test_agent_yaml_delivery_round_trips_query(pg_instance, query):
1096+
"""The query survives the agent's YAML serialization + safe_load round-trip byte-for-byte
1097+
and reaches cursor.execute unchanged. This is the consumer-side guard for the yaml.v3
1098+
block-scalar bug fixed in the agent (handler.go)."""
1099+
instance_yaml = _instance_yaml_as_agent_delivers(pg_instance, query)
1100+
1101+
# The agent → Python boundary: base.load_config feeds this YAML string to yaml.safe_load.
1102+
parsed = yaml.safe_load(instance_yaml)
1103+
assert parsed['data_observability']['queries'][0]['query'] == query, "query must survive safe_load intact"
1104+
1105+
mock_conn, mock_cursor = _make_mock_conn()
1106+
check = PostgreSql('postgres', {}, [parsed])
1107+
check.db_pool = _mock_db_pool(mock_conn)
1108+
check.data_observability.run_job()
1109+
1110+
# The exact SQL string (not the set_config timeout call) must reach the driver.
1111+
executed = [call.args[0] for call in mock_cursor.execute.call_args_list]
1112+
assert query in executed, f"original query string must be executed verbatim; got {executed!r}"
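Reviewer note: the helper above leans on `json.dumps` to produce the double-quoted scalar. A small stdlib-only sketch of why that form is robust: the encoded string contains no raw newlines, so no SQL line can end up at an indentation that matters to a YAML parser. `SAMPLE_QUERY` is an illustrative string with assumed leading spaces, not the test's constant.

```python
import json

# Illustrative SQL: indented lines followed by a column-0 annotation comment.
SAMPLE_QUERY = (
    "    SELECT count(*) AS dd_value\n"
    "    FROM events.clicks c\n"
    "    WHERE c.id IS NULL\n"
    '-- Datadog {"monitor_ids":[26724188]}\n'
)

# json.dumps escapes newlines and inner double quotes, yielding one physical line.
encoded = json.dumps(SAMPLE_QUERY)
yaml_line = f"query: {encoded}"

# Decoding restores the original text exactly.
decoded = json.loads(encoded)
```

The sketch only checks the JSON side of the round-trip; the parametrized test above is what verifies that `yaml.safe_load` reads the same scalar back.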
1113+
1114+
1115+
# The YAML the agent emitted for MULTILINE_QUERY *before* the handler.go fix: a literal block
1116+
# scalar ("|4") whose trailing "-- Datadog" line is indented 12 spaces while the block content
1117+
# sits at 14. Being less-indented than the block, that line terminates the scalar and is then
1118+
# read as a sibling node at indent 12 — deeper than the mapping keys at 10 — and the ":" inside
1119+
# it makes the parser expect a key. yaml.v3 emitted this without error; both go-yaml and PyYAML
1120+
# fail to parse it. In production this never reached the check: the agent's autodiscovery digest
1121+
# re-parsed the YAML first, failed, and dropped the config before scheduling.
1122+
BROKEN_AGENT_YAML = (
1123+
"data_observability:\n"
1124+
" enabled: true\n"
1125+
" queries:\n"
1126+
" - dbname: analyticsdb\n"
1127+
" interval_seconds: 3600\n"
1128+
" query: |4\n"
1129+
" SELECT count(*) AS dd_value\n"
1130+
" FROM events.clicks c\n"
1131+
" LEFT JOIN events.page_views pv\n"
1132+
" ON c.user_id = pv.user_id AND c.page_url = pv.url\n"
1133+
" WHERE pv.id IS NULL\n"
1134+
' -- Datadog {"monitor_ids":[26724188]}\n'
1135+
" query_timeout: 300\n"
1136+
" type: run_query\n"
1137+
)
1138+
1139+
1140+
def test_pre_fix_agent_yaml_was_unparseable():
1141+
"""Documents the cross-language bug. The agent's old literal-block output fails yaml.safe_load
1142+
with the same parse error go-yaml hit. The agent now emits a double-quoted scalar (handler.go),
1143+
so this shape is no longer produced; test_agent_yaml_delivery_round_trips_query covers the fix."""
1144+
with pytest.raises(yaml.YAMLError):
1145+
yaml.safe_load(BROKEN_AGENT_YAML)
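Reviewer note: the comment above `BROKEN_AGENT_YAML` describes the failure in terms of indentation: mapping keys at 10, block content at 14, and the trailing annotation at 12. The sketch below is a simplified model of that rule, not a YAML parser. `first_line_outside_block` is a hypothetical helper, and the sample lines are built to match the indentations stated in that comment.

```python
def first_line_outside_block(lines, content_indent):
    """Return the index of the first non-blank line indented less than content_indent."""
    for index, line in enumerate(lines):
        if not line.strip():
            continue  # blank lines do not terminate a literal block scalar
        indent = len(line) - len(line.lstrip(" "))
        if indent < content_indent:
            return index
    return None


KEY_INDENT = 10                   # indentation of the mapping keys, per the comment
CONTENT_INDENT = KEY_INDENT + 4   # "|4" places block content at 14

# Pre-fix shape: the annotation line sits at 12, below the block content indent.
broken_lines = [
    " " * 18 + "SELECT count(*) AS dd_value",
    " " * 18 + "FROM events.clicks c",
    " " * 12 + '-- Datadog {"monitor_ids":[26724188]}',
]

# Shape a correct literal block would need: the annotation at the content indent.
fixed_lines = broken_lines[:2] + [" " * 14 + '-- Datadog {"monitor_ids":[26724188]}']

escaped_at = first_line_outside_block(broken_lines, CONTENT_INDENT)
```

In the broken shape the third line falls outside the scalar and is then read as a separate node, which is the parse failure the test above documents.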
