Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/flaky-tests.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
name: Daily Flaky Tests (every 8 hours)

on:
pull_request:
paths:
- .github/workflows/test-results-master.yml
workflow_dispatch:
inputs:
dry_run:
description: "Skip committing the test-results badge."
required: false
default: false
type: boolean
schedule:
# 4 AM, 12 PM, 8 PM UTC
- cron: "0 4,12,20 * * *"
Expand Down Expand Up @@ -46,4 +55,6 @@ jobs:
pull-requests: write # Needed for test-results-master

uses: ./.github/workflows/test-results-master.yml
with:
dry_run: ${{ github.event_name == 'pull_request' || inputs.dry_run || false }}
secrets: inherit
4 changes: 3 additions & 1 deletion .github/workflows/release-hash-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- '7.[0-9]+.x'
paths:
- .in-toto/*.link
- .github/workflows/release-hash-check.yml

jobs:
build:
Expand All @@ -29,4 +30,5 @@ jobs:
with:
files: .in-toto/*.link

- run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }}
- if: steps.changed-files.outputs.any_changed == 'true'
run: python .github/workflows/release-hash-check.py ${{ steps.changed-files.outputs.all_changed_files }}
7 changes: 7 additions & 0 deletions .github/workflows/test-results-master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ name: Test results for the master branch

on:
workflow_call:
inputs:
dry_run:
description: "Run the badge step but skip committing to the badges branch."
required: false
default: false
type: boolean

defaults:
run:
Expand Down Expand Up @@ -66,6 +72,7 @@ jobs:
git config --local user.email "github-actions[bot]@users.noreply.github.com"

- name: Commit if stats have changed
if: ${{ !inputs.dry_run }}
run: |-
git add "${{ env.BADGE_PATH }}"
if git commit -m "Update test results from ${{ github.sha }}"; then
Expand Down
5 changes: 3 additions & 2 deletions clickhouse/tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,14 @@ def test_max_execution_time_set_on_client(collector):


def test_main_query_failure_closes_client(collector):
# The base class catches per-DB errors and continues; the caller (run_job) catches
# any remaining exception. What matters is that the client is properly closed.
mock_client = mock.MagicMock()
mock_client.query_rows_stream.return_value.__enter__.side_effect = Exception("main query failed")

_capture_payloads(collector._check)
with mock.patch.object(collector._check, 'create_dbm_client', return_value=mock_client):
with pytest.raises(Exception, match="main query failed"):
collector.collect_schemas()
collector.collect_schemas()

mock_client.close.assert_called_once()
assert collector._db_client is None
Expand Down
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/23913.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Send each logical database as its own independent schema snapshot, so an error or partial collection for one database does not affect others.
54 changes: 35 additions & 19 deletions datadog_checks_base/datadog_checks/base/utils/db/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,54 @@ def collect_schemas(self) -> bool:
This class relies on the owning check to handle scheduling this method.
"""
status = "success"
run_started_at = now_ms()
total_payloads_count = 0
try:
self._collection_started_at = now_ms()
databases = self._get_databases()
self._log.debug("Collecting schemas for %d databases", len(databases))
for database in databases:
self._log.debug("Starting collection of schemas for database %s", database['name'])
database_name = database['name']
database_name = database.get('name')
if not database_name:
self._log.warning("database has no name %v", database)
self._log.warning("database has no name: %s", database)
continue
with self._get_cursor(database_name) as cursor:
# Get the next row from the cursor
next_row = self._get_next(cursor)
while next_row:
self._queued_rows.append(self._map_row(database, next_row))
self._total_rows_count += 1
# Because we're iterating over a cursor we need to try to get
# the next row to see if we've reached the last row
self._log.debug("Starting collection of schemas for database %s", database_name)
self._collection_started_at = now_ms()
self._collection_payloads_count = 0
self._queued_rows = []
db_rows_count = 0
try:
with self._get_cursor(database_name) as cursor:
# Get the next row from the cursor
next_row = self._get_next(cursor)
self.maybe_flush(is_last_payload=False)
is_last_payload = database == databases[-1]
self.maybe_flush(is_last_payload)
self._log.debug("Completed collection of schemas for database %s", database_name)
while next_row:
self._queued_rows.append(self._map_row(database, next_row))
db_rows_count += 1
# Because we're iterating over a cursor we need to try to get
# the next row to see if we've reached the last row
next_row = self._get_next(cursor)
self.maybe_flush(is_last_payload=False)
self.maybe_flush(is_last_payload=True)
self._total_rows_count += db_rows_count
total_payloads_count += self._collection_payloads_count
self._log.debug(
"Completed collection of schemas for database %s (%d rows, %d payloads)",
database_name,
db_rows_count,
self._collection_payloads_count,
)
except Exception as e:
self._queued_rows = []
total_payloads_count += self._collection_payloads_count
self._log.warning("Skipping database %s due to error: %s", database_name, e, exc_info=True)
continue
except Exception as e:
status = "error"
self._log.error("Error collecting schema: %s", e)
raise e
finally:
self._check.histogram(
f"dd.{self._check.dbms}.schema.time",
now_ms() - self._collection_started_at,
now_ms() - run_started_at,
tags=self._check.tags + ["status:" + status],
hostname=self._check.reported_hostname,
raw=True,
Expand All @@ -107,12 +124,11 @@ def collect_schemas(self) -> bool:
)
self._check.gauge(
f"dd.{self._check.dbms}.schema.payloads_count",
self._collection_payloads_count,
total_payloads_count,
tags=self._check.tags + ["status:" + status],
hostname=self._check.reported_hostname,
raw=True,
)

self._reset()
return True

Expand Down
134 changes: 125 additions & 9 deletions datadog_checks_base/tests/base/utils/db/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,54 @@ def _get_next(self, _cursor):
return None


class TestSchemaCollectorWithInaccessibleDb(TestSchemaCollector):
"""Simulates multiple databases where one raises an error on cursor open."""

__test__ = False

def __init__(self, check, config):
super().__init__(check, config)
self._current_rows = []

def _get_databases(self):
return [{'name': 'db_accessible'}, {'name': 'db_inaccessible'}, {'name': 'db_also_accessible'}]

@contextmanager
def _get_cursor(self, database: str):
if database == 'db_inaccessible':
raise RuntimeError("Cannot open database version 852")
self._current_rows = [{'table_name': 'users'}]
self._row_index = 0
yield {}

def _get_next(self, _cursor):
if self._row_index < len(self._current_rows):
row = self._current_rows[self._row_index]
self._row_index += 1
return row
return None


class TestSchemaCollectorMidIterationFailure(TestSchemaCollector):
"""Simulates a database that fails mid-iteration after at least one chunk has been flushed."""

__test__ = False

def _get_databases(self):
return [{'name': 'db_partial_fail'}, {'name': 'db_ok'}]

@contextmanager
def _get_cursor(self, database: str):
self._rows = [{'table_name': 'first'}, {'table_name': 'second'}]
self._row_index = 0
yield {}

def _map_row(self, database, cursor_row):
if database['name'] == 'db_partial_fail' and cursor_row.get('table_name') == 'second':
raise RuntimeError("mid-iteration error after first chunk flushed")
return super()._map_row(database, cursor_row)


@pytest.mark.unit
def test_schema_collector(aggregator):
check = TestDatabaseCheck()
Expand All @@ -134,20 +182,21 @@ def test_schema_collector(aggregator):

@pytest.mark.unit
def test_schema_collector_empty_last_database(aggregator):
"""Verify that queued rows are flushed even when the last database returns 0 rows."""
"""Verify that each database sends its own terminal payload, even when it returns 0 rows."""
check = TestDatabaseCheck()
collector = TestSchemaCollectorEmptyLastDb(check, SchemaCollectorConfig())
collector.collect_schemas()

events = aggregator.get_event_platform_events("dbm-metadata")
assert len(events) == 1, "Expected 1 payload but got {}".format(len(events))
event = events[0]
assert len(event['metadata']) == 2
assert event['metadata'][0]['name'] == 'db_with_tables'
assert event['metadata'][0]['tables'][0]['table_name'] == 'users'
assert event['metadata'][1]['name'] == 'db_with_tables'
assert event['metadata'][1]['tables'][0]['table_name'] == 'orders'
assert event['collection_payloads_count'] == 1
assert len(events) == 2, "Expected 2 payloads (one per database) but got {}".format(len(events))
assert len(events[0]['metadata']) == 2
assert events[0]['metadata'][0]['name'] == 'db_with_tables'
assert events[0]['metadata'][0]['tables'][0]['table_name'] == 'users'
assert events[0]['metadata'][1]['name'] == 'db_with_tables'
assert events[0]['metadata'][1]['tables'][0]['table_name'] == 'orders'
assert events[0]['collection_payloads_count'] == 1
assert len(events[1]['metadata']) == 0
assert events[1]['collection_payloads_count'] == 1


@pytest.mark.unit
Expand All @@ -164,3 +213,70 @@ def test_schema_collector_chunk_size_flush(aggregator):
assert len(events) == 2
assert 'collection_payloads_count' not in events[0]
assert events[-1]['collection_payloads_count'] == len(events)


@pytest.mark.unit
def test_schema_collector_mid_iteration_failure(aggregator):
"""Verify chunk payloads sent before a mid-iteration failure are counted correctly."""
check = TestDatabaseCheck()
config = SchemaCollectorConfig()
config.payload_chunk_size = 1
collector = TestSchemaCollectorMidIterationFailure(check, config)
result = collector.collect_schemas()

assert result is True
events = aggregator.get_event_platform_events("dbm-metadata")
# db_partial_fail: 1 orphaned chunk (no terminal), db_ok: 2 chunks + 1 terminal
assert len(events) == 4

partial_fail_events = [e for e in events if any(r.get('name') == 'db_partial_fail' for r in e['metadata'])]
ok_events = [e for e in events if any(r.get('name') == 'db_ok' for r in e['metadata'])]
terminal_events = [e for e in events if 'collection_payloads_count' in e]

assert len(partial_fail_events) == 1
assert 'collection_payloads_count' not in partial_fail_events[0]
assert len(ok_events) == 2
assert len(terminal_events) == 1
assert terminal_events[0]['collection_payloads_count'] == 3


@pytest.mark.unit
def test_schema_collector_skips_inaccessible_database(aggregator):
"""An inaccessible database is skipped and collection continues for the remaining databases."""
check = TestDatabaseCheck()
collector = TestSchemaCollectorWithInaccessibleDb(check, SchemaCollectorConfig())
result = collector.collect_schemas()

assert result is True
events = aggregator.get_event_platform_events("dbm-metadata")
assert len(events) == 2
collected_names = [row['name'] for event in events for row in event['metadata']]
assert 'db_accessible' in collected_names
assert 'db_also_accessible' in collected_names
assert 'db_inaccessible' not in collected_names


@pytest.mark.unit
def test_schema_collector_skips_malformed_database_entry(aggregator):
"""A database entry missing the 'name' key is skipped without aborting the run."""

class MalformedDbCollector(TestSchemaCollector):
__test__ = False

def _get_databases(self):
return [{'name': 'db_good'}, {'no_name_key': 'bad'}, {'name': 'db_also_good'}]

@contextmanager
def _get_cursor(self, database: str):
self._row_index = 0
yield {}

check = TestDatabaseCheck()
collector = MalformedDbCollector(check, SchemaCollectorConfig())
result = collector.collect_schemas()

assert result is True
events = aggregator.get_event_platform_events("dbm-metadata")
collected_names = [row['name'] for event in events for row in event['metadata']]
assert 'db_good' in collected_names
assert 'db_also_good' in collected_names
Loading