Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/release-trigger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ jobs:
echo "is-stable-release=false" >> "$GITHUB_OUTPUT"
fi

# TODO: this notification currently runs before the approval gate, so it
# announces a *pending* release. Once the `release` environment gate is removed
# the release starts immediately — switch `needs` back to the release step and
# update the wording in notify_slack.py to "release starting".
notify:
name: Notify pending release
needs: context
runs-on: ubuntu-latest
steps:
- name: Checkout notification script
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
fetch-depth: 1
sparse-checkout: .github/workflows/scripts
- name: Notify Slack of pending release
env:
SLACK_API_TOKEN: ${{ secrets.SLACK_API_TOKEN }}
SLACK_CHANNEL_ID: ${{ secrets.SLACK_CHANNEL_ID_AGENT_INTEGRATIONS_RELEASE }}
SOURCE_REPO: integrations-core
REF: ${{ github.event_name == 'workflow_dispatch' && inputs.source-repo-ref || github.sha }}
PACKAGES: ${{ inputs.packages || '' }}
RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}
run: python3 .github/workflows/scripts/notify_slack.py

approve:
name: Await release approval
needs: context
Expand Down
64 changes: 64 additions & 0 deletions .github/workflows/scripts/notify_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Post a 'wheel release pending approval' Slack notification via chat.postMessage.

Env: SLACK_API_TOKEN, SLACK_CHANNEL_ID (both required or no-op), SOURCE_REPO,
REF, PACKAGES, RUN_URL. Slack/network errors warn and never fail the job.
"""
import json
import os
import urllib.error
import urllib.request

SLACK_URL = "https://slack.com/api/chat.postMessage"


def build_text(source_repo: str, ref: str, packages: str, run_url: str) -> str:
    """Return the release notification Slack message body.

    Args:
        source_repo: Repository name shown in the headline.
        ref: Commit SHA, branch, or tag being released. Hex-only refs
            (commit SHAs) are abbreviated to 12 characters; any other ref is
            shown in full. An empty ref renders as an em dash.
        packages: Package selection as received from the workflow input.
            Blank means packages are auto-detected from tags at HEAD.
        run_url: Target of the "Review & approve" link.

    Returns:
        The message text in Slack mrkdwn.
    """
    # TODO: this fires before the approval gate, so the release is pending. Once
    # the `release` environment gate is removed, reword to "Wheel release starting"
    # with a "View release run" link, since the release will start immediately.

    # Only abbreviate SHA-like refs: cutting a branch or tag name at 12
    # characters would display a ref that does not exist.
    looks_like_sha = set(ref) <= set("0123456789abcdefABCDEF")
    shown_ref = (ref[:12] if looks_like_sha else ref) or "—"
    shown_packages = packages.strip() or "auto-detect from tags at HEAD"
    return (
        f":hourglass_flowing_sand: *Wheel release pending approval* — `{source_repo}`\n"
        f"• ref: `{shown_ref}`\n"
        f"• packages: {shown_packages}\n"
        # `&amp;` is deliberate: Slack requires `&` to be entity-escaped in message text.
        f"• <{run_url}|Review &amp; approve →>"
    )


def post(token: str, channel: str, text: str) -> None:
    """Post *text* to *channel* via Slack ``chat.postMessage``.

    Best-effort by design: any transport, HTTP, or decoding problem, and any
    Slack-level failure, is reported as a GitHub Actions ``::warning::`` line
    on stdout and the function returns normally, so a Slack problem never
    fails the calling job.

    Args:
        token: Slack API token sent as a Bearer credential.
        channel: ID of the channel to post to.
        text: Message body.
    """
    # Function-scope import: only needed for the exception type caught below.
    import http.client

    payload = json.dumps({"channel": channel, "text": text, "unfurl_links": False}).encode()
    request = urllib.request.Request(
        SLACK_URL,
        data=payload,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json; charset=utf-8"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            body = json.loads(response.read())
    except (OSError, http.client.HTTPException, ValueError) as e:
        # OSError is the common base of URLError/HTTPError, TimeoutError and the
        # raw socket errors (e.g. ConnectionResetError) that can surface while
        # the response is being read. HTTPException covers protocol-level
        # failures such as IncompleteRead; ValueError covers invalid JSON.
        print(f"::warning::Slack request failed: {e}")
        return
    if not isinstance(body, dict):
        # Valid JSON that is not an object has no "ok" field to inspect.
        print("::warning::Slack notification failed: unexpected response format")
        return
    if not body.get("ok"):
        print(f"::warning::Slack notification failed: {body.get('error', 'unknown error')}")


def main() -> None:
    """Send the notification using configuration read from the environment.

    Prints a notice and returns without posting unless both SLACK_API_TOKEN
    and SLACK_CHANNEL_ID are set to non-blank values.
    """
    env = os.environ
    token = env.get("SLACK_API_TOKEN", "").strip()
    channel = env.get("SLACK_CHANNEL_ID", "").strip()
    if not (token and channel):
        print("Slack token or channel not configured; skipping notification.")
        return

    message = build_text(
        env.get("SOURCE_REPO", "integrations-core"),
        env.get("REF", ""),
        env.get("PACKAGES", ""),
        env.get("RUN_URL", ""),
    )
    post(token, channel, message)


if __name__ == "__main__":
main()
41 changes: 41 additions & 0 deletions .github/workflows/scripts/tests/test_notify_slack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Tests for notify_slack."""
import urllib.error
from unittest.mock import patch

