Skip to content

Commit bf7ac65

Browse files
pierreln-ddclaude
andauthored
Send each logical database as its own independent schema snapshot (DataDog#23913)
* Send each logical database as its own independent schema snapshot Each database now gets a fresh collection_started_at timestamp and its own terminal payload with collection_payloads_count, so the backend can independently complete or discard each database's snapshot. Errors on one database are isolated: partial rows are discarded and collection continues for the remaining databases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Add changelog entry for PR DataDog#23913 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix metric bookkeeping for mid-iteration database failures When a database fails after already flushing one or more chunks, two counters were wrong: - _total_rows_count included unsent buffered rows that were about to be discarded, inflating the tables_count gauge - total_payloads_count excluded chunk payloads already emitted for the failed database, undercounting the payloads_count gauge Fix by computing unsent row count before the delete and subtracting it, and adding _collection_payloads_count to total_payloads_count in the error path as well as the success path. Add test for mid-iteration failure after a chunk flush. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Simplify per-DB state reset using per-DB queued_rows initialization Reset _queued_rows per database instead of tracking a slice offset. On error, discard by reassignment rather than del slicing. Use a local db_rows_count so _total_rows_count only reflects completed databases, not partially-failed ones. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Add row count to per-database collection debug log Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Add payload count to per-database collection debug log Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix changelog entry type: changed -> fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Move completion debug log after terminal flush so payload count is accurate Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Include database info in terminal payload for empty databases When a logical database has no rows (or all rows were already flushed by chunk-size), the terminal payload would have metadata: [] with no DatabaseName for the backend to apply per-database snapshot completion/deletion to. For empty databases, append the database-level dict as a sentinel row so the backend receives a DatabaseName even when no schema rows were collected. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Remove empty-database sentinel from per-DB schema collection Drop the sentinel row that was injected into terminal payloads for empty or fully-chunk-flushed databases. Per Seth's review, this is a separate concern and should be addressed in a follow-up PR to keep revert scope minimal. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Guard against missing 'name' key in database entries from _get_databases() `database['name']` raised KeyError if a subclass returned a malformed entry, propagating to the outer handler and aborting the entire run. Switch to `database.get('name')` and reorder so the guard check fires before the debug log. Add a test covering the per-DB skip path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Include DB identity sentinel in terminal payload for empty databases When a logical database has no rows, the terminal flush emits metadata=[] with no DatabaseName. The backend cannot attribute snapshot completion/deletion to that DB, so stale schemas survive. Append a {**database} sentinel entry before the terminal flush when db_rows_count==0. This ensures the backend calls snapshotStore.Add with a DatabaseName and can remove stale rows on completion. Update the test to assert the sentinel name field rather than empty metadata, per Ilia's review request. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix ClickHouse schema collection compatibility with empty-DB sentinel ClickhouseSchemaCollector uses a single cluster-wide stub entry rather than iterating real database names. Without this fix, the base-class sentinel appends the stub name ('_cluster_') to empty terminal payloads, sending a fake database identifier to the backend and breaking the test_collect_emits_empty_snapshot_marker_when_no_tables assertion. Add _emit_empty_db_sentinel class attribute (default True) to SchemaCollector so subclasses using stub entries can opt out, and set it to False in ClickhouseSchemaCollector. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Revert empty-DB sentinel from SchemaCollector The sentinel approach requires touching every integration that uses a stub database entry (e.g. ClickHouse), making it unsuitable for this PR. Deferred to a follow-up with proper cross-integration design. Restores metadata=[] for empty-database terminal payloads. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix ClickHouse test_main_query_failure_closes_client for per-DB error isolation The base SchemaCollector now catches per-DB errors and continues (matching the production behavior where run_job already catches all exceptions from collect_schemas). Update the test to verify client cleanup without asserting exception propagation, which was never a code-level contract. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6b0eda4 commit bf7ac65

4 files changed

Lines changed: 164 additions & 30 deletions

File tree

clickhouse/tests/test_metadata.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -571,13 +571,14 @@ def test_max_execution_time_set_on_client(collector):
571571

572572

573573
def test_main_query_failure_closes_client(collector):
574+
# The base class catches per-DB errors and continues; the caller (run_job) catches
575+
# any remaining exception. What matters is that the client is properly closed.
574576
mock_client = mock.MagicMock()
575577
mock_client.query_rows_stream.return_value.__enter__.side_effect = Exception("main query failed")
576578

577579
_capture_payloads(collector._check)
578580
with mock.patch.object(collector._check, 'create_dbm_client', return_value=mock_client):
579-
with pytest.raises(Exception, match="main query failed"):
580-
collector.collect_schemas()
581+
collector.collect_schemas()
581582

582583
mock_client.close.assert_called_once()
583584
assert collector._db_client is None
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Send each logical database as its own independent schema snapshot, so an error or partial collection for one database does not affect others.

datadog_checks_base/datadog_checks/base/utils/db/schemas.py

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,37 +63,54 @@ def collect_schemas(self) -> bool:
6363
This class relies on the owning check to handle scheduling this method.
6464
"""
6565
status = "success"
66+
run_started_at = now_ms()
67+
total_payloads_count = 0
6668
try:
67-
self._collection_started_at = now_ms()
6869
databases = self._get_databases()
6970
self._log.debug("Collecting schemas for %d databases", len(databases))
7071
for database in databases:
71-
self._log.debug("Starting collection of schemas for database %s", database['name'])
72-
database_name = database['name']
72+
database_name = database.get('name')
7373
if not database_name:
74-
self._log.warning("database has no name %v", database)
74+
self._log.warning("database has no name: %s", database)
7575
continue
76-
with self._get_cursor(database_name) as cursor:
77-
# Get the next row from the cursor
78-
next_row = self._get_next(cursor)
79-
while next_row:
80-
self._queued_rows.append(self._map_row(database, next_row))
81-
self._total_rows_count += 1
82-
# Because we're iterating over a cursor we need to try to get
83-
# the next row to see if we've reached the last row
76+
self._log.debug("Starting collection of schemas for database %s", database_name)
77+
self._collection_started_at = now_ms()
78+
self._collection_payloads_count = 0
79+
self._queued_rows = []
80+
db_rows_count = 0
81+
try:
82+
with self._get_cursor(database_name) as cursor:
83+
# Get the next row from the cursor
8484
next_row = self._get_next(cursor)
85-
self.maybe_flush(is_last_payload=False)
86-
is_last_payload = database == databases[-1]
87-
self.maybe_flush(is_last_payload)
88-
self._log.debug("Completed collection of schemas for database %s", database_name)
85+
while next_row:
86+
self._queued_rows.append(self._map_row(database, next_row))
87+
db_rows_count += 1
88+
# Because we're iterating over a cursor we need to try to get
89+
# the next row to see if we've reached the last row
90+
next_row = self._get_next(cursor)
91+
self.maybe_flush(is_last_payload=False)
92+
self.maybe_flush(is_last_payload=True)
93+
self._total_rows_count += db_rows_count
94+
total_payloads_count += self._collection_payloads_count
95+
self._log.debug(
96+
"Completed collection of schemas for database %s (%d rows, %d payloads)",
97+
database_name,
98+
db_rows_count,
99+
self._collection_payloads_count,
100+
)
101+
except Exception as e:
102+
self._queued_rows = []
103+
total_payloads_count += self._collection_payloads_count
104+
self._log.warning("Skipping database %s due to error: %s", database_name, e, exc_info=True)
105+
continue
89106
except Exception as e:
90107
status = "error"
91108
self._log.error("Error collecting schema: %s", e)
92109
raise e
93110
finally:
94111
self._check.histogram(
95112
f"dd.{self._check.dbms}.schema.time",
96-
now_ms() - self._collection_started_at,
113+
now_ms() - run_started_at,
97114
tags=self._check.tags + ["status:" + status],
98115
hostname=self._check.reported_hostname,
99116
raw=True,
@@ -107,12 +124,11 @@ def collect_schemas(self) -> bool:
107124
)
108125
self._check.gauge(
109126
f"dd.{self._check.dbms}.schema.payloads_count",
110-
self._collection_payloads_count,
127+
total_payloads_count,
111128
tags=self._check.tags + ["status:" + status],
112129
hostname=self._check.reported_hostname,
113130
raw=True,
114131
)
115-
116132
self._reset()
117133
return True
118134

datadog_checks_base/tests/base/utils/db/test_schemas.py

Lines changed: 125 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,54 @@ def _get_next(self, _cursor):
111111
return None
112112

113113

114+
class TestSchemaCollectorWithInaccessibleDb(TestSchemaCollector):
115+
"""Simulates multiple databases where one raises an error on cursor open."""
116+
117+
__test__ = False
118+
119+
def __init__(self, check, config):
120+
super().__init__(check, config)
121+
self._current_rows = []
122+
123+
def _get_databases(self):
124+
return [{'name': 'db_accessible'}, {'name': 'db_inaccessible'}, {'name': 'db_also_accessible'}]
125+
126+
@contextmanager
127+
def _get_cursor(self, database: str):
128+
if database == 'db_inaccessible':
129+
raise RuntimeError("Cannot open database version 852")
130+
self._current_rows = [{'table_name': 'users'}]
131+
self._row_index = 0
132+
yield {}
133+
134+
def _get_next(self, _cursor):
135+
if self._row_index < len(self._current_rows):
136+
row = self._current_rows[self._row_index]
137+
self._row_index += 1
138+
return row
139+
return None
140+
141+
142+
class TestSchemaCollectorMidIterationFailure(TestSchemaCollector):
143+
"""Simulates a database that fails mid-iteration after at least one chunk has been flushed."""
144+
145+
__test__ = False
146+
147+
def _get_databases(self):
148+
return [{'name': 'db_partial_fail'}, {'name': 'db_ok'}]
149+
150+
@contextmanager
151+
def _get_cursor(self, database: str):
152+
self._rows = [{'table_name': 'first'}, {'table_name': 'second'}]
153+
self._row_index = 0
154+
yield {}
155+
156+
def _map_row(self, database, cursor_row):
157+
if database['name'] == 'db_partial_fail' and cursor_row.get('table_name') == 'second':
158+
raise RuntimeError("mid-iteration error after first chunk flushed")
159+
return super()._map_row(database, cursor_row)
160+
161+
114162
@pytest.mark.unit
115163
def test_schema_collector(aggregator):
116164
check = TestDatabaseCheck()
@@ -134,20 +182,21 @@ def test_schema_collector(aggregator):
134182

135183
@pytest.mark.unit
136184
def test_schema_collector_empty_last_database(aggregator):
137-
"""Verify that queued rows are flushed even when the last database returns 0 rows."""
185+
"""Verify that each database sends its own terminal payload, even when it returns 0 rows."""
138186
check = TestDatabaseCheck()
139187
collector = TestSchemaCollectorEmptyLastDb(check, SchemaCollectorConfig())
140188
collector.collect_schemas()
141189

142190
events = aggregator.get_event_platform_events("dbm-metadata")
143-
assert len(events) == 1, "Expected 1 payload but got {}".format(len(events))
144-
event = events[0]
145-
assert len(event['metadata']) == 2
146-
assert event['metadata'][0]['name'] == 'db_with_tables'
147-
assert event['metadata'][0]['tables'][0]['table_name'] == 'users'
148-
assert event['metadata'][1]['name'] == 'db_with_tables'
149-
assert event['metadata'][1]['tables'][0]['table_name'] == 'orders'
150-
assert event['collection_payloads_count'] == 1
191+
assert len(events) == 2, "Expected 2 payloads (one per database) but got {}".format(len(events))
192+
assert len(events[0]['metadata']) == 2
193+
assert events[0]['metadata'][0]['name'] == 'db_with_tables'
194+
assert events[0]['metadata'][0]['tables'][0]['table_name'] == 'users'
195+
assert events[0]['metadata'][1]['name'] == 'db_with_tables'
196+
assert events[0]['metadata'][1]['tables'][0]['table_name'] == 'orders'
197+
assert events[0]['collection_payloads_count'] == 1
198+
assert len(events[1]['metadata']) == 0
199+
assert events[1]['collection_payloads_count'] == 1
151200

152201

153202
@pytest.mark.unit
@@ -164,3 +213,70 @@ def test_schema_collector_chunk_size_flush(aggregator):
164213
assert len(events) == 2
165214
assert 'collection_payloads_count' not in events[0]
166215
assert events[-1]['collection_payloads_count'] == len(events)
216+
217+
218+
@pytest.mark.unit
219+
def test_schema_collector_mid_iteration_failure(aggregator):
220+
"""Verify chunk payloads sent before a mid-iteration failure are counted correctly."""
221+
check = TestDatabaseCheck()
222+
config = SchemaCollectorConfig()
223+
config.payload_chunk_size = 1
224+
collector = TestSchemaCollectorMidIterationFailure(check, config)
225+
result = collector.collect_schemas()
226+
227+
assert result is True
228+
events = aggregator.get_event_platform_events("dbm-metadata")
229+
# db_partial_fail: 1 orphaned chunk (no terminal), db_ok: 2 chunks + 1 terminal
230+
assert len(events) == 4
231+
232+
partial_fail_events = [e for e in events if any(r.get('name') == 'db_partial_fail' for r in e['metadata'])]
233+
ok_events = [e for e in events if any(r.get('name') == 'db_ok' for r in e['metadata'])]
234+
terminal_events = [e for e in events if 'collection_payloads_count' in e]
235+
236+
assert len(partial_fail_events) == 1
237+
assert 'collection_payloads_count' not in partial_fail_events[0]
238+
assert len(ok_events) == 2
239+
assert len(terminal_events) == 1
240+
assert terminal_events[0]['collection_payloads_count'] == 3
241+
242+
243+
@pytest.mark.unit
244+
def test_schema_collector_skips_inaccessible_database(aggregator):
245+
"""An inaccessible database is skipped and collection continues for the remaining databases."""
246+
check = TestDatabaseCheck()
247+
collector = TestSchemaCollectorWithInaccessibleDb(check, SchemaCollectorConfig())
248+
result = collector.collect_schemas()
249+
250+
assert result is True
251+
events = aggregator.get_event_platform_events("dbm-metadata")
252+
assert len(events) == 2
253+
collected_names = [row['name'] for event in events for row in event['metadata']]
254+
assert 'db_accessible' in collected_names
255+
assert 'db_also_accessible' in collected_names
256+
assert 'db_inaccessible' not in collected_names
257+
258+
259+
@pytest.mark.unit
260+
def test_schema_collector_skips_malformed_database_entry(aggregator):
261+
"""A database entry missing the 'name' key is skipped without aborting the run."""
262+
263+
class MalformedDbCollector(TestSchemaCollector):
264+
__test__ = False
265+
266+
def _get_databases(self):
267+
return [{'name': 'db_good'}, {'no_name_key': 'bad'}, {'name': 'db_also_good'}]
268+
269+
@contextmanager
270+
def _get_cursor(self, database: str):
271+
self._row_index = 0
272+
yield {}
273+
274+
check = TestDatabaseCheck()
275+
collector = MalformedDbCollector(check, SchemaCollectorConfig())
276+
result = collector.collect_schemas()
277+
278+
assert result is True
279+
events = aggregator.get_event_platform_events("dbm-metadata")
280+
collected_names = [row['name'] for event in events for row in event['metadata']]
281+
assert 'db_good' in collected_names
282+
assert 'db_also_good' in collected_names

0 commit comments

Comments
 (0)