Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled
from airbyte_cdk.metrics import get_metrics_client
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
Expand Down Expand Up @@ -275,10 +276,19 @@ def read(
if self.source.check_config_against_spec:
self.validate_connection(source_spec, config)

# Initialize and emit initial memory metrics
metrics_client = get_metrics_client()
metrics_client.initialize()
metrics_client.emit_memory_metrics()

# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
# Periodically emit memory metrics (every 30s by default)
metrics_client.maybe_emit_memory_metrics()
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read() emits an initial memory snapshot, but maybe_emit_memory_metrics() will also emit on the first loop iteration because MetricsClient._last_emission_time starts at 0.0 and should_emit() treats that as "interval elapsed". This results in two near-identical snapshots at the beginning of every sync. Consider making the initial emission go through maybe_emit_memory_metrics(interval_seconds=0) (so it updates the last emission time) or have emit_memory_metrics() update the last-emission timestamp when called directly.

Copilot uses AI. Check for mistakes.
# Emit final memory metrics snapshot
metrics_client.emit_memory_metrics()
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "final memory metrics snapshot" is only emitted on the normal completion path. If self.source.read(...) raises (or the generator is otherwise aborted), the final snapshot won't run. If the intent is to always capture an end-of-sync measurement, wrap the read loop in a try/finally and emit the final snapshot in the finally block.

Suggested change
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
# Periodically emit memory metrics (every 30s by default)
metrics_client.maybe_emit_memory_metrics()
# Emit final memory metrics snapshot
metrics_client.emit_memory_metrics()
try:
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
# Periodically emit memory metrics (every 30s by default)
metrics_client.maybe_emit_memory_metrics()
finally:
# Emit final memory metrics snapshot
metrics_client.emit_memory_metrics()

Copilot uses AI. Check for mistakes.
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
199 changes: 199 additions & 0 deletions airbyte_cdk/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""
Metrics module for Python source connectors.

Provides DogStatsD-based metric emission for memory and resource monitoring.
Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI).
"""

import logging
import os
import time
from typing import Any, Optional

from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info

logger = logging.getLogger(__name__)

# Metric names (DogStatsD gauge names emitted by MetricsClient.emit_memory_metrics)
METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes"  # current container memory usage
METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes"  # container memory limit, when known
METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent"  # usage/limit fraction (0.0-1.0)

# Default emission interval in seconds between periodic memory snapshots
DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0


class MetricsClient:
    """
    DogStatsD metrics client for Python source connectors.

    Initializes a DogStatsD client when DD_AGENT_HOST is available,
    otherwise all metric calls are silent no-ops.
    """

    def __init__(self) -> None:
        # DogStatsd instance created by initialize(); None means emission is disabled.
        self._statsd: Any = None
        # Standard tags (connector/version/connection/workspace) attached to every metric.
        self._tags: list[str] = []
        # Monotonic timestamp of the last emission; 0.0 means "never emitted".
        self._last_emission_time: float = 0.0
        self._initialized = False

    def initialize(self) -> None:
        """
        Initialize the DogStatsD client if DD_AGENT_HOST is available.

        Should be called once during connector startup. Safe to call multiple times.
        """
        if self._initialized:
            return

        self._initialized = True
        dd_agent_host = os.environ.get("DD_AGENT_HOST")
        if not dd_agent_host:
            logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
            return

        # Parse the port defensively: a malformed DD_DOGSTATSD_PORT must not crash
        # connector startup, which would defeat the "graceful no-op" design goal.
        port_str = os.environ.get("DD_DOGSTATSD_PORT", "8125")
        try:
            dd_dogstatsd_port = int(port_str)
        except ValueError:
            logger.warning(
                "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125",
                port_str,
            )
            dd_dogstatsd_port = 8125

        try:
            from datadog.dogstatsd import DogStatsd

            self._statsd = DogStatsd(
                host=dd_agent_host,
                port=dd_dogstatsd_port,
                # Disable telemetry to reduce overhead
                disable_telemetry=True,
            )
            logger.info(
                "DogStatsD metrics client initialized (host=%s, port=%d)",
                dd_agent_host,
                dd_dogstatsd_port,
            )
        except ImportError:
            logger.warning(
                "datadog package not installed; metrics emission disabled. "
                "Install with: pip install datadog"
            )
        except Exception:
            logger.warning(
                "Failed to initialize DogStatsD client; metrics emission disabled", exc_info=True
            )

        # Build standard tags from environment
        self._tags = self._build_tags()

    @property
    def enabled(self) -> bool:
        """Return True if the DogStatsD client is active and ready to emit metrics."""
        return self._statsd is not None

    def _build_tags(self) -> list[str]:
        """Build standard metric tags from environment variables."""
        tags: list[str] = []

        # DD_SERVICE and DD_VERSION are set by ConnectorApmSupportHelper
        dd_service = os.environ.get("DD_SERVICE")
        if dd_service:
            tags.append(f"connector:{dd_service}")

        dd_version = os.environ.get("DD_VERSION")
        if dd_version:
            tags.append(f"version:{dd_version}")

        # Connection-level tags from platform env vars
        connection_id = os.environ.get("CONNECTION_ID")
        if connection_id:
            tags.append(f"connection_id:{connection_id}")

        workspace_id = os.environ.get("WORKSPACE_ID")
        if workspace_id:
            tags.append(f"workspace_id:{workspace_id}")

        return tags

    def gauge(self, metric_name: str, value: float, extra_tags: Optional[list[str]] = None) -> None:
        """
        Emit a gauge metric via DogStatsD.

        No-op if the client is not initialized or DD_AGENT_HOST is not set.
        """
        if self._statsd is None:
            return

        tags = self._tags + (extra_tags or [])
        try:
            # _statsd is a DogStatsd instance set during initialize(); call gauge directly
            self._statsd.gauge(metric_name, value, tags=tags)
        except Exception:
            # Never let metric emission failures affect the sync
            logger.debug("Failed to emit metric %s", metric_name, exc_info=True)

    def emit_memory_metrics(self) -> None:
        """
        Read and emit all memory-related metrics.

        Emits:
        - cdk.memory.usage_bytes: Current container memory usage
        - cdk.memory.limit_bytes: Container memory limit (if known)
        - cdk.memory.usage_percent: Usage/limit ratio (if limit is known)
        """
        if not self.enabled:
            return

        # Record this emission so a subsequent maybe_emit_memory_metrics() call does
        # not immediately produce a near-duplicate snapshot (the interval timer starts
        # at 0.0, which should_emit() treats as "interval elapsed").
        self._last_emission_time = time.monotonic()

        try:
            info: MemoryInfo = get_memory_info()

            self.gauge(METRIC_MEMORY_USAGE_BYTES, float(info.usage_bytes))

            if info.limit_bytes is not None:
                self.gauge(METRIC_MEMORY_LIMIT_BYTES, float(info.limit_bytes))

            if info.usage_percent is not None:
                self.gauge(METRIC_MEMORY_USAGE_PERCENT, info.usage_percent)

        except Exception:
            # Never let metric collection failures affect the sync
            logger.debug("Failed to collect memory metrics", exc_info=True)

    def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS) -> bool:
        """
        Check if enough time has passed since the last emission to emit again.

        Returns True if at least interval_seconds have elapsed since the last emission,
        and updates the last-emission timestamp as a side effect when it returns True.
        """
        now = time.monotonic()
        if now - self._last_emission_time >= interval_seconds:
            self._last_emission_time = now
            return True
        return False

    def maybe_emit_memory_metrics(
        self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS
    ) -> None:
        """
        Emit memory metrics if the emission interval has elapsed.

        This is the primary method to call periodically during read() — it handles
        both the timing check and the metric emission.
        """
        if self.enabled and self.should_emit(interval_seconds):
            self.emit_memory_metrics()


# Module-level singleton for convenience
_metrics_client: Optional[MetricsClient] = None


def get_metrics_client() -> MetricsClient:
    """
    Get or create the module-level MetricsClient singleton.

    The client is initialized lazily on first access.
    """
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient()
    # Honor the documented contract: initialization happens lazily on first
    # access. initialize() is idempotent, so calling it on every access is safe.
    _metrics_client.initialize()
    return _metrics_client
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The get_metrics_client() docstring says "The client is initialized lazily on first access", but the function currently only constructs the singleton and does not call initialize(). Either update the docstring to match the behavior, or call initialize() inside get_metrics_client() (keeping it idempotent).

Copilot uses AI. Check for mistakes.
155 changes: 155 additions & 0 deletions airbyte_cdk/metrics/memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""
Memory and resource metrics reader for Python source connectors.

Reads container memory usage and limits from cgroup v2 files (standard in K8s pods),
with fallback to resource.getrusage for non-containerized environments.
"""