import notify_slack


def test_build_text_lists_packages():
    """The status wording, package list, shortened ref and run URL all appear."""
    message = notify_slack.build_text("integrations-core", "abcdef1234567890", '["postgres"]', "http://run")
    for fragment in ("pending approval", '["postgres"]', "`abcdef123456`", "http://run"):
        assert fragment in message


def test_build_text_auto_detect_and_dash_ref():
    """Blank packages and ref fall back to their placeholder wording."""
    message = notify_slack.build_text("marketplace", "", "", "http://run")
    for fragment in ("auto-detect from tags at HEAD", "ref: `—`"):
        assert fragment in message


def test_main_no_op_without_config(monkeypatch):
    """main() must not post when the token is missing, even with a channel set."""
    monkeypatch.setenv("SLACK_CHANNEL_ID", "C1")
    monkeypatch.delenv("SLACK_API_TOKEN", raising=False)
    with patch.object(notify_slack, "post") as fake_post:
        notify_slack.main()
    assert not fake_post.called


def test_main_posts_when_configured(monkeypatch):
    """With both token and channel present, main() posts exactly once."""
    for name, value in (("SLACK_API_TOKEN", "xoxb"), ("SLACK_CHANNEL_ID", "C1")):
        monkeypatch.setenv(name, value)
    with patch.object(notify_slack, "post") as fake_post:
        notify_slack.main()
    assert fake_post.call_count == 1


def test_post_warns_on_error(capsys):
    """A transport error is reported on stdout instead of propagating."""
    transport_error = urllib.error.URLError("boom")
    with patch("urllib.request.urlopen", side_effect=transport_error):
        notify_slack.post("token", "C1", "hi")
    captured = capsys.readouterr()
    assert "failed" in captured.out

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions gpu/metadata.csv
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ gpu.memory.free,gauge,16,byte,,Unallocated device memory (in bytes).,0,gpu,memor
gpu.memory.limit,gauge,16,byte,,"Total device memory (framebuffer). This is always the device-level memory limit; the `pid` and `container_id` tags are present to enable per-process and per-container utilization formulas, but the value itself does not change.",0,gpu,memory.limit,,
gpu.memory.reserved,gauge,16,byte,,Device memory (in bytes) reserved for system use (driver or firmware).,0,gpu,memory.reserved,,
gpu.memory.temperature,gauge,16,degree celsius,,Temperature of the memory chip,0,gpu,memory.temperature,,
gpu.nccl.collective.algo_bandwidth_gbps,gauge,16,gigabyte,second,Algorithmic bandwidth of a collective operation per rank,0,gpu,nccl.collective.algo_bandwidth_gbps,,
gpu.nccl.collective.bus_bandwidth_gbps,gauge,16,gigabyte,second,Bus bandwidth of a collective operation per rank,0,gpu,nccl.collective.bus_bandwidth_gbps,,
gpu.nccl.collective.exec_time_us,gauge,16,microsecond,,Execution time of a collective operation per rank,0,gpu,nccl.collective.exec_time_us,,
gpu.nccl.collective.msg_size_bytes,gauge,16,byte,,Message size of a collective operation per rank,0,gpu,nccl.collective.msg_size_bytes,,
gpu.nccl.rank.seconds_since_last_event,gauge,16,second,,Seconds since the last NCCL event was received for a rank. Used for hang detection.,0,gpu,nccl.rank.seconds_since_last_event,,
gpu.nvlink.ber.effective,gauge,16,,,NVLink effective error counter total for all links (errors not corrected by FEC/recovery mechanisms).,0,gpu,nvlink.ber.effective,,
gpu.nvlink.ber.symbol,gauge,16,,,Symbol bit error rate for all NVLINK links,0,gpu,nvlink.ber.symbol,,
gpu.nvlink.count.active,gauge,16,,,Number of active nvlinks for the device,0,gpu,nvlink.count.active,,
Expand Down
1 change: 1 addition & 0 deletions postgres/changelog.d/24265.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Migrate the Postgres check's tag handling to the shared `TagManager`.
75 changes: 31 additions & 44 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(self, name, init_config, instances):
for warning in validation_result.warnings:
self.log.warning(warning)

