From 6b0eda4e9763f94e4d655adaebad51df1f20007d Mon Sep 17 00:00:00 2001 From: Lucia Date: Fri, 5 Jun 2026 12:23:48 +0200 Subject: [PATCH 1/2] Make CI action bumps easier to validate (#23890) * Run release-hash-check on its own workflow changes Add the workflow file to the pull_request paths filter so action-version bumps (e.g. Renovate updates to tj-actions/changed-files) are exercised on the PR that introduces them. Guard the hash-check script on any_changed so a workflow-only change still runs the action but doesn't fail with no link file to validate. Co-Authored-By: Claude Opus 4.8 * Remove redundant comments Co-Authored-By: Claude Opus 4.8 * Add dry-run mode to skip the test-results badge commit Add a dry_run input to test-results-master.yml that runs the badge step but skips committing to the badges branch, and expose it as a workflow_dispatch input on flaky-tests.yml. This lets action bumps (e.g. Renovate updates to emibcn/badge-action) be validated on demand without moving the public badge. Co-Authored-By: Claude Opus 4.8 * Trim dry_run input descriptions Co-Authored-By: Claude Opus 4.8 * chore(ci): run flaky-tests workflow on test-results-master.yml changes - Add pull_request trigger scoped to .github/workflows/test-results-master.yml - Auto-enable dry_run when triggered by pull request to avoid committing during validation Rationale: Allows self-validation of the workflow when test-results-master.yml is modified, mirroring the pattern already used for release-hash-check This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md) --------- Co-authored-by: Claude Opus 4.8 --- .github/workflows/flaky-tests.yml | 11 +++++++++++ .github/workflows/release-hash-check.yml | 4 +++- .github/workflows/test-results-master.yml | 7 +++++++ 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/.github/workflows/flaky-tests.yml b/.github/workflows/flaky-tests.yml index b3f4331a56b07..253eb156e9824 100644 --- a/.github/workflows/flaky-tests.yml +++ b/.github/workflows/flaky-tests.yml @@ -1,7 +1,16 @@ name: Daily Flaky Tests (every 8 hours) on: + pull_request: + paths: + - .github/workflows/test-results-master.yml workflow_dispatch: + inputs: + dry_run: + description: "Skip committing the test-results badge." + required: false + default: false + type: boolean schedule: # 4 AM, 12 PM, 8 PM UTC - cron: "0 4,12,20 * * *" @@ -46,4 +55,6 @@ jobs: pull-requests: write # Needed for test-results-master uses: ./.github/workflows/test-results-master.yml + with: + dry_run: ${{ github.event_name == 'pull_request' || inputs.dry_run || false }} secrets: inherit diff --git a/.github/workflows/release-hash-check.yml b/.github/workflows/release-hash-check.yml index 6c1d6c4cd91fe..02fe95dde8201 100644 --- a/.github/workflows/release-hash-check.yml +++ b/.github/workflows/release-hash-check.yml @@ -7,6 +7,7 @@ on: - '7.[0-9]+.x' paths: - .in-toto/*.link + - .github/workflows/release-hash-check.yml jobs: build: @@ -29,4 +30,5 @@ jobs: with: files: .in-toto/*.link - - run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }} + - if: steps.changed-files.outputs.any_changed == 'true' + run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }} diff --git a/.github/workflows/test-results-master.yml b/.github/workflows/test-results-master.yml index 169d781ca46d8..2917e771d79fd 100644 --- a/.github/workflows/test-results-master.yml +++ b/.github/workflows/test-results-master.yml @@ -2,6 +2,12 @@ name: Test results for the master branch on: workflow_call: + inputs: + dry_run: + description: "Run the badge step but skip committing to the badges branch." + required: false + default: false + type: boolean defaults: run: @@ -66,6 +72,7 @@ jobs: git config --local user.email "github-actions[bot]@users.noreply.github.com" - name: Commit if stats have changed + if: ${{ !inputs.dry_run }} run: |- git add "${{ env.BADGE_PATH }}" if git commit -m "Update test results from ${{ github.sha }}"; then From bf7ac65c48e4c120f39912906b7c7147653951ef Mon Sep 17 00:00:00 2001 From: Pierre Le Noan <144696725+pierreln-dd@users.noreply.github.com> Date: Fri, 5 Jun 2026 15:47:45 +0200 Subject: [PATCH 2/2] Send each logical database as its own independent schema snapshot (#23913) * Send each logical database as its own independent schema snapshot Each database now gets a fresh collection_started_at timestamp and its own terminal payload with collection_payloads_count, so the backend can independently complete or discard each database's snapshot. Errors on one database are isolated: partial rows are discarded and collection continues for the remaining databases. Co-Authored-By: Claude Sonnet 4.6 * Add changelog entry for PR #23913 Co-Authored-By: Claude Sonnet 4.6 * Fix metric bookkeeping for mid-iteration database failures When a database fails after already flushing one or more chunks, two counters were wrong: - _total_rows_count included unsent buffered rows that were about to be discarded, inflating the tables_count gauge - total_payloads_count excluded chunk payloads already emitted for the failed database, undercounting the payloads_count gauge Fix by computing unsent row count before the delete and subtracting it, and adding _collection_payloads_count to total_payloads_count in the error path as well as the success path. Add test for mid-iteration failure after a chunk flush. Co-Authored-By: Claude Sonnet 4.6 * Simplify per-DB state reset using per-DB queued_rows initialization Reset _queued_rows per database instead of tracking a slice offset. On error, discard by reassignment rather than del slicing. Use a local db_rows_count so _total_rows_count only reflects completed databases, not partially-failed ones. Co-Authored-By: Claude Sonnet 4.6 * Add row count to per-database collection debug log Co-Authored-By: Claude Sonnet 4.6 * Add payload count to per-database collection debug log Co-Authored-By: Claude Sonnet 4.6 * Fix changelog entry type: changed -> fixed Co-Authored-By: Claude Sonnet 4.6 * Move completion debug log after terminal flush so payload count is accurate Co-Authored-By: Claude Sonnet 4.6 * Include database info in terminal payload for empty databases When a logical database has no rows (or all rows were already flushed by chunk-size), the terminal payload would have metadata: [] with no DatabaseName for the backend to apply per-database snapshot completion/deletion to. For empty databases, append the database-level dict as a sentinel row so the backend receives a DatabaseName even when no schema rows were collected. Co-Authored-By: Claude Sonnet 4.6 * Remove empty-database sentinel from per-DB schema collection Drop the sentinel row that was injected into terminal payloads for empty or fully-chunk-flushed databases. Per Seth's review, this is a separate concern and should be addressed in a follow-up PR to keep revert scope minimal. Co-Authored-By: Claude Sonnet 4.6 * Guard against missing 'name' key in database entries from _get_databases() `database['name']` raised KeyError if a subclass returned a malformed entry, propagating to the outer handler and aborting the entire run. Switch to `database.get('name')` and reorder so the guard check fires before the debug log. Add a test covering the per-DB skip path. Co-Authored-By: Claude Sonnet 4.6 * Include DB identity sentinel in terminal payload for empty databases When a logical database has no rows, the terminal flush emits metadata=[] with no DatabaseName. The backend cannot attribute snapshot completion/deletion to that DB, so stale schemas survive. Append a {**database} sentinel entry before the terminal flush when db_rows_count==0. This ensures the backend calls snapshotStore.Add with a DatabaseName and can remove stale rows on completion. Update the test to assert the sentinel name field rather than empty metadata, per Ilia's review request. Co-Authored-By: Claude Sonnet 4.6 * Fix ClickHouse schema collection compatibility with empty-DB sentinel ClickhouseSchemaCollector uses a single cluster-wide stub entry rather than iterating real database names. Without this fix, the base-class sentinel appends the stub name ('_cluster_') to empty terminal payloads, sending a fake database identifier to the backend and breaking the test_collect_emits_empty_snapshot_marker_when_no_tables assertion. Add _emit_empty_db_sentinel class attribute (default True) to SchemaCollector so subclasses using stub entries can opt out, and set it to False in ClickhouseSchemaCollector. Co-Authored-By: Claude Sonnet 4.6 * Revert empty-DB sentinel from SchemaCollector The sentinel approach requires touching every integration that uses a stub database entry (e.g. ClickHouse), making it unsuitable for this PR. Deferred to a follow-up with proper cross-integration design. Restores metadata=[] for empty-database terminal payloads. Co-Authored-By: Claude Sonnet 4.6 * Fix ClickHouse test_main_query_failure_closes_client for per-DB error isolation The base SchemaCollector now catches per-DB errors and continues (matching the production behavior where run_job already catches all exceptions from collect_schemas). Update the test to verify client cleanup without asserting exception propagation, which was never a code-level contract. Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- clickhouse/tests/test_metadata.py | 5 +- datadog_checks_base/changelog.d/23913.fixed | 1 + .../datadog_checks/base/utils/db/schemas.py | 54 ++++--- .../tests/base/utils/db/test_schemas.py | 134 ++++++++++++++++-- 4 files changed, 164 insertions(+), 30 deletions(-) create mode 100644 datadog_checks_base/changelog.d/23913.fixed diff --git a/clickhouse/tests/test_metadata.py b/clickhouse/tests/test_metadata.py index 1e05dd37997be..fd954de239d50 100644 --- a/clickhouse/tests/test_metadata.py +++ b/clickhouse/tests/test_metadata.py @@ -571,13 +571,14 @@ def test_max_execution_time_set_on_client(collector): def test_main_query_failure_closes_client(collector): + # The base class catches per-DB errors and continues; the caller (run_job) catches + # any remaining exception. What matters is that the client is properly closed. mock_client = mock.MagicMock() mock_client.query_rows_stream.return_value.__enter__.side_effect = Exception("main query failed") _capture_payloads(collector._check) with mock.patch.object(collector._check, 'create_dbm_client', return_value=mock_client): - with pytest.raises(Exception, match="main query failed"): - collector.collect_schemas() + collector.collect_schemas() mock_client.close.assert_called_once() assert collector._db_client is None diff --git a/datadog_checks_base/changelog.d/23913.fixed b/datadog_checks_base/changelog.d/23913.fixed new file mode 100644 index 0000000000000..aa5e1e7b5482b --- /dev/null +++ b/datadog_checks_base/changelog.d/23913.fixed @@ -0,0 +1 @@ +Send each logical database as its own independent schema snapshot, so an error or partial collection for one database does not affect others. diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 61edb65ade541..8945885e54af4 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -63,29 +63,46 @@ def collect_schemas(self) -> bool: This class relies on the owning check to handle scheduling this method. """ status = "success" + run_started_at = now_ms() + total_payloads_count = 0 try: - self._collection_started_at = now_ms() databases = self._get_databases() self._log.debug("Collecting schemas for %d databases", len(databases)) for database in databases: - self._log.debug("Starting collection of schemas for database %s", database['name']) - database_name = database['name'] + database_name = database.get('name') if not database_name: - self._log.warning("database has no name %v", database) + self._log.warning("database has no name: %s", database) continue - with self._get_cursor(database_name) as cursor: - # Get the next row from the cursor - next_row = self._get_next(cursor) - while next_row: - self._queued_rows.append(self._map_row(database, next_row)) - self._total_rows_count += 1 - # Because we're iterating over a cursor we need to try to get - # the next row to see if we've reached the last row + self._log.debug("Starting collection of schemas for database %s", database_name) + self._collection_started_at = now_ms() + self._collection_payloads_count = 0 + self._queued_rows = [] + db_rows_count = 0 + try: + with self._get_cursor(database_name) as cursor: + # Get the next row from the cursor next_row = self._get_next(cursor) - self.maybe_flush(is_last_payload=False) - is_last_payload = database == databases[-1] - self.maybe_flush(is_last_payload) - self._log.debug("Completed collection of schemas for database %s", database_name) + while next_row: + self._queued_rows.append(self._map_row(database, next_row)) + db_rows_count += 1 + # Because we're iterating over a cursor we need to try to get + # the next row to see if we've reached the last row + next_row = self._get_next(cursor) + self.maybe_flush(is_last_payload=False) + self.maybe_flush(is_last_payload=True) + self._total_rows_count += db_rows_count + total_payloads_count += self._collection_payloads_count + self._log.debug( + "Completed collection of schemas for database %s (%d rows, %d payloads)", + database_name, + db_rows_count, + self._collection_payloads_count, + ) + except Exception as e: + self._queued_rows = [] + total_payloads_count += self._collection_payloads_count + self._log.warning("Skipping database %s due to error: %s", database_name, e, exc_info=True) + continue except Exception as e: status = "error" self._log.error("Error collecting schema: %s", e) @@ -93,7 +110,7 @@ def collect_schemas(self) -> bool: finally: self._check.histogram( f"dd.{self._check.dbms}.schema.time", - now_ms() - self._collection_started_at, + now_ms() - run_started_at, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, @@ -107,12 +124,11 @@ def collect_schemas(self) -> bool: ) self._check.gauge( f"dd.{self._check.dbms}.schema.payloads_count", - self._collection_payloads_count, + total_payloads_count, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, ) - self._reset() return True diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py index 18adf38336819..989c73ae53fcd 100644 --- a/datadog_checks_base/tests/base/utils/db/test_schemas.py +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -111,6 +111,54 @@ def _get_next(self, _cursor): return None +class TestSchemaCollectorWithInaccessibleDb(TestSchemaCollector): + """Simulates multiple databases where one raises an error on cursor open.""" + + __test__ = False + + def __init__(self, check, config): + super().__init__(check, config) + self._current_rows = [] + + def _get_databases(self): + return [{'name': 'db_accessible'}, {'name': 'db_inaccessible'}, {'name': 'db_also_accessible'}] + + @contextmanager + def _get_cursor(self, database: str): + if database == 'db_inaccessible': + raise RuntimeError("Cannot open database version 852") + self._current_rows = [{'table_name': 'users'}] + self._row_index = 0 + yield {} + + def _get_next(self, _cursor): + if self._row_index < len(self._current_rows): + row = self._current_rows[self._row_index] + self._row_index += 1 + return row + return None + + +class TestSchemaCollectorMidIterationFailure(TestSchemaCollector): + """Simulates a database that fails mid-iteration after at least one chunk has been flushed.""" + + __test__ = False + + def _get_databases(self): + return [{'name': 'db_partial_fail'}, {'name': 'db_ok'}] + + @contextmanager + def _get_cursor(self, database: str): + self._rows = [{'table_name': 'first'}, {'table_name': 'second'}] + self._row_index = 0 + yield {} + + def _map_row(self, database, cursor_row): + if database['name'] == 'db_partial_fail' and cursor_row.get('table_name') == 'second': + raise RuntimeError("mid-iteration error after first chunk flushed") + return super()._map_row(database, cursor_row) + + @pytest.mark.unit def test_schema_collector(aggregator): check = TestDatabaseCheck() @@ -134,20 +182,21 @@ def test_schema_collector(aggregator): @pytest.mark.unit def test_schema_collector_empty_last_database(aggregator): - """Verify that queued rows are flushed even when the last database returns 0 rows.""" + """Verify that each database sends its own terminal payload, even when it returns 0 rows.""" check = TestDatabaseCheck() collector = TestSchemaCollectorEmptyLastDb(check, SchemaCollectorConfig()) collector.collect_schemas() events = aggregator.get_event_platform_events("dbm-metadata") - assert len(events) == 1, "Expected 1 payload but got {}".format(len(events)) - event = events[0] - assert len(event['metadata']) == 2 - assert event['metadata'][0]['name'] == 'db_with_tables' - assert event['metadata'][0]['tables'][0]['table_name'] == 'users' - assert event['metadata'][1]['name'] == 'db_with_tables' - assert event['metadata'][1]['tables'][0]['table_name'] == 'orders' - assert event['collection_payloads_count'] == 1 + assert len(events) == 2, "Expected 2 payloads (one per database) but got {}".format(len(events)) + assert len(events[0]['metadata']) == 2 + assert events[0]['metadata'][0]['name'] == 'db_with_tables' + assert events[0]['metadata'][0]['tables'][0]['table_name'] == 'users' + assert events[0]['metadata'][1]['name'] == 'db_with_tables' + assert events[0]['metadata'][1]['tables'][0]['table_name'] == 'orders' + assert events[0]['collection_payloads_count'] == 1 + assert len(events[1]['metadata']) == 0 + assert events[1]['collection_payloads_count'] == 1 @pytest.mark.unit @@ -164,3 +213,70 @@ def test_schema_collector_chunk_size_flush(aggregator): assert len(events) == 2 assert 'collection_payloads_count' not in events[0] assert events[-1]['collection_payloads_count'] == len(events) + + +@pytest.mark.unit +def test_schema_collector_mid_iteration_failure(aggregator): + """Verify chunk payloads sent before a mid-iteration failure are counted correctly.""" + check = TestDatabaseCheck() + config = SchemaCollectorConfig() + config.payload_chunk_size = 1 + collector = TestSchemaCollectorMidIterationFailure(check, config) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + # db_partial_fail: 1 orphaned chunk (no terminal), db_ok: 2 chunks + 1 terminal + assert len(events) == 4 + + partial_fail_events = [e for e in events if any(r.get('name') == 'db_partial_fail' for r in e['metadata'])] + ok_events = [e for e in events if any(r.get('name') == 'db_ok' for r in e['metadata'])] + terminal_events = [e for e in events if 'collection_payloads_count' in e] + + assert len(partial_fail_events) == 1 + assert 'collection_payloads_count' not in partial_fail_events[0] + assert len(ok_events) == 2 + assert len(terminal_events) == 1 + assert terminal_events[0]['collection_payloads_count'] == 3 + + +@pytest.mark.unit +def test_schema_collector_skips_inaccessible_database(aggregator): + """An inaccessible database is skipped and collection continues for the remaining databases.""" + check = TestDatabaseCheck() + collector = TestSchemaCollectorWithInaccessibleDb(check, SchemaCollectorConfig()) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + assert len(events) == 2 + collected_names = [row['name'] for event in events for row in event['metadata']] + assert 'db_accessible' in collected_names + assert 'db_also_accessible' in collected_names + assert 'db_inaccessible' not in collected_names + + +@pytest.mark.unit +def test_schema_collector_skips_malformed_database_entry(aggregator): + """A database entry missing the 'name' key is skipped without aborting the run.""" + + class MalformedDbCollector(TestSchemaCollector): + __test__ = False + + def _get_databases(self): + return [{'name': 'db_good'}, {'no_name_key': 'bad'}, {'name': 'db_also_good'}] + + @contextmanager + def _get_cursor(self, database: str): + self._row_index = 0 + yield {} + + check = TestDatabaseCheck() + collector = MalformedDbCollector(check, SchemaCollectorConfig()) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + collected_names = [row['name'] for event in events for row in event['metadata']] + assert 'db_good' in collected_names + assert 'db_also_good' in collected_names