import logging
import resource
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# cgroup v2 file paths (standard in modern K8s pods)
CGROUP_V2_MEMORY_CURRENT = Path("/sys/fs/cgroup/memory.current")  # current usage in bytes
CGROUP_V2_MEMORY_MAX = Path("/sys/fs/cgroup/memory.max")  # limit in bytes, or "max"

# cgroup v1 file paths (legacy, some older environments)
CGROUP_V1_MEMORY_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
CGROUP_V1_MEMORY_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")

# "max" in cgroup v2 means no limit is set
CGROUP_NO_LIMIT = "max"

# Threshold for considering a cgroup v1 limit as "no limit" (very large values ~= PAGE_COUNTER_MAX)
# Values near 2^63 typically mean no limit is configured
CGROUP_V1_NO_LIMIT_THRESHOLD = 2**62


@dataclass(frozen=True)
class MemoryInfo:
    """Immutable snapshot of container memory usage."""

    # Current memory usage, in bytes.
    usage_bytes: int
    # Configured memory limit in bytes, or None when no limit is known.
    limit_bytes: Optional[int]

    @property
    def usage_percent(self) -> Optional[float]:
        """Return memory usage as a fraction of the limit (0.0 to 1.0), or None if no limit is known."""
        # A missing or non-positive limit means the ratio is undefined.
        if self.limit_bytes is None or self.limit_bytes <= 0:
            return None
        return self.usage_bytes / self.limit_bytes