self._tags = list(self._config.tags)
self.tag_manager.set_tags_from_list(self._config.tags, replace=True)
self.add_core_tags()

# Submit the initialization health event in case the `check` method is never called
Expand All @@ -151,9 +151,6 @@ def __init__(self, name, init_config, instances):
self.log.error("Configuration validation failed: %s", validation_result.errors)
raise validation_result.errors[0]

# Keep a copy of the tags without the internal resource tags so they can be used for paths that don't
# go through the agent internal metrics submission processing those tags
self._non_internal_tags = copy.deepcopy(self.tags)
self.set_resource_tags()
self.pg_settings = {}
self._warnings_by_code = {}
Expand Down Expand Up @@ -185,7 +182,6 @@ def __init__(self, name, init_config, instances):
self.check_initializations.append(self.load_system_identifier)
self.check_initializations.append(self.initialize_is_aurora)
self.check_initializations.append(self._query_manager.compile_queries)
self.tags_without_db = [t for t in copy.copy(self.tags) if not t.startswith("db:")]
self.autodiscovery = self._build_autodiscovery()
self._dynamic_queries = []
# _database_instance_emitted: limit the collection and transmission of the database instance metadata
Expand Down Expand Up @@ -246,8 +242,8 @@ def _build_autodiscovery(self):
return discovery

@property
def tags(self):
return self._tags
def tags_without_db(self):
return self.tag_manager.get_tags(include_db=False)

@property
def dbms(self):
Expand All @@ -258,40 +254,41 @@ def add_core_tags(self):
"""
Add tags that should be attached to every metric/event but which require check calculations outside the config.
"""
self.tags.append("database_hostname:{}".format(self.database_hostname))
self.tags.append("database_instance:{}".format(self.database_identifier))
self.tag_manager.set_tag("database_hostname", self.database_hostname, replace=True)
self.tag_manager.set_tag("database_instance", self.database_identifier, replace=True)

