diff --git a/Dockerfile b/Dockerfile index 4b8d3d15d..561b0bea9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,8 +17,11 @@ COPY dist/*.whl ./dist/ RUN poetry config virtualenvs.create false \ && poetry install --only main --no-interaction --no-ansi || true -# Build and install the package -RUN pip install dist/*.whl +# Build and install the package with the metrics extra (for DogStatsD support). +# Two-step install: first install the wheel, then reinstall with extras using +# the resolved filename (shell glob + [extras] syntax don't mix in a single arg). +RUN pip install dist/*.whl \ + && pip install "$(ls dist/*.whl)[metrics]" # Recreate the original structure RUN mkdir -p source_declarative_manifest \ diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54c207487..b3ec3dc53 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -23,6 +23,7 @@ from airbyte_cdk.connector import TConfig from airbyte_cdk.exception_handler import init_uncaught_exception_handler from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled +from airbyte_cdk.metrics import get_metrics_client from airbyte_cdk.models import ( AirbyteConnectionStatus, AirbyteMessage, @@ -275,10 +276,26 @@ def read( if self.source.check_config_against_spec: self.validate_connection(source_spec, config) + # Initialize and emit initial memory metrics + metrics_client = get_metrics_client() + metrics_client.initialize() + metrics_client.emit_memory_metrics() + # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) - for message in self.source.read(self.logger, config, catalog, state): - yield self.handle_record_counts(message, stream_message_counter) + _emit_check_counter = 0 + try: + for message in self.source.read(self.logger, config, catalog, state): + yield self.handle_record_counts(message, 
stream_message_counter) + # Check for metric emission every 1000 records to avoid + # a time.monotonic() syscall on every single record. + _emit_check_counter += 1 + if _emit_check_counter >= 1000: + _emit_check_counter = 0 + metrics_client.maybe_emit_memory_metrics() + finally: + # Emit final memory metrics snapshot (runs even if the read loop raises) + metrics_client.emit_memory_metrics() for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/metrics/__init__.py b/airbyte_cdk/metrics/__init__.py new file mode 100644 index 000000000..fb2c3cc06 --- /dev/null +++ b/airbyte_cdk/metrics/__init__.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +""" +Metrics module for Python source connectors. + +Provides DogStatsD-based metric emission for memory and resource monitoring. +Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI). +""" + +from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info +from airbyte_cdk.metrics.metrics_client import ( + DEFAULT_EMISSION_INTERVAL_SECONDS, + METRIC_MEMORY_LIMIT_BYTES, + METRIC_MEMORY_USAGE_BYTES, + MetricsClient, + get_metrics_client, + reset_metrics_client, +) + +__all__ = [ + "DEFAULT_EMISSION_INTERVAL_SECONDS", + "METRIC_MEMORY_LIMIT_BYTES", + "METRIC_MEMORY_USAGE_BYTES", + "MemoryInfo", + "MetricsClient", + "get_memory_info", + "get_metrics_client", + "reset_metrics_client", +] diff --git a/airbyte_cdk/metrics/memory.py b/airbyte_cdk/metrics/memory.py new file mode 100644 index 000000000..087551992 --- /dev/null +++ b/airbyte_cdk/metrics/memory.py @@ -0,0 +1,155 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +""" +Memory and resource metrics reader for Python source connectors. + +Reads container memory usage and limits from cgroup v2 files (standard in K8s pods), +with fallback to resource.getrusage for non-containerized environments. 
import logging
import resource
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# cgroup v2 file paths (standard in modern K8s pods)
CGROUP_V2_MEMORY_CURRENT = Path("/sys/fs/cgroup/memory.current")
CGROUP_V2_MEMORY_MAX = Path("/sys/fs/cgroup/memory.max")

# cgroup v1 file paths (legacy, some older environments)
CGROUP_V1_MEMORY_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
CGROUP_V1_MEMORY_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# "max" in cgroup v2 means no limit is set
CGROUP_NO_LIMIT = "max"

# Threshold for considering a cgroup v1 limit as "no limit" (very large values ~= PAGE_COUNTER_MAX)
# Values near 2^63 typically mean no limit is configured
CGROUP_V1_NO_LIMIT_THRESHOLD = 2**62


@dataclass(frozen=True)
class MemoryInfo:
    """Immutable snapshot of memory usage and, when known, the memory limit."""

    # Bytes currently (cgroup) or at peak (rusage fallback) used by the process/container.
    usage_bytes: int
    # Memory limit in bytes, or None when no limit is configured or it cannot be determined.
    limit_bytes: Optional[int]

    @property
    def usage_percent(self) -> Optional[float]:
        """Return usage as a fraction of the limit (0.0 to 1.0), or None if no positive limit is known."""
        if self.limit_bytes is not None and self.limit_bytes > 0:
            return self.usage_bytes / self.limit_bytes
        return None


def _read_cgroup_file(path: Path) -> Optional[str]:
    """
    Read a cgroup file and return its stripped contents, or None if unavailable.

    "Unavailable" covers every OSError (missing file, permission denied, ...) as
    well as content that cannot be decoded as text, so callers never see an
    exception from this helper.
    """
    try:
        return path.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        # FileNotFoundError / PermissionError are OSError subclasses; UnicodeDecodeError
        # is handled too so the "None if unavailable" contract holds for garbage content.
        return None


def _parse_byte_count(raw: str, source: str) -> Optional[int]:
    """Parse ``raw`` as an integer byte count; log at debug level and return None on failure."""
    try:
        return int(raw)
    except ValueError:
        logger.debug("Could not parse %s value: %s", source, raw)
        return None


def _read_cgroup_v2_memory() -> Optional[MemoryInfo]:
    """
    Read memory usage from cgroup v2 files.

    Returns MemoryInfo if the cgroup v2 usage file is readable and numeric, None otherwise.
    An unreadable, unparsable or "max" limit file yields ``limit_bytes=None``.
    """
    usage_str = _read_cgroup_file(CGROUP_V2_MEMORY_CURRENT)
    if usage_str is None:
        return None

    usage_bytes = _parse_byte_count(usage_str, "cgroup v2 memory.current")
    if usage_bytes is None:
        return None

    limit_bytes: Optional[int] = None
    max_str = _read_cgroup_file(CGROUP_V2_MEMORY_MAX)
    if max_str is not None and max_str != CGROUP_NO_LIMIT:
        limit_bytes = _parse_byte_count(max_str, "cgroup v2 memory.max")

    return MemoryInfo(usage_bytes=usage_bytes, limit_bytes=limit_bytes)


def _read_cgroup_v1_memory() -> Optional[MemoryInfo]:
    """
    Read memory usage from cgroup v1 files (legacy fallback).

    Returns MemoryInfo if the cgroup v1 usage file is readable and numeric, None otherwise.
    Limits at or above CGROUP_V1_NO_LIMIT_THRESHOLD are reported as ``limit_bytes=None``.
    """
    usage_str = _read_cgroup_file(CGROUP_V1_MEMORY_USAGE)
    if usage_str is None:
        return None

    usage_bytes = _parse_byte_count(usage_str, "cgroup v1 memory usage")
    if usage_bytes is None:
        return None

    limit_bytes: Optional[int] = None
    limit_str = _read_cgroup_file(CGROUP_V1_MEMORY_LIMIT)
    if limit_str is not None:
        raw_limit = _parse_byte_count(limit_str, "cgroup v1 memory limit")
        # Very large values indicate no real limit is set
        if raw_limit is not None and raw_limit < CGROUP_V1_NO_LIMIT_THRESHOLD:
            limit_bytes = raw_limit

    return MemoryInfo(usage_bytes=usage_bytes, limit_bytes=limit_bytes)


def _read_rusage_memory() -> MemoryInfo:
    """
    Fallback: read memory usage via resource.getrusage (works in non-containerized environments).

    Note: ru_maxrss is the *maximum* resident set size reached so far (a peak value,
    not the current usage). It is reported in kilobytes on Linux and in bytes on macOS.
    This fallback cannot determine the container memory limit.
    """
    rusage = resource.getrusage(resource.RUSAGE_SELF)
    # Normalize to bytes: macOS already reports bytes, other platforms report kilobytes.
    if sys.platform == "darwin":
        usage_bytes = rusage.ru_maxrss
    else:
        usage_bytes = rusage.ru_maxrss * 1024
    return MemoryInfo(usage_bytes=usage_bytes, limit_bytes=None)


def get_memory_info() -> MemoryInfo:
    """
    Get current container memory usage information.

    Attempts to read from (in order):
    1. cgroup v2 files (modern K8s pods)
    2. cgroup v1 files (legacy environments)
    3. resource.getrusage (local dev, CI)

    Returns a MemoryInfo dataclass with usage_bytes, limit_bytes, and usage_percent.
    """
    # Try the cgroup readers in priority order; the first one that yields data wins.
    for reader in (_read_cgroup_v2_memory, _read_cgroup_v1_memory):
        info = reader()
        if info is not None:
            return info

    # Fallback to rusage
    return _read_rusage_memory()
import logging
import os
import time
from typing import Any, Optional

# NOTE: ``MemoryInfo`` and ``get_memory_info`` used below come from the module-level
# ``from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info`` import.

logger = logging.getLogger(__name__)

# Metric names
METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes"
METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes"

# Default emission interval in seconds
DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0

# Port used when DD_DOGSTATSD_PORT is unset or invalid
_DEFAULT_DOGSTATSD_PORT = 8125
_MAX_PORT = 65535


class MetricsClient:
    """
    DogStatsD metrics client for Python source connectors.

    Initializes a DogStatsD client when DD_AGENT_HOST is available,
    otherwise all metric calls are silent no-ops.
    """

    # (tag name, environment variable) pairs, in the order the tags are emitted.
    # DD_SERVICE and DD_VERSION are set by ConnectorApmSupportHelper; CONNECTION_ID and
    # WORKSPACE_ID are connection-level platform env vars.
    _TAG_ENV_VARS: tuple[tuple[str, str], ...] = (
        ("connector", "DD_SERVICE"),
        ("version", "DD_VERSION"),
        ("connection_id", "CONNECTION_ID"),
        ("workspace_id", "WORKSPACE_ID"),
    )

    def __init__(self) -> None:
        self._statsd: Any = None
        self._tags: list[str] = []
        # time.monotonic() has an undefined reference point, so 0.0 is not a safe
        # "never emitted" marker (the clock may read less than the interval, e.g.
        # shortly after boot). -inf makes the first should_emit() always True.
        self._last_emission_time: float = float("-inf")
        self._initialized = False

    def initialize(self) -> None:
        """
        Initialize the DogStatsD client if DD_AGENT_HOST is available.

        Should be called once during connector startup. Safe to call multiple times:
        only the first call reads the environment. Never raises for a missing
        ``datadog`` package or a failing DogStatsd constructor; the client simply
        stays disabled.
        """
        if self._initialized:
            return

        self._initialized = True
        dd_agent_host = os.environ.get("DD_AGENT_HOST")
        if not dd_agent_host:
            logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
            return

        dd_dogstatsd_port = self._resolve_dogstatsd_port()

        try:
            # Imported lazily: ``datadog`` is an optional dependency.
            from datadog.dogstatsd import DogStatsd

            self._statsd = DogStatsd(
                host=dd_agent_host,
                port=dd_dogstatsd_port,
                # Disable telemetry to reduce overhead
                disable_telemetry=True,
            )
            logger.info(
                "DogStatsD metrics client initialized (host=%s, port=%d)",
                dd_agent_host,
                dd_dogstatsd_port,
            )
        except ImportError:
            logger.warning(
                "datadog package not installed; metrics emission disabled. "
                "Install with: pip install datadog"
            )
        except Exception:
            logger.warning(
                "Failed to initialize DogStatsD client; metrics emission disabled", exc_info=True
            )

        # Build standard tags from environment
        self._tags = self._build_tags()

    @staticmethod
    def _resolve_dogstatsd_port() -> int:
        """Return the port from DD_DOGSTATSD_PORT, or the default when unset, non-numeric or out of range."""
        port_str = os.environ.get("DD_DOGSTATSD_PORT")
        if not port_str:
            return _DEFAULT_DOGSTATSD_PORT

        try:
            port = int(port_str)
        except ValueError:
            port = -1  # force the fallback below

        if 0 < port <= _MAX_PORT:
            return port

        logger.warning(
            "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port %d",
            port_str,
            _DEFAULT_DOGSTATSD_PORT,
        )
        return _DEFAULT_DOGSTATSD_PORT

    @property
    def enabled(self) -> bool:
        """Return True if the DogStatsD client is active and ready to emit metrics."""
        return self._statsd is not None

    def _build_tags(self) -> list[str]:
        """Build standard metric tags from environment variables, skipping unset or empty ones."""
        tags: list[str] = []
        for tag_name, env_var in self._TAG_ENV_VARS:
            value = os.environ.get(env_var)
            if value:
                tags.append(f"{tag_name}:{value}")
        return tags

    def gauge(self, metric_name: str, value: float, extra_tags: Optional[list[str]] = None) -> None:
        """
        Emit a gauge metric via DogStatsD.

        No-op if the client is not initialized or DD_AGENT_HOST is not set.
        ``extra_tags`` are appended to the standard tags for this call only.
        Exceptions raised by the underlying client are logged at debug level and swallowed.
        """
        if self._statsd is None:
            return

        # Concatenation builds a new list, so the shared standard tags are never mutated.
        tags = self._tags + (extra_tags or [])
        try:
            self._statsd.gauge(metric_name, value, tags=tags)
        except Exception:
            # Never let metric emission failures affect the sync
            logger.debug("Failed to emit metric %s", metric_name, exc_info=True)

    def emit_memory_metrics(self) -> None:
        """
        Read and emit all memory-related metrics.

        Emits:
        - cdk.memory.usage_bytes: Current container memory usage
        - cdk.memory.limit_bytes: Container memory limit (if known)

        The usage/limit ratio is not emitted as a separate metric — compute it
        in Datadog using ``a / b`` formulas to avoid extra custom-metric costs.

        Also updates the last-emission timestamp so that subsequent calls to
        ``should_emit`` / ``maybe_emit_memory_metrics`` respect the interval.
        No-op (timestamp included) when the client is disabled.
        """
        if not self.enabled:
            return

        # Update the last-emission timestamp to avoid duplicate snapshots
        self._last_emission_time = time.monotonic()

        try:
            info: MemoryInfo = get_memory_info()

            self.gauge(METRIC_MEMORY_USAGE_BYTES, float(info.usage_bytes))

            if info.limit_bytes is not None:
                self.gauge(METRIC_MEMORY_LIMIT_BYTES, float(info.limit_bytes))

        except Exception:
            # Never let metric collection failures affect the sync
            logger.debug("Failed to collect memory metrics", exc_info=True)

    def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS) -> bool:
        """
        Check if enough time has passed since the last emission to emit again.

        Returns True if at least interval_seconds have elapsed since the last emission,
        and always True when nothing has been emitted yet.
        This is a pure query — it does not update any internal state.
        """
        return time.monotonic() - self._last_emission_time >= interval_seconds

    def maybe_emit_memory_metrics(
        self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS
    ) -> None:
        """
        Emit memory metrics if the emission interval has elapsed.

        This is the primary method to call periodically during read() — it handles
        both the timing check and the metric emission.
        """
        if self.enabled and self.should_emit(interval_seconds):
            self.emit_memory_metrics()


# Module-level singleton for convenience
_metrics_client: Optional[MetricsClient] = None


def get_metrics_client() -> MetricsClient:
    """
    Get or create the module-level MetricsClient singleton.

    Note: The caller is responsible for calling ``initialize()`` on the
    returned client before emitting metrics. Construction and initialization
    are separate so that the caller controls *when* environment variables
    are read and the DogStatsD connection is established.
    """
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient()
    return _metrics_client


def reset_metrics_client() -> None:
    """Reset the module-level singleton. Intended for testing only."""
    global _metrics_client
    _metrics_client = None
">=3.10,<3.14" -content-hash = "b785d39f246498c8facd7854999dbdbfb78808489a09922dd3a1551be331ea7d" +content-hash = "0b2ea534070d0cfe204b2747f1268e69b74637005a3d7ba3480dc9325f8b036e" diff --git a/pyproject.toml b/pyproject.toml index bcdab217b..1153c382b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ serpyco-rs = "^1.10.2" sqlalchemy = {version = "^2.0,!=2.0.36", optional = true } fastapi = { version = ">=0.116.1", optional = true } uvicorn = { version = ">=0.35.0", optional = true} +datadog = { version = ">=0.49.0", optional = true } ddtrace = { version = "^3", optional = true } xmltodict = ">=0.13,<0.15" anyascii = "^0.3.2" @@ -125,6 +126,7 @@ file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfmi vector-db-based = ["langchain_community", "langchain_core", "langchain_text_splitters", "openai", "cohere", "tiktoken"] sql = ["sqlalchemy"] dev = ["pytest"] +metrics = ["datadog"] manifest-server = ["fastapi", "uvicorn", "ddtrace"] [tool.poetry.scripts] @@ -216,7 +218,7 @@ python_versions = [ "3.13", ] optional_poetry_groups = ["dev"] -poetry_extras = ["file-based", "vector-db-based", "manifest-server"] +poetry_extras = ["file-based", "vector-db-based", "manifest-server", "metrics"] poe_tasks = ["check-ci"] mount_docker_socket = true diff --git a/unit_tests/metrics/__init__.py b/unit_tests/metrics/__init__.py new file mode 100644 index 000000000..89adeb6e6 --- /dev/null +++ b/unit_tests/metrics/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# diff --git a/unit_tests/metrics/test_memory.py b/unit_tests/metrics/test_memory.py new file mode 100644 index 000000000..99c288378 --- /dev/null +++ b/unit_tests/metrics/test_memory.py @@ -0,0 +1,178 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. 
"""Tests for airbyte_cdk.metrics.memory module."""

from pathlib import Path
from unittest.mock import patch

import pytest

from airbyte_cdk.metrics.memory import (
    MemoryInfo,
    _read_cgroup_v1_memory,
    _read_cgroup_v2_memory,
    _read_rusage_memory,
    get_memory_info,
)


def _write(directory: Path, name: str, content: str) -> Path:
    """Create ``directory / name`` holding ``content`` and return its path."""
    target = directory / name
    target.write_text(content)
    return target


class TestMemoryInfo:
    """Behaviour of the MemoryInfo value object."""

    def test_usage_percent_with_limit(self) -> None:
        assert MemoryInfo(usage_bytes=500, limit_bytes=1000).usage_percent == 0.5

    def test_usage_percent_no_limit(self) -> None:
        assert MemoryInfo(usage_bytes=500, limit_bytes=None).usage_percent is None

    def test_usage_percent_zero_limit(self) -> None:
        assert MemoryInfo(usage_bytes=500, limit_bytes=0).usage_percent is None

    def test_frozen_dataclass(self) -> None:
        snapshot = MemoryInfo(usage_bytes=100, limit_bytes=200)
        with pytest.raises(AttributeError):
            snapshot.usage_bytes = 300  # type: ignore[misc]


class TestCgroupV2Memory:
    """Reading usage/limit from cgroup v2 style files."""

    def test_reads_memory_current_and_max(self, tmp_path: Path) -> None:
        usage_path = _write(tmp_path, "memory.current", "104857600\n")
        limit_path = _write(tmp_path, "memory.max", "209715200\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", limit_path),
        ):
            result = _read_cgroup_v2_memory()

        assert result is not None
        assert result.usage_bytes == 104857600
        assert result.limit_bytes == 209715200
        assert result.usage_percent == pytest.approx(0.5)

    def test_memory_max_is_max_string(self, tmp_path: Path) -> None:
        """When cgroup reports 'max', it means no limit is set."""
        usage_path = _write(tmp_path, "memory.current", "104857600\n")
        limit_path = _write(tmp_path, "memory.max", "max\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", limit_path),
        ):
            result = _read_cgroup_v2_memory()

        assert result is not None
        assert result.usage_bytes == 104857600
        assert result.limit_bytes is None
        assert result.usage_percent is None

    def test_returns_none_when_files_missing(self) -> None:
        with patch(
            "airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent/path")
        ):
            result = _read_cgroup_v2_memory()
        assert result is None

    def test_handles_invalid_content(self, tmp_path: Path) -> None:
        usage_path = _write(tmp_path, "memory.current", "not_a_number\n")

        with patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", usage_path):
            result = _read_cgroup_v2_memory()
        assert result is None


class TestCgroupV1Memory:
    """Reading usage/limit from cgroup v1 style files."""

    def test_reads_usage_and_limit(self, tmp_path: Path) -> None:
        usage_path = _write(tmp_path, "memory.usage_in_bytes", "104857600\n")
        limit_path = _write(tmp_path, "memory.limit_in_bytes", "209715200\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_path),
        ):
            result = _read_cgroup_v1_memory()

        assert result is not None
        assert result.usage_bytes == 104857600
        assert result.limit_bytes == 209715200

    def test_very_large_limit_treated_as_no_limit(self, tmp_path: Path) -> None:
        """Very large cgroup v1 limits (near 2^63) indicate no real limit."""
        usage_path = _write(tmp_path, "memory.usage_in_bytes", "104857600\n")
        limit_path = _write(tmp_path, "memory.limit_in_bytes", f"{2**63}\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_path),
        ):
            result = _read_cgroup_v1_memory()

        assert result is not None
        assert result.limit_bytes is None

    def test_returns_none_when_files_missing(self) -> None:
        with patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", Path("/nonexistent/path")):
            result = _read_cgroup_v1_memory()
        assert result is None


class TestRusageMemory:
    """The resource.getrusage based fallback."""

    def test_returns_memory_info(self) -> None:
        result = _read_rusage_memory()
        assert result.usage_bytes > 0
        assert result.limit_bytes is None


class TestGetMemoryInfo:
    """Source selection order of get_memory_info: cgroup v2, then v1, then rusage."""

    def test_prefers_cgroup_v2(self, tmp_path: Path) -> None:
        usage_path = _write(tmp_path, "memory.current", "100\n")
        limit_path = _write(tmp_path, "memory.max", "200\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", limit_path),
        ):
            result = get_memory_info()

        assert result.usage_bytes == 100
        assert result.limit_bytes == 200

    def test_falls_back_to_cgroup_v1(self, tmp_path: Path) -> None:
        usage_path = _write(tmp_path, "memory.usage_in_bytes", "300\n")
        limit_path = _write(tmp_path, "memory.limit_in_bytes", "600\n")

        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent")),
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_path),
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_path),
        ):
            result = get_memory_info()

        assert result.usage_bytes == 300
        assert result.limit_bytes == 600

    def test_falls_back_to_rusage(self) -> None:
        with (
            patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent")),
            patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", Path("/nonexistent")),
        ):
            result = get_memory_info()

        assert result.usage_bytes > 0
        assert result.limit_bytes is None
"""Tests for airbyte_cdk.metrics MetricsClient."""

import sys
import time
import types
from unittest.mock import MagicMock, patch

import pytest

# MetricsClient imports ``datadog`` lazily inside ``initialize()``, so the package can be
# imported up front; the autouse fixture below installs the fake module before any test runs.
from airbyte_cdk.metrics import MetricsClient, get_metrics_client, reset_metrics_client
from airbyte_cdk.metrics.memory import MemoryInfo


@pytest.fixture(autouse=True)
def _mock_datadog(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
    """Provide a mock datadog module so tests work regardless of whether the
    optional ``datadog`` package is installed. ``monkeypatch`` automatically
    restores ``sys.modules`` after each test, preventing cross-test pollution.

    Returns the MagicMock standing in for the ``DogStatsd`` class."""
    mock_cls = MagicMock()
    mock_mod = types.ModuleType("datadog.dogstatsd")
    mock_mod.DogStatsd = mock_cls  # type: ignore[attr-defined]
    mock_datadog = types.ModuleType("datadog")
    mock_datadog.dogstatsd = mock_mod  # type: ignore[attr-defined]
    monkeypatch.setitem(sys.modules, "datadog", mock_datadog)
    monkeypatch.setitem(sys.modules, "datadog.dogstatsd", mock_mod)
    return mock_cls


def _make_enabled_client(mock_dogstatsd_cls: MagicMock) -> tuple[MetricsClient, MagicMock]:
    """Helper to create an enabled MetricsClient with a mock DogStatsd instance.

    Returns the initialized client and the mock instance its gauge calls go to."""
    mock_instance = MagicMock()
    mock_dogstatsd_cls.reset_mock()
    mock_dogstatsd_cls.return_value = mock_instance

    client = MetricsClient()
    with (
        patch.dict("os.environ", {"DD_AGENT_HOST": "localhost"}, clear=True),
        patch("datadog.dogstatsd.DogStatsd", mock_dogstatsd_cls),
    ):
        client.initialize()
    return client, mock_instance


class TestMetricsClientInitialization:
    def test_disabled_when_dd_agent_host_not_set(self) -> None:
        client = MetricsClient()
        with patch.dict("os.environ", {}, clear=True):
            client.initialize()
        assert not client.enabled

    def test_enabled_when_dd_agent_host_set(self, _mock_datadog: MagicMock) -> None:
        client, _ = _make_enabled_client(_mock_datadog)
        assert client.enabled

    def test_initialize_idempotent(self, _mock_datadog: MagicMock) -> None:
        client, _ = _make_enabled_client(_mock_datadog)
        # A second call must be a no-op: no new DogStatsd instance, state unchanged.
        client.initialize()
        _mock_datadog.assert_called_once()
        assert client.enabled

    def test_disabled_when_datadog_import_fails(self, monkeypatch: pytest.MonkeyPatch) -> None:
        # A ``None`` entry in sys.modules makes ``import`` raise ImportError, which
        # simulates the optional package being absent.
        monkeypatch.setitem(sys.modules, "datadog", None)
        monkeypatch.setitem(sys.modules, "datadog.dogstatsd", None)

        client = MetricsClient()
        with patch.dict("os.environ", {"DD_AGENT_HOST": "localhost"}, clear=True):
            client.initialize()
        assert not client.enabled

    def test_disabled_when_dogstatsd_constructor_raises(self, _mock_datadog: MagicMock) -> None:
        _mock_datadog.side_effect = RuntimeError("boom")

        client = MetricsClient()
        with patch.dict("os.environ", {"DD_AGENT_HOST": "localhost"}, clear=True):
            client.initialize()  # should not raise
        assert not client.enabled


class TestMetricsClientTags:
    def test_builds_tags_from_env(self, _mock_datadog: MagicMock) -> None:
        mock_instance = MagicMock()
        _mock_datadog.reset_mock()
        _mock_datadog.return_value = mock_instance

        client = MetricsClient()
        env = {
            "DD_AGENT_HOST": "localhost",
            "DD_SERVICE": "airbyte/source-github",
            "DD_VERSION": "1.2.3",
            "CONNECTION_ID": "conn-123",
            "WORKSPACE_ID": "ws-456",
        }
        with (
            patch.dict("os.environ", env, clear=True),
            patch("datadog.dogstatsd.DogStatsd", _mock_datadog),
        ):
            client.initialize()

        assert "connector:airbyte/source-github" in client._tags
        assert "version:1.2.3" in client._tags
        assert "connection_id:conn-123" in client._tags
        assert "workspace_id:ws-456" in client._tags


class TestMetricsClientGauge:
    def test_gauge_noop_when_disabled(self) -> None:
        client = MetricsClient()
        # Should not raise even when not initialized
        client.gauge("test.metric", 42.0)

    def test_gauge_emits_when_enabled(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)

        client.gauge("test.metric", 42.0)
        mock_instance.gauge.assert_called_once_with("test.metric", 42.0, tags=client._tags)

    def test_gauge_with_extra_tags(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)

        client.gauge("test.metric", 42.0, extra_tags=["stream:users"])
        call_tags = mock_instance.gauge.call_args[1]["tags"]
        assert "stream:users" in call_tags

    def test_gauge_swallows_exceptions(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)
        mock_instance.gauge.side_effect = Exception("network error")

        # Should not raise
        client.gauge("test.metric", 42.0)


class TestEmitMemoryMetrics:
    def test_emits_all_metrics_when_enabled(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)

        mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000)
        with patch("airbyte_cdk.metrics.metrics_client.get_memory_info", return_value=mock_info):
            client.emit_memory_metrics()

        gauge_calls = {call[0][0]: call[0][1] for call in mock_instance.gauge.call_args_list}
        assert gauge_calls["cdk.memory.usage_bytes"] == 100_000_000.0
        assert gauge_calls["cdk.memory.limit_bytes"] == 200_000_000.0
        # usage_percent is intentionally NOT emitted as a metric;
        # compute it in Datadog using a / b formulas instead.
        assert "cdk.memory.usage_percent" not in gauge_calls

    def test_skips_limit_when_unknown(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)

        mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=None)
        with patch("airbyte_cdk.metrics.metrics_client.get_memory_info", return_value=mock_info):
            client.emit_memory_metrics()

        metric_names = [call[0][0] for call in mock_instance.gauge.call_args_list]
        assert "cdk.memory.usage_bytes" in metric_names
        assert "cdk.memory.limit_bytes" not in metric_names

    def test_noop_when_disabled(self) -> None:
        client = MetricsClient()
        # Should not raise
        client.emit_memory_metrics()


class TestShouldEmit:
    def test_emits_on_first_call(self) -> None:
        client = MetricsClient()
        assert client.should_emit(interval_seconds=30.0)

    def test_does_not_emit_before_interval(self) -> None:
        client = MetricsClient()
        assert client.should_emit(interval_seconds=30.0)
        # Manually advance the timestamp (emit_memory_metrics is a no-op
        # when the client is disabled, so set it directly).
        client._last_emission_time = time.monotonic()
        assert not client.should_emit(interval_seconds=30.0)

    def test_emits_after_interval(self) -> None:
        client = MetricsClient()
        # Backdate the last emission instead of sleeping: deterministic and instant.
        client._last_emission_time = time.monotonic() - 1.0
        assert client.should_emit(interval_seconds=0.5)
        assert not client.should_emit(interval_seconds=60.0)


class TestMaybeEmitMemoryMetrics:
    def test_emits_on_interval(self, _mock_datadog: MagicMock) -> None:
        client, mock_instance = _make_enabled_client(_mock_datadog)

        mock_info = MemoryInfo(usage_bytes=100, limit_bytes=200)
        with patch("airbyte_cdk.metrics.metrics_client.get_memory_info", return_value=mock_info):
            client.maybe_emit_memory_metrics(interval_seconds=0.0)
        first_call_count = mock_instance.gauge.call_count

        # Should not emit again immediately with a long interval
        client.maybe_emit_memory_metrics(interval_seconds=9999.0)
        assert mock_instance.gauge.call_count == first_call_count

    def test_noop_when_disabled(self) -> None:
        client = MetricsClient()
        # Should not raise
        client.maybe_emit_memory_metrics()


class TestGetMetricsClient:
    def test_returns_singleton(self) -> None:
        reset_metrics_client()
        try:
            client1 = get_metrics_client()
            client2 = get_metrics_client()
            assert client1 is client2
        finally:
            reset_metrics_client()