diff --git a/.github/workflows/flaky-tests.yml b/.github/workflows/flaky-tests.yml index b3f4331a56b07..253eb156e9824 100644 --- a/.github/workflows/flaky-tests.yml +++ b/.github/workflows/flaky-tests.yml @@ -1,7 +1,16 @@ name: Daily Flaky Tests (every 8 hours) on: + pull_request: + paths: + - .github/workflows/test-results-master.yml workflow_dispatch: + inputs: + dry_run: + description: "Skip committing the test-results badge." + required: false + default: false + type: boolean schedule: # 4 AM, 12 PM, 8 PM UTC - cron: "0 4,12,20 * * *" @@ -46,4 +55,6 @@ jobs: pull-requests: write # Needed for test-results-master uses: ./.github/workflows/test-results-master.yml + with: + dry_run: ${{ github.event_name == 'pull_request' || inputs.dry_run || false }} secrets: inherit diff --git a/.github/workflows/release-hash-check.yml b/.github/workflows/release-hash-check.yml index 6c1d6c4cd91fe..02fe95dde8201 100644 --- a/.github/workflows/release-hash-check.yml +++ b/.github/workflows/release-hash-check.yml @@ -7,6 +7,7 @@ on: - '7.[0-9]+.x' paths: - .in-toto/*.link + - .github/workflows/release-hash-check.yml jobs: build: @@ -29,4 +30,5 @@ jobs: with: files: .in-toto/*.link - - run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }} + - if: steps.changed-files.outputs.any_changed == 'true' + run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }} diff --git a/.github/workflows/test-results-master.yml b/.github/workflows/test-results-master.yml index 169d781ca46d8..2917e771d79fd 100644 --- a/.github/workflows/test-results-master.yml +++ b/.github/workflows/test-results-master.yml @@ -2,6 +2,12 @@ name: Test results for the master branch on: workflow_call: + inputs: + dry_run: + description: "Run the badge step but skip committing to the badges branch." + required: false + default: false + type: boolean defaults: run: @@ -66,6 +72,7 @@ jobs: git config --local user.email "github-actions[bot]@users.noreply.github.com" - name: Commit if stats have changed + if: ${{ !inputs.dry_run }} run: |- git add "${{ env.BADGE_PATH }}" if git commit -m "Update test results from ${{ github.sha }}"; then diff --git a/clickhouse/tests/test_metadata.py b/clickhouse/tests/test_metadata.py index 1e05dd37997be..fd954de239d50 100644 --- a/clickhouse/tests/test_metadata.py +++ b/clickhouse/tests/test_metadata.py @@ -571,13 +571,14 @@ def test_max_execution_time_set_on_client(collector): def test_main_query_failure_closes_client(collector): + # The base class catches per-DB errors and continues; the caller (run_job) catches + # any remaining exception. What matters is that the client is properly closed. mock_client = mock.MagicMock() mock_client.query_rows_stream.return_value.__enter__.side_effect = Exception("main query failed") _capture_payloads(collector._check) with mock.patch.object(collector._check, 'create_dbm_client', return_value=mock_client): - with pytest.raises(Exception, match="main query failed"): - collector.collect_schemas() + collector.collect_schemas() mock_client.close.assert_called_once() assert collector._db_client is None diff --git a/datadog_checks_base/changelog.d/23913.fixed b/datadog_checks_base/changelog.d/23913.fixed new file mode 100644 index 0000000000000..aa5e1e7b5482b --- /dev/null +++ b/datadog_checks_base/changelog.d/23913.fixed @@ -0,0 +1 @@ +Send each logical database as its own independent schema snapshot, so an error or partial collection for one database does not affect others. diff --git a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py index 61edb65ade541..8945885e54af4 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/schemas.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/schemas.py @@ -63,29 +63,46 @@ def collect_schemas(self) -> bool: This class relies on the owning check to handle scheduling this method. """ status = "success" + run_started_at = now_ms() + total_payloads_count = 0 try: - self._collection_started_at = now_ms() databases = self._get_databases() self._log.debug("Collecting schemas for %d databases", len(databases)) for database in databases: - self._log.debug("Starting collection of schemas for database %s", database['name']) - database_name = database['name'] + database_name = database.get('name') if not database_name: - self._log.warning("database has no name %v", database) + self._log.warning("database has no name: %s", database) continue - with self._get_cursor(database_name) as cursor: - # Get the next row from the cursor - next_row = self._get_next(cursor) - while next_row: - self._queued_rows.append(self._map_row(database, next_row)) - self._total_rows_count += 1 - # Because we're iterating over a cursor we need to try to get - # the next row to see if we've reached the last row + self._log.debug("Starting collection of schemas for database %s", database_name) + self._collection_started_at = now_ms() + self._collection_payloads_count = 0 + self._queued_rows = [] + db_rows_count = 0 + try: + with self._get_cursor(database_name) as cursor: + # Get the next row from the cursor next_row = self._get_next(cursor) - self.maybe_flush(is_last_payload=False) - is_last_payload = database == databases[-1] - self.maybe_flush(is_last_payload) - self._log.debug("Completed collection of schemas for database %s", database_name) + while next_row: + self._queued_rows.append(self._map_row(database, next_row)) + db_rows_count += 1 + # Because we're iterating over a cursor we need to try to get + # the next row to see if we've reached the last row + next_row = self._get_next(cursor) + self.maybe_flush(is_last_payload=False) + self.maybe_flush(is_last_payload=True) + self._total_rows_count += db_rows_count + total_payloads_count += self._collection_payloads_count + self._log.debug( + "Completed collection of schemas for database %s (%d rows, %d payloads)", + database_name, + db_rows_count, + self._collection_payloads_count, + ) + except Exception as e: + self._queued_rows = [] + total_payloads_count += self._collection_payloads_count + self._log.warning("Skipping database %s due to error: %s", database_name, e, exc_info=True) + continue except Exception as e: status = "error" self._log.error("Error collecting schema: %s", e) @@ -93,7 +110,7 @@ def collect_schemas(self) -> bool: finally: self._check.histogram( f"dd.{self._check.dbms}.schema.time", - now_ms() - self._collection_started_at, + now_ms() - run_started_at, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, @@ -107,12 +124,11 @@ def collect_schemas(self) -> bool: ) self._check.gauge( f"dd.{self._check.dbms}.schema.payloads_count", - self._collection_payloads_count, + total_payloads_count, tags=self._check.tags + ["status:" + status], hostname=self._check.reported_hostname, raw=True, ) - self._reset() return True diff --git a/datadog_checks_base/tests/base/utils/db/test_schemas.py b/datadog_checks_base/tests/base/utils/db/test_schemas.py index 18adf38336819..989c73ae53fcd 100644 --- a/datadog_checks_base/tests/base/utils/db/test_schemas.py +++ b/datadog_checks_base/tests/base/utils/db/test_schemas.py @@ -111,6 +111,54 @@ def _get_next(self, _cursor): return None +class TestSchemaCollectorWithInaccessibleDb(TestSchemaCollector): + """Simulates multiple databases where one raises an error on cursor open.""" + + __test__ = False + + def __init__(self, check, config): + super().__init__(check, config) + self._current_rows = [] + + def _get_databases(self): + return [{'name': 'db_accessible'}, {'name': 'db_inaccessible'}, {'name': 'db_also_accessible'}] + + @contextmanager + def _get_cursor(self, database: str): + if database == 'db_inaccessible': + raise RuntimeError("Cannot open database version 852") + self._current_rows = [{'table_name': 'users'}] + self._row_index = 0 + yield {} + + def _get_next(self, _cursor): + if self._row_index < len(self._current_rows): + row = self._current_rows[self._row_index] + self._row_index += 1 + return row + return None + + +class TestSchemaCollectorMidIterationFailure(TestSchemaCollector): + """Simulates a database that fails mid-iteration after at least one chunk has been flushed.""" + + __test__ = False + + def _get_databases(self): + return [{'name': 'db_partial_fail'}, {'name': 'db_ok'}] + + @contextmanager + def _get_cursor(self, database: str): + self._rows = [{'table_name': 'first'}, {'table_name': 'second'}] + self._row_index = 0 + yield {} + + def _map_row(self, database, cursor_row): + if database['name'] == 'db_partial_fail' and cursor_row.get('table_name') == 'second': + raise RuntimeError("mid-iteration error after first chunk flushed") + return super()._map_row(database, cursor_row) + + @pytest.mark.unit def test_schema_collector(aggregator): check = TestDatabaseCheck() @@ -134,20 +182,21 @@ def test_schema_collector(aggregator): @pytest.mark.unit def test_schema_collector_empty_last_database(aggregator): - """Verify that queued rows are flushed even when the last database returns 0 rows.""" + """Verify that each database sends its own terminal payload, even when it returns 0 rows.""" check = TestDatabaseCheck() collector = TestSchemaCollectorEmptyLastDb(check, SchemaCollectorConfig()) collector.collect_schemas() events = aggregator.get_event_platform_events("dbm-metadata") - assert len(events) == 1, "Expected 1 payload but got {}".format(len(events)) - event = events[0] - assert len(event['metadata']) == 2 - assert event['metadata'][0]['name'] == 'db_with_tables' - assert event['metadata'][0]['tables'][0]['table_name'] == 'users' - assert event['metadata'][1]['name'] == 'db_with_tables' - assert event['metadata'][1]['tables'][0]['table_name'] == 'orders' - assert event['collection_payloads_count'] == 1 + assert len(events) == 2, "Expected 2 payloads (one per database) but got {}".format(len(events)) + assert len(events[0]['metadata']) == 2 + assert events[0]['metadata'][0]['name'] == 'db_with_tables' + assert events[0]['metadata'][0]['tables'][0]['table_name'] == 'users' + assert events[0]['metadata'][1]['name'] == 'db_with_tables' + assert events[0]['metadata'][1]['tables'][0]['table_name'] == 'orders' + assert events[0]['collection_payloads_count'] == 1 + assert len(events[1]['metadata']) == 0 + assert events[1]['collection_payloads_count'] == 1 @pytest.mark.unit @@ -164,3 +213,70 @@ def test_schema_collector_chunk_size_flush(aggregator): assert len(events) == 2 assert 'collection_payloads_count' not in events[0] assert events[-1]['collection_payloads_count'] == len(events) + + +@pytest.mark.unit +def test_schema_collector_mid_iteration_failure(aggregator): + """Verify chunk payloads sent before a mid-iteration failure are counted correctly.""" + check = TestDatabaseCheck() + config = SchemaCollectorConfig() + config.payload_chunk_size = 1 + collector = TestSchemaCollectorMidIterationFailure(check, config) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + # db_partial_fail: 1 orphaned chunk (no terminal), db_ok: 2 chunks + 1 terminal + assert len(events) == 4 + + partial_fail_events = [e for e in events if any(r.get('name') == 'db_partial_fail' for r in e['metadata'])] + ok_events = [e for e in events if any(r.get('name') == 'db_ok' for r in e['metadata'])] + terminal_events = [e for e in events if 'collection_payloads_count' in e] + + assert len(partial_fail_events) == 1 + assert 'collection_payloads_count' not in partial_fail_events[0] + assert len(ok_events) == 2 + assert len(terminal_events) == 1 + assert terminal_events[0]['collection_payloads_count'] == 3 + + +@pytest.mark.unit +def test_schema_collector_skips_inaccessible_database(aggregator): + """An inaccessible database is skipped and collection continues for the remaining databases.""" + check = TestDatabaseCheck() + collector = TestSchemaCollectorWithInaccessibleDb(check, SchemaCollectorConfig()) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + assert len(events) == 2 + collected_names = [row['name'] for event in events for row in event['metadata']] + assert 'db_accessible' in collected_names + assert 'db_also_accessible' in collected_names + assert 'db_inaccessible' not in collected_names + + +@pytest.mark.unit +def test_schema_collector_skips_malformed_database_entry(aggregator): + """A database entry missing the 'name' key is skipped without aborting the run.""" + + class MalformedDbCollector(TestSchemaCollector): + __test__ = False + + def _get_databases(self): + return [{'name': 'db_good'}, {'no_name_key': 'bad'}, {'name': 'db_also_good'}] + + @contextmanager + def _get_cursor(self, database: str): + self._row_index = 0 + yield {} + + check = TestDatabaseCheck() + collector = MalformedDbCollector(check, SchemaCollectorConfig()) + result = collector.collect_schemas() + + assert result is True + events = aggregator.get_event_platform_events("dbm-metadata") + collected_names = [row['name'] for event in events for row in event['metadata']] + assert 'db_good' in collected_names + assert 'db_also_good' in collected_names