def _read_cgroup_file(path: Path) -> Optional[str]:
    """Read a cgroup file and return its stripped contents, or None if unavailable.

    Args:
        path: Filesystem path of the cgroup control file to read.

    Returns:
        The file's contents with surrounding whitespace removed, or None when
        the file is missing or unreadable.
    """
    try:
        return path.read_text().strip()
    except OSError:
        # FileNotFoundError and PermissionError are both OSError subclasses,
        # so a single OSError clause covers every "file unavailable" case.
        return None


def _read_cgroup_v2_memory() -> Optional[MemoryInfo]:
    """
    Read memory usage from cgroup v2 control files.

    Returns:
        MemoryInfo when the cgroup v2 usage file can be read and parsed,
        None otherwise.
    """
    raw_usage = _read_cgroup_file(CGROUP_V2_MEMORY_CURRENT)
    if raw_usage is None:
        return None

    try:
        current_bytes = int(raw_usage)
    except ValueError:
        logger.debug("Could not parse cgroup v2 memory.current value: %s", raw_usage)
        return None

    # memory.max holds either a byte count or the literal "max" (no limit configured).
    configured_limit: Optional[int] = None
    raw_limit = _read_cgroup_file(CGROUP_V2_MEMORY_MAX)
    if raw_limit is not None and raw_limit != CGROUP_NO_LIMIT:
        try:
            configured_limit = int(raw_limit)
        except ValueError:
            logger.debug("Could not parse cgroup v2 memory.max value: %s", raw_limit)

    return MemoryInfo(usage_bytes=current_bytes, limit_bytes=configured_limit)


def _read_cgroup_v1_memory() -> Optional[MemoryInfo]:
    """
    Read memory usage from legacy cgroup v1 control files.

    Returns:
        MemoryInfo when the cgroup v1 usage file can be read and parsed,
        None otherwise.
    """
    raw_usage = _read_cgroup_file(CGROUP_V1_MEMORY_USAGE)
    if raw_usage is None:
        return None

    try:
        current_bytes = int(raw_usage)
    except ValueError:
        logger.debug("Could not parse cgroup v1 memory usage value: %s", raw_usage)
        return None

    # The limit is optional: a missing, unreadable, or unparseable value leaves it None.
    configured_limit: Optional[int] = None
    raw_limit = _read_cgroup_file(CGROUP_V1_MEMORY_LIMIT)
    if raw_limit is not None:
        try:
            parsed_limit = int(raw_limit)
        except ValueError:
            logger.debug("Could not parse cgroup v1 memory limit value: %s", raw_limit)
        else:
            # Very large values indicate no real limit is set
            if parsed_limit < CGROUP_V1_NO_LIMIT_THRESHOLD:
                configured_limit = parsed_limit

    return MemoryInfo(usage_bytes=current_bytes, limit_bytes=configured_limit)


def _read_rusage_memory() -> MemoryInfo:
    """
    Fallback: read memory usage via resource.getrusage (works in non-containerized environments).

    Note: ru_maxrss is in kilobytes on Linux, bytes on macOS.
    This fallback cannot determine the container memory limit.
    """
    peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss unit is platform-dependent: bytes on macOS ("darwin"), kilobytes elsewhere.
    multiplier = 1 if sys.platform == "darwin" else 1024
    return MemoryInfo(usage_bytes=peak_rss * multiplier, limit_bytes=None)


def get_memory_info() -> MemoryInfo:
    """
    Get current container memory usage information.

    Attempts to read from (in order):
    1. cgroup v2 files (modern K8s pods)
    2. cgroup v1 files (legacy environments)
    3. resource.getrusage (local dev, CI)

    Returns a MemoryInfo dataclass with usage_bytes, limit_bytes, and usage_percent.
    """
    # Try the cgroup readers in preference order; fall back to rusage
    # when neither cgroup version is available.
    for cgroup_reader in (_read_cgroup_v2_memory, _read_cgroup_v1_memory):
        snapshot = cgroup_reader()
        if snapshot is not None:
            return snapshot
    return _read_rusage_memory()
Loading
Loading