-
Notifications
You must be signed in to change notification settings - Fork 44
feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1) #931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
3c98bd4
7ee981e
5fbcf2a
58978c7
c8b0197
b208bd0
7e9881e
f410aba
0f44eaa
4b4fc96
dd958d5
d951916
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |||||||||||||||||||||||||||||
| from airbyte_cdk.connector import TConfig | ||||||||||||||||||||||||||||||
| from airbyte_cdk.exception_handler import init_uncaught_exception_handler | ||||||||||||||||||||||||||||||
| from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled | ||||||||||||||||||||||||||||||
| from airbyte_cdk.metrics import get_metrics_client | ||||||||||||||||||||||||||||||
| from airbyte_cdk.models import ( | ||||||||||||||||||||||||||||||
| AirbyteConnectionStatus, | ||||||||||||||||||||||||||||||
| AirbyteMessage, | ||||||||||||||||||||||||||||||
|
|
@@ -275,10 +276,19 @@ def read( | |||||||||||||||||||||||||||||
| if self.source.check_config_against_spec: | ||||||||||||||||||||||||||||||
| self.validate_connection(source_spec, config) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Initialize and emit initial memory metrics | ||||||||||||||||||||||||||||||
| metrics_client = get_metrics_client() | ||||||||||||||||||||||||||||||
| metrics_client.initialize() | ||||||||||||||||||||||||||||||
| metrics_client.emit_memory_metrics() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows | ||||||||||||||||||||||||||||||
| stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) | ||||||||||||||||||||||||||||||
| for message in self.source.read(self.logger, config, catalog, state): | ||||||||||||||||||||||||||||||
| yield self.handle_record_counts(message, stream_message_counter) | ||||||||||||||||||||||||||||||
| # Periodically emit memory metrics (every 30s by default) | ||||||||||||||||||||||||||||||
| metrics_client.maybe_emit_memory_metrics() | ||||||||||||||||||||||||||||||
| # Emit final memory metrics snapshot | ||||||||||||||||||||||||||||||
| metrics_client.emit_memory_metrics() | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| for message in self.source.read(self.logger, config, catalog, state): | |
| yield self.handle_record_counts(message, stream_message_counter) | |
| # Periodically emit memory metrics (every 30s by default) | |
| metrics_client.maybe_emit_memory_metrics() | |
| # Emit final memory metrics snapshot | |
| metrics_client.emit_memory_metrics() | |
| try: | |
| for message in self.source.read(self.logger, config, catalog, state): | |
| yield self.handle_record_counts(message, stream_message_counter) | |
| # Periodically emit memory metrics (every 30s by default) | |
| metrics_client.maybe_emit_memory_metrics() | |
| finally: | |
| # Emit final memory metrics snapshot | |
| metrics_client.emit_memory_metrics() |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,199 @@ | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
| # Copyright (c) 2026 Airbyte, Inc., all rights reserved. | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Metrics module for Python source connectors. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Provides DogStatsD-based metric emission for memory and resource monitoring. | ||||||||||||||||||||||||||||
| Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI). | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||
| from typing import Any, Optional | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Metric names | ||||||||||||||||||||||||||||
| METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes" | ||||||||||||||||||||||||||||
| METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes" | ||||||||||||||||||||||||||||
| METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Default emission interval in seconds | ||||||||||||||||||||||||||||
| DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0 | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| class MetricsClient: | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| DogStatsD metrics client for Python source connectors. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Initializes a DogStatsD client when DD_AGENT_HOST is available, | ||||||||||||||||||||||||||||
| otherwise all metric calls are silent no-ops. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def __init__(self) -> None: | ||||||||||||||||||||||||||||
| self._statsd: Any = None | ||||||||||||||||||||||||||||
| self._tags: list[str] = [] | ||||||||||||||||||||||||||||
| self._last_emission_time: float = 0.0 | ||||||||||||||||||||||||||||
| self._initialized = False | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def initialize(self) -> None: | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Initialize the DogStatsD client if DD_AGENT_HOST is available. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Should be called once during connector startup. Safe to call multiple times. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| if self._initialized: | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| self._initialized = True | ||||||||||||||||||||||||||||
| dd_agent_host = os.environ.get("DD_AGENT_HOST") | ||||||||||||||||||||||||||||
| if not dd_agent_host: | ||||||||||||||||||||||||||||
| logger.debug("DD_AGENT_HOST not set; metrics emission disabled") | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125")) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125")) | |
| port_str = os.environ.get("DD_DOGSTATSD_PORT") | |
| if not port_str: | |
| dd_dogstatsd_port = 8125 | |
| else: | |
| try: | |
| dd_dogstatsd_port = int(port_str) | |
| except ValueError: | |
| logger.warning( | |
| "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125", | |
| port_str, | |
| ) | |
| dd_dogstatsd_port = 8125 |
Copilot
AI
Mar 5, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get_metrics_client() docstring says "The client is initialized lazily on first access", but the function currently only constructs the singleton and does not call initialize(). Either update the docstring to match the behavior, or call initialize() inside get_metrics_client() (keeping it idempotent).
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| # | ||
| # Copyright (c) 2026 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| """ | ||
| Memory and resource metrics reader for Python source connectors. | ||
|
|
||
| Reads container memory usage and limits from cgroup v2 files (standard in K8s pods), | ||
| with fallback to resource.getrusage for non-containerized environments. | ||
| """ | ||
|
|
||
| import logging | ||
| import resource | ||
| import sys | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
# Directory under which both cgroup layouts expose their memory files.
_CGROUP_ROOT = Path("/sys/fs/cgroup")

# cgroup v2 memory files (the layout found in modern K8s pods).
CGROUP_V2_MEMORY_CURRENT = _CGROUP_ROOT / "memory.current"
CGROUP_V2_MEMORY_MAX = _CGROUP_ROOT / "memory.max"

# cgroup v1 memory files (legacy layout, still present in some older environments).
CGROUP_V1_MEMORY_USAGE = _CGROUP_ROOT / "memory" / "memory.usage_in_bytes"
CGROUP_V1_MEMORY_LIMIT = _CGROUP_ROOT / "memory" / "memory.limit_in_bytes"

# Content of the cgroup v2 memory.max file when no limit is configured.
CGROUP_NO_LIMIT = "max"

# cgroup v1 has no textual "no limit" marker; an unlimited cgroup reports a very
# large number (close to 2^63) instead. Limits at or above this threshold are
# treated as "no limit".
CGROUP_V1_NO_LIMIT_THRESHOLD = 1 << 62
|
|
||
|
|
||
@dataclass(frozen=True)
class MemoryInfo:
    """Immutable snapshot of memory usage and, when known, the memory limit."""

    # Memory usage in bytes.
    usage_bytes: int
    # Memory limit in bytes, or None when no limit is known.
    limit_bytes: Optional[int]

    @property
    def usage_percent(self) -> Optional[float]:
        """
        Ratio of usage to limit.

        Despite the name, the value is a fraction (usage_bytes / limit_bytes),
        not a number scaled to 100. Returns None when the limit is unknown
        or not positive.
        """
        limit = self.limit_bytes
        if limit is None or limit <= 0:
            return None
        return self.usage_bytes / limit
|
|
||
|
|
||
def _read_cgroup_file(path: Path) -> Optional[str]:
    """
    Read a cgroup file and return its contents with surrounding whitespace stripped.

    Args:
        path: Location of the file to read.

    Returns:
        The stripped file contents, or None when the file cannot be read
        (missing, permission denied, or any other OS-level failure).
    """
    try:
        return path.read_text().strip()
    except OSError:
        # FileNotFoundError and PermissionError are subclasses of OSError, so a
        # single handler covers every case the caller treats as "unavailable".
        return None
|
|
||
|
|
||
def _read_cgroup_v2_memory() -> Optional[MemoryInfo]:
    """
    Build a MemoryInfo from the cgroup v2 memory files.

    Returns None when memory.current cannot be read or does not contain an
    integer. When memory.max is unreadable, equals "max" (no limit), or
    cannot be parsed, the returned MemoryInfo has limit_bytes set to None.
    """
    raw_usage = _read_cgroup_file(CGROUP_V2_MEMORY_CURRENT)
    if raw_usage is None:
        return None

    try:
        current = int(raw_usage)
    except ValueError:
        logger.debug("Could not parse cgroup v2 memory.current value: %s", raw_usage)
        return None

    raw_max = _read_cgroup_file(CGROUP_V2_MEMORY_MAX)
    if raw_max is None or raw_max == CGROUP_NO_LIMIT:
        # Either the file is unavailable or the cgroup is explicitly unlimited.
        return MemoryInfo(usage_bytes=current, limit_bytes=None)

    limit: Optional[int]
    try:
        limit = int(raw_max)
    except ValueError:
        logger.debug("Could not parse cgroup v2 memory.max value: %s", raw_max)
        limit = None

    return MemoryInfo(usage_bytes=current, limit_bytes=limit)
|
|
||
|
|
||
def _read_cgroup_v1_memory() -> Optional[MemoryInfo]:
    """
    Build a MemoryInfo from the cgroup v1 memory files (legacy fallback).

    Returns None when the usage file cannot be read or does not contain an
    integer. The limit is reported as None when the limit file is unreadable,
    unparseable, or holds a value at or above CGROUP_V1_NO_LIMIT_THRESHOLD.
    """
    raw_usage = _read_cgroup_file(CGROUP_V1_MEMORY_USAGE)
    if raw_usage is None:
        return None

    try:
        current = int(raw_usage)
    except ValueError:
        logger.debug("Could not parse cgroup v1 memory usage value: %s", raw_usage)
        return None

    limit: Optional[int] = None
    raw_limit = _read_cgroup_file(CGROUP_V1_MEMORY_LIMIT)
    if raw_limit is not None:
        try:
            parsed_limit = int(raw_limit)
        except ValueError:
            logger.debug("Could not parse cgroup v1 memory limit value: %s", raw_limit)
        else:
            # Huge values are how cgroup v1 signals that no real limit is set.
            if parsed_limit < CGROUP_V1_NO_LIMIT_THRESHOLD:
                limit = parsed_limit

    return MemoryInfo(usage_bytes=current, limit_bytes=limit)
|
|
||
|
|
||
def _read_rusage_memory() -> MemoryInfo:
    """
    Last-resort reader based on resource.getrusage, for non-containerized environments.

    The figure comes from ru_maxrss, which is the process's maximum resident
    set size rather than an instantaneous reading. No memory limit can be
    determined this way, so limit_bytes is always None.
    """
    max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
    bytes_per_unit = 1 if sys.platform == "darwin" else 1024
    return MemoryInfo(usage_bytes=max_rss * bytes_per_unit, limit_bytes=None)
|
|
||
|
|
||
def get_memory_info() -> MemoryInfo:
    """
    Return memory usage information from the first source that is available.

    Sources are tried in this order:
        1. cgroup v2 files (modern K8s pods)
        2. cgroup v1 files (legacy environments)
        3. resource.getrusage (local dev, CI)

    Returns:
        A MemoryInfo carrying usage_bytes and limit_bytes (usage_percent is
        derived from them). The rusage fallback always produces a result,
        so this function never returns None.
    """
    # Each cgroup reader returns None when its files are not usable.
    for read_cgroup in (_read_cgroup_v2_memory, _read_cgroup_v1_memory):
        info = read_cgroup()
        if info is not None:
            return info

    # Neither cgroup layout is available: fall back to rusage.
    return _read_rusage_memory()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`read()` emits an initial memory snapshot, but `maybe_emit_memory_metrics()` will also emit on the first loop iteration because `MetricsClient._last_emission_time` starts at 0.0 and `should_emit()` treats that as "interval elapsed". This results in two near-identical snapshots at the beginning of every sync. Consider making the initial emission go through `maybe_emit_memory_metrics(interval_seconds=0)` (so it updates the last emission time) or have `emit_memory_metrics()` update the last-emission timestamp when called directly.