-
Notifications
You must be signed in to change notification settings - Fork 44
feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1) #931
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
3c98bd4
7ee981e
5fbcf2a
58978c7
c8b0197
b208bd0
7e9881e
f410aba
0f44eaa
4b4fc96
dd958d5
d951916
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |||||||||||||||||||||||||||||
| from airbyte_cdk.connector import TConfig | ||||||||||||||||||||||||||||||
| from airbyte_cdk.exception_handler import init_uncaught_exception_handler | ||||||||||||||||||||||||||||||
| from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled | ||||||||||||||||||||||||||||||
| from airbyte_cdk.metrics import get_metrics_client | ||||||||||||||||||||||||||||||
| from airbyte_cdk.models import ( | ||||||||||||||||||||||||||||||
| AirbyteConnectionStatus, | ||||||||||||||||||||||||||||||
| AirbyteMessage, | ||||||||||||||||||||||||||||||
|
|
@@ -275,10 +276,19 @@ def read( | |||||||||||||||||||||||||||||
| if self.source.check_config_against_spec: | ||||||||||||||||||||||||||||||
| self.validate_connection(source_spec, config) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Initialize and emit initial memory metrics | ||||||||||||||||||||||||||||||
| metrics_client = get_metrics_client() | ||||||||||||||||||||||||||||||
| metrics_client.initialize() | ||||||||||||||||||||||||||||||
| metrics_client.emit_memory_metrics() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows | ||||||||||||||||||||||||||||||
| stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) | ||||||||||||||||||||||||||||||
| for message in self.source.read(self.logger, config, catalog, state): | ||||||||||||||||||||||||||||||
| yield self.handle_record_counts(message, stream_message_counter) | ||||||||||||||||||||||||||||||
| # Periodically emit memory metrics (every 30s by default) | ||||||||||||||||||||||||||||||
| metrics_client.maybe_emit_memory_metrics() | ||||||||||||||||||||||||||||||
| # Emit final memory metrics snapshot | ||||||||||||||||||||||||||||||
| metrics_client.emit_memory_metrics() | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| for message in self.source.read(self.logger, config, catalog, state): | |
| yield self.handle_record_counts(message, stream_message_counter) | |
| # Periodically emit memory metrics (every 30s by default) | |
| metrics_client.maybe_emit_memory_metrics() | |
| # Emit final memory metrics snapshot | |
| metrics_client.emit_memory_metrics() | |
| try: | |
| for message in self.source.read(self.logger, config, catalog, state): | |
| yield self.handle_record_counts(message, stream_message_counter) | |
| # Periodically emit memory metrics (every 30s by default) | |
| metrics_client.maybe_emit_memory_metrics() | |
| finally: | |
| # Emit final memory metrics snapshot | |
| metrics_client.emit_memory_metrics() |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,205 @@ | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
| # Copyright (c) 2026 Airbyte, Inc., all rights reserved. | ||||||||||||||||||||||||||||
| # | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Metrics module for Python source connectors. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Provides DogStatsD-based metric emission for memory and resource monitoring. | ||||||||||||||||||||||||||||
| Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI). | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||
| from typing import Optional | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info, get_python_heap_bytes | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Metric names | ||||||||||||||||||||||||||||
| METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes" | ||||||||||||||||||||||||||||
| METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes" | ||||||||||||||||||||||||||||
| METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent" | ||||||||||||||||||||||||||||
| METRIC_MEMORY_PYTHON_HEAP_BYTES = "cdk.memory.python_heap_bytes" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Default emission interval in seconds | ||||||||||||||||||||||||||||
| DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0 | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| class MetricsClient: | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| DogStatsD metrics client for Python source connectors. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Initializes a DogStatsD client when DD_AGENT_HOST is available, | ||||||||||||||||||||||||||||
| otherwise all metric calls are silent no-ops. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def __init__(self) -> None: | ||||||||||||||||||||||||||||
| self._statsd: Optional[object] = None | ||||||||||||||||||||||||||||
| self._tags: list[str] = [] | ||||||||||||||||||||||||||||
| self._last_emission_time: float = 0.0 | ||||||||||||||||||||||||||||
| self._initialized = False | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def initialize(self) -> None: | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| Initialize the DogStatsD client if DD_AGENT_HOST is available. | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Should be called once during connector startup. Safe to call multiple times. | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| if self._initialized: | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| self._initialized = True | ||||||||||||||||||||||||||||
| dd_agent_host = os.environ.get("DD_AGENT_HOST") | ||||||||||||||||||||||||||||
| if not dd_agent_host: | ||||||||||||||||||||||||||||
| logger.debug("DD_AGENT_HOST not set; metrics emission disabled") | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125")) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125")) | |
| port_str = os.environ.get("DD_DOGSTATSD_PORT") | |
| if not port_str: | |
| dd_dogstatsd_port = 8125 | |
| else: | |
| try: | |
| dd_dogstatsd_port = int(port_str) | |
| except ValueError: | |
| logger.warning( | |
| "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125", | |
| port_str, | |
| ) | |
| dd_dogstatsd_port = 8125 |
Copilot
AI
Mar 5, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get_metrics_client() docstring says "The client is initialized lazily on first access", but the function currently only constructs the singleton and does not call initialize(). Either update the docstring to match the behavior, or call initialize() inside get_metrics_client() (keeping it idempotent).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
read()emits an initial memory snapshot, butmaybe_emit_memory_metrics()will also emit on the first loop iteration becauseMetricsClient._last_emission_timestarts at 0.0 andshould_emit()treats that as "interval elapsed". This results in two near-identical snapshots at the beginning of every sync. Consider making the initial emission go throughmaybe_emit_memory_metrics(interval_seconds=0)(so it updates the last emission time) or haveemit_memory_metrics()update the last-emission timestamp when called directly.