def set_resource_tags(self):
if self._config.gcp.project_id and self._config.gcp.instance_id:
self.tags.append(
"dd.internal.resource:gcp_sql_database_instance:{}:{}".format(
self._config.gcp.project_id, self._config.gcp.instance_id
)
self.tag_manager.set_tag(
"dd.internal.resource",
"gcp_sql_database_instance:{}:{}".format(self._config.gcp.project_id, self._config.gcp.instance_id),
)
if self._config.aws.instance_endpoint:
self.tags.append(
"dd.internal.resource:aws_rds_instance:{}".format(
self._config.aws.instance_endpoint,
)
self.tag_manager.set_tag(
"dd.internal.resource",
"aws_rds_instance:{}".format(self._config.aws.instance_endpoint),
)
elif AWS_RDS_HOSTNAME_SUFFIX in self.resolved_hostname:
# allow for detecting if the host is an RDS host, and emit
# the resource properly even if the `aws` config is unset
self.tags.append("dd.internal.resource:aws_rds_instance:{}".format(self.resolved_hostname))
self.tag_manager.set_tag(
"dd.internal.resource",
"aws_rds_instance:{}".format(self.resolved_hostname),
)
if self._config.azure.deployment_type and self._config.azure.fully_qualified_domain_name:
deployment_type = self._config.azure.deployment_type
# some `deployment_type`s map to multiple `resource_type`s
resource_type = AZURE_DEPLOYMENT_TYPE_TO_RESOURCE_TYPE.get(deployment_type)
if resource_type:
self.tags.append(
"dd.internal.resource:{}:{}".format(resource_type, self._config.azure.fully_qualified_domain_name)
self.tag_manager.set_tag(
"dd.internal.resource",
"{}:{}".format(resource_type, self._config.azure.fully_qualified_domain_name),
)
# finally, tag the `database_instance` resource for this instance
# metrics intake will use this tag to add all the tags for the instance
self.tags.append(
"dd.internal.resource:database_instance:{}".format(
self.database_identifier,
)
self.tag_manager.set_tag(
"dd.internal.resource",
"database_instance:{}".format(self.database_identifier),
)

def _new_query_executor(self, queries, db):
Expand Down Expand Up @@ -1191,7 +1188,7 @@ def _send_database_instance_metadata(self):
"collection_interval": self._config.database_instance_collection_interval,
'dbms_version': self.dbms_version,
'integration_version': __version__,
"tags": [t for t in self._non_internal_tags if not t.startswith('db:')],
"tags": self.tag_manager.get_tags(include_internal=False, include_db=False),
"timestamp": time() * 1000,
"cloud_metadata": self.cloud_metadata,
"metadata": {
Expand All @@ -1213,11 +1210,9 @@ def check(self, _):
# Resend the initialization event. The submitter will debounce it
self._submit_initialization_health_event()

tags = copy.copy(self.tags)
self.tags_without_db = [t for t in copy.copy(self.tags) if not t.startswith("db:")]
# Reset _non_internal_tags to prevent stale dynamic tags (e.g., replication_role) from accumulating
self._non_internal_tags = [t for t in copy.copy(self.tags) if not t.startswith("dd.internal")]
tags_to_add = []
# Tags computed before connecting, used for the service check if an early failure occurs.
# Recomputed below once the dynamic tags (version, cluster name, etc.) have been set.
tags = self.tag_manager.get_tags()
try:
# Check version
self._connect()
Expand All @@ -1228,24 +1223,20 @@ def check(self, _):
self.wal_level = self._get_wal_level()

# Add raw version as a tag
tags.append(f'postgresql_version:{self.raw_version}')
tags_to_add.append(f'postgresql_version:{self.raw_version}')
self.tag_manager.set_tag('postgresql_version', self.raw_version, replace=True)

# Add system identifier as a tag
if self.system_identifier:
tags.append(f'system_identifier:{self.system_identifier}')
tags_to_add.append(f'system_identifier:{self.system_identifier}')
self.tag_manager.set_tag('system_identifier', str(self.system_identifier), replace=True)

# Add cluster name if it was set
if self.cluster_name:
tags.append(f'postgresql_cluster_name:{self.cluster_name}')
tags_to_add.append(f'postgresql_cluster_name:{self.cluster_name}')
self.tag_manager.set_tag('postgresql_cluster_name', self.cluster_name, replace=True)

if self._config.tag_replication_role:
replication_role_tag = "replication_role:{}".format(self._get_replication_role())
tags.append(replication_role_tag)
tags_to_add.append(replication_role_tag)
self._update_tag_sets(tags_to_add)
self.tag_manager.set_tag('replication_role', self._get_replication_role(), replace=True)

tags = self.tag_manager.get_tags()
self._send_database_instance_metadata()

self.log.debug("Running check against version %s: is_aurora: %s", str(self.version), str(self.is_aurora))
Expand Down Expand Up @@ -1316,7 +1307,3 @@ def _run_automatic_diagnostics(self):
self.log.exception("Error during automatic diagnostics: %s", e)
finally:
self.log.info("Automatic diagnostics completed")

def _update_tag_sets(self, tags):
self._non_internal_tags = list(set(self._non_internal_tags) | set(tags))
self.tags_without_db = list(set(self.tags_without_db) | set(tags))
5 changes: 4 additions & 1 deletion postgres/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@
POSTGRES_VERSION = os.environ.get('POSTGRES_VERSION', None)
POSTGRES_IMAGE = "alpine"
POSTGRES_LOCALE = os.environ.get('POSTGRES_LOCALE', "UTF8")
POSTGRES_IMAGE_TAG = os.environ.get('POSTGRES_IMAGE_TAG', None)

REPLICA_CONTAINER_1_NAME = 'compose-postgres_replica-1'
REPLICA_CONTAINER_2_NAME = 'compose-postgres_replica2-1'
REPLICA_LOGICAL_1_NAME = 'compose-postgres_logical_replica-1'
USING_LATEST = False

if POSTGRES_VERSION is not None:
if POSTGRES_IMAGE_TAG is not None:
POSTGRES_IMAGE = POSTGRES_IMAGE_TAG + "-alpine"
elif POSTGRES_VERSION is not None:
USING_LATEST = POSTGRES_VERSION.endswith('latest')
POSTGRES_IMAGE = POSTGRES_VERSION + "-alpine"

Expand Down
Loading
Loading