Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ COPY dist/*.whl ./dist/
RUN poetry config virtualenvs.create false \
&& poetry install --only main --no-interaction --no-ansi || true

# Build and install the package
RUN pip install dist/*.whl
# Build and install the package with the metrics extra (for DogStatsD support).
# Two-step install: first install the wheel, then reinstall with extras using
# the resolved filename (shell glob + [extras] syntax don't mix in a single arg).
RUN pip install dist/*.whl \
&& pip install "$(ls dist/*.whl)[metrics]"

# Recreate the original structure
RUN mkdir -p source_declarative_manifest \
Expand Down
21 changes: 19 additions & 2 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled
from airbyte_cdk.metrics import get_metrics_client
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
Expand Down Expand Up @@ -275,10 +276,26 @@ def read(
if self.source.check_config_against_spec:
self.validate_connection(source_spec, config)

# Initialize and emit initial memory metrics
metrics_client = get_metrics_client()
metrics_client.initialize()
metrics_client.emit_memory_metrics()

# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
_emit_check_counter = 0
try:
for message in self.source.read(self.logger, config, catalog, state):
yield self.handle_record_counts(message, stream_message_counter)
# Check for metric emission every 1000 records to avoid
# a time.monotonic() syscall on every single record.
_emit_check_counter += 1
if _emit_check_counter >= 1000:
_emit_check_counter = 0
metrics_client.maybe_emit_memory_metrics()
finally:
# Emit final memory metrics snapshot (runs even if the read loop raises)
metrics_client.emit_memory_metrics()
for message in self._emit_queued_messages(self.source):
yield self.handle_record_counts(message, stream_message_counter)

Expand Down
216 changes: 216 additions & 0 deletions airbyte_cdk/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

"""
Metrics module for Python source connectors.

Provides DogStatsD-based metric emission for memory and resource monitoring.
Designed to be a graceful no-op when DD_AGENT_HOST is not set (local dev, CI).
"""

import logging
import os
import time
from typing import Any, Optional

from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info

logger = logging.getLogger(__name__)

# Metric names
METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes"
METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes"
METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent"

# Default emission interval in seconds
DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0


class MetricsClient:
    """
    DogStatsD-backed metrics emitter for Python source connectors.

    When DD_AGENT_HOST is present in the environment, ``initialize`` wires up
    a real DogStatsd client; otherwise every emission method is a silent
    no-op, so local development and CI runs are unaffected.
    """

    def __init__(self) -> None:
        # Underlying DogStatsd instance; stays None while disabled.
        self._statsd: Any = None
        # Standard tags attached to every metric (derived from env vars).
        self._tags: list[str] = []
        # time.monotonic() timestamp of the most recent emission.
        self._last_emission_time: float = 0.0
        self._initialized = False

    def initialize(self) -> None:
        """
        Set up the DogStatsD client when DD_AGENT_HOST is available.

        Intended to run once at connector startup; repeated calls are no-ops.
        """
        if self._initialized:
            return
        self._initialized = True

        host = os.environ.get("DD_AGENT_HOST")
        if not host:
            logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
            return

        port = self._resolve_port()

        try:
            from datadog.dogstatsd import DogStatsd

            self._statsd = DogStatsd(
                host=host,
                port=port,
                # Disable telemetry to reduce overhead
                disable_telemetry=True,
            )
            logger.info(
                "DogStatsD metrics client initialized (host=%s, port=%d)",
                host,
                port,
            )
        except ImportError:
            logger.warning(
                "datadog package not installed; metrics emission disabled. "
                "Install with: pip install datadog"
            )
        except Exception:
            logger.warning(
                "Failed to initialize DogStatsD client; metrics emission disabled", exc_info=True
            )

        # Tags are derived from the environment even when the client failed to
        # come up, mirroring the one-shot nature of initialize().
        self._tags = self._build_tags()

    @staticmethod
    def _resolve_port() -> int:
        """Read DD_DOGSTATSD_PORT from the environment, defaulting to 8125."""
        raw = os.environ.get("DD_DOGSTATSD_PORT")
        if not raw:
            return 8125
        try:
            return int(raw)
        except ValueError:
            logger.warning(
                "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125",
                raw,
            )
            return 8125

    @property
    def enabled(self) -> bool:
        """Return True if the DogStatsD client is active and ready to emit metrics."""
        return self._statsd is not None

    def _build_tags(self) -> list[str]:
        """Assemble the standard tag list from platform environment variables."""
        # (env var, tag key) pairs. DD_SERVICE / DD_VERSION are set by
        # ConnectorApmSupportHelper; the rest come from platform env vars.
        # Unset/empty variables are simply skipped.
        env_to_tag = (
            ("DD_SERVICE", "connector"),
            ("DD_VERSION", "version"),
            ("CONNECTION_ID", "connection_id"),
            ("WORKSPACE_ID", "workspace_id"),
        )
        return [
            f"{key}:{value}"
            for env_var, key in env_to_tag
            if (value := os.environ.get(env_var))
        ]

    def gauge(self, metric_name: str, value: float, extra_tags: Optional[list[str]] = None) -> None:
        """
        Send a gauge metric through DogStatsD.

        Silently does nothing while the client is disabled or uninitialized.
        """
        client = self._statsd
        if client is None:
            return

        try:
            # client is the DogStatsd instance set during initialize().
            client.gauge(metric_name, value, tags=[*self._tags, *(extra_tags or [])])
        except Exception:
            # Metric emission must never break the sync itself.
            logger.debug("Failed to emit metric %s", metric_name, exc_info=True)

    def emit_memory_metrics(self) -> None:
        """
        Take a memory snapshot and emit all memory gauges.

        Metrics emitted:
          - cdk.memory.usage_bytes:   current container memory usage
          - cdk.memory.limit_bytes:   container memory limit (when known)
          - cdk.memory.usage_percent: usage/limit ratio (when limit is known)

        Also restarts the emission timer so that ``should_emit`` /
        ``maybe_emit_memory_metrics`` count the interval from this call.
        """
        if not self.enabled:
            return

        # Stamp first so back-to-back calls don't produce duplicate snapshots.
        self._last_emission_time = time.monotonic()

        try:
            snapshot: MemoryInfo = get_memory_info()

            self.gauge(METRIC_MEMORY_USAGE_BYTES, float(snapshot.usage_bytes))

            if snapshot.limit_bytes is not None:
                self.gauge(METRIC_MEMORY_LIMIT_BYTES, float(snapshot.limit_bytes))

            if snapshot.usage_percent is not None:
                self.gauge(METRIC_MEMORY_USAGE_PERCENT, snapshot.usage_percent)

        except Exception:
            # Collection failures must never break the sync.
            logger.debug("Failed to collect memory metrics", exc_info=True)

    def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS) -> bool:
        """
        Report whether the emission interval has elapsed since the last emission.

        Pure query — reads the clock but mutates no internal state.
        """
        elapsed = time.monotonic() - self._last_emission_time
        return elapsed >= interval_seconds

    def maybe_emit_memory_metrics(
        self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS
    ) -> None:
        """
        Emit memory metrics when the client is enabled and the interval elapsed.

        This is the periodic entry point for read loops: it combines the timing
        check with the actual emission.
        """
        if not self.enabled:
            return
        if self.should_emit(interval_seconds):
            self.emit_memory_metrics()


# Module-level singleton for convenience
_metrics_client: Optional[MetricsClient] = None


def get_metrics_client() -> MetricsClient:
    """
    Return the process-wide MetricsClient, creating it on first use.

    Construction and initialization are deliberately decoupled: the caller
    invokes ``initialize()`` on the returned client, and therefore controls
    *when* environment variables are read and the DogStatsD connection is
    established.
    """
    global _metrics_client
    client = _metrics_client
    if client is None:
        client = _metrics_client = MetricsClient()
    return client
Loading
Loading