Skip to content

Commit 3c98bd4

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1)
Implements Tier 1 of the metrics initiative per issue #15943: - New airbyte_cdk/metrics/ module with DogStatsD client - cgroup v2/v1 memory reading with rusage fallback - Periodic metric emission during source read() operations - Graceful no-op when DD_AGENT_HOST is not set - 34 unit tests covering all code paths Metrics emitted: - cdk.memory.usage_bytes: Container memory usage - cdk.memory.limit_bytes: Container memory limit - cdk.memory.usage_percent: Usage/limit ratio - cdk.memory.python_heap_bytes: Python heap via tracemalloc Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 7f41401 commit 3c98bd4

7 files changed

Lines changed: 798 additions & 0 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from airbyte_cdk.connector import TConfig
2424
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
2525
from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled
26+
from airbyte_cdk.metrics import get_metrics_client
2627
from airbyte_cdk.models import (
2728
AirbyteConnectionStatus,
2829
AirbyteMessage,
@@ -275,10 +276,19 @@ def read(
275276
if self.source.check_config_against_spec:
276277
self.validate_connection(source_spec, config)
277278

279+
# Initialize and emit initial memory metrics
280+
metrics_client = get_metrics_client()
281+
metrics_client.initialize()
282+
metrics_client.emit_memory_metrics()
283+
278284
# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
279285
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
280286
for message in self.source.read(self.logger, config, catalog, state):
281287
yield self.handle_record_counts(message, stream_message_counter)
288+
# Periodically emit memory metrics (every 30s by default)
289+
metrics_client.maybe_emit_memory_metrics()
290+
# Emit final memory metrics snapshot
291+
metrics_client.emit_memory_metrics()
282292
for message in self._emit_queued_messages(self.source):
283293
yield self.handle_record_counts(message, stream_message_counter)
284294

airbyte_cdk/metrics/__init__.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#
2+
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
"""
6+
Metrics module for Python source connectors.
7+
8+
Provides DogStatsD-based metric emission for memory and resource monitoring.
9+
Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI).
10+
"""
11+
12+
import logging
13+
import os
14+
import time
15+
from typing import Optional
16+
17+
from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info, get_python_heap_bytes
18+
19+
logger = logging.getLogger(__name__)
20+
21+
# Metric names
22+
METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes"
23+
METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes"
24+
METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent"
25+
METRIC_MEMORY_PYTHON_HEAP_BYTES = "cdk.memory.python_heap_bytes"
26+
27+
# Default emission interval in seconds
28+
DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0
29+
30+
31+
class MetricsClient:
32+
"""
33+
DogStatsD metrics client for Python source connectors.
34+
35+
Initializes a DogStatsD client when DD_AGENT_HOST is available,
36+
otherwise all metric calls are silent no-ops.
37+
"""
38+
39+
def __init__(self) -> None:
40+
self._statsd: Optional[object] = None
41+
self._tags: list[str] = []
42+
self._last_emission_time: float = 0.0
43+
self._initialized = False
44+
45+
def initialize(self) -> None:
46+
"""
47+
Initialize the DogStatsD client if DD_AGENT_HOST is available.
48+
49+
Should be called once during connector startup. Safe to call multiple times.
50+
"""
51+
if self._initialized:
52+
return
53+
54+
self._initialized = True
55+
dd_agent_host = os.environ.get("DD_AGENT_HOST")
56+
if not dd_agent_host:
57+
logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
58+
return
59+
60+
dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125"))
61+
62+
try:
63+
from datadog.dogstatsd import DogStatsd
64+
65+
self._statsd = DogStatsd(
66+
host=dd_agent_host,
67+
port=dd_dogstatsd_port,
68+
# Disable telemetry to reduce overhead
69+
disable_telemetry=True,
70+
)
71+
logger.info(
72+
"DogStatsD metrics client initialized (host=%s, port=%d)",
73+
dd_agent_host,
74+
dd_dogstatsd_port,
75+
)
76+
except ImportError:
77+
logger.warning(
78+
"datadog package not installed; metrics emission disabled. "
79+
"Install with: pip install datadog"
80+
)
81+
except Exception:
82+
logger.warning(
83+
"Failed to initialize DogStatsD client; metrics emission disabled", exc_info=True
84+
)
85+
86+
# Build standard tags from environment
87+
self._tags = self._build_tags()
88+
89+
@property
90+
def enabled(self) -> bool:
91+
"""Return True if the DogStatsD client is active and ready to emit metrics."""
92+
return self._statsd is not None
93+
94+
def _build_tags(self) -> list[str]:
95+
"""Build standard metric tags from environment variables."""
96+
tags: list[str] = []
97+
98+
# DD_SERVICE and DD_VERSION are set by ConnectorApmSupportHelper
99+
dd_service = os.environ.get("DD_SERVICE")
100+
if dd_service:
101+
tags.append(f"connector:{dd_service}")
102+
103+
dd_version = os.environ.get("DD_VERSION")
104+
if dd_version:
105+
tags.append(f"version:{dd_version}")
106+
107+
# Connection-level tags from platform env vars
108+
connection_id = os.environ.get("CONNECTION_ID")
109+
if connection_id:
110+
tags.append(f"connection_id:{connection_id}")
111+
112+
workspace_id = os.environ.get("WORKSPACE_ID")
113+
if workspace_id:
114+
tags.append(f"workspace_id:{workspace_id}")
115+
116+
return tags
117+
118+
def gauge(self, metric_name: str, value: float, extra_tags: Optional[list[str]] = None) -> None:
119+
"""
120+
Emit a gauge metric via DogStatsD.
121+
122+
No-op if the client is not initialized or DD_AGENT_HOST is not set.
123+
"""
124+
if self._statsd is None:
125+
return
126+
127+
tags = self._tags + (extra_tags or [])
128+
try:
129+
# _statsd is a DogStatsd instance set during initialize(); call gauge directly
130+
self._statsd.gauge(metric_name, value, tags=tags) # type: ignore[union-attr]
131+
except Exception:
132+
# Never let metric emission failures affect the sync
133+
logger.debug("Failed to emit metric %s", metric_name, exc_info=True)
134+
135+
def emit_memory_metrics(self) -> None:
136+
"""
137+
Read and emit all memory-related metrics.
138+
139+
Emits:
140+
- cdk.memory.usage_bytes: Current container memory usage
141+
- cdk.memory.limit_bytes: Container memory limit (if known)
142+
- cdk.memory.usage_percent: Usage/limit ratio (if limit is known)
143+
- cdk.memory.python_heap_bytes: Python-specific heap via tracemalloc
144+
"""
145+
if not self.enabled:
146+
return
147+
148+
try:
149+
info: MemoryInfo = get_memory_info()
150+
151+
self.gauge(METRIC_MEMORY_USAGE_BYTES, float(info.usage_bytes))
152+
153+
if info.limit_bytes is not None:
154+
self.gauge(METRIC_MEMORY_LIMIT_BYTES, float(info.limit_bytes))
155+
156+
if info.usage_percent is not None:
157+
self.gauge(METRIC_MEMORY_USAGE_PERCENT, info.usage_percent)
158+
159+
python_heap = get_python_heap_bytes()
160+
if python_heap is not None:
161+
self.gauge(METRIC_MEMORY_PYTHON_HEAP_BYTES, float(python_heap))
162+
163+
except Exception:
164+
# Never let metric collection failures affect the sync
165+
logger.debug("Failed to collect memory metrics", exc_info=True)
166+
167+
def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS) -> bool:
168+
"""
169+
Check if enough time has passed since the last emission to emit again.
170+
171+
Returns True if at least interval_seconds have elapsed since the last emission.
172+
"""
173+
now = time.monotonic()
174+
if now - self._last_emission_time >= interval_seconds:
175+
self._last_emission_time = now
176+
return True
177+
return False
178+
179+
def maybe_emit_memory_metrics(
180+
self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS
181+
) -> None:
182+
"""
183+
Emit memory metrics if the emission interval has elapsed.
184+
185+
This is the primary method to call periodically during read() — it handles
186+
both the timing check and the metric emission.
187+
"""
188+
if self.enabled and self.should_emit(interval_seconds):
189+
self.emit_memory_metrics()
190+
191+
192+
# Module-level singleton for convenience
193+
_metrics_client: Optional[MetricsClient] = None
194+
195+
196+
def get_metrics_client() -> MetricsClient:
197+
"""
198+
Get or create the module-level MetricsClient singleton.
199+
200+
The client is initialized lazily on first access.
201+
"""
202+
global _metrics_client
203+
if _metrics_client is None:
204+
_metrics_client = MetricsClient()
205+
return _metrics_client

0 commit comments

Comments
 (0)