
feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1) #931

Closed
devin-ai-integration[bot] wants to merge 12 commits into main from
devin/1772669295-add-memory-metrics-module

Conversation

@devin-ai-integration
Contributor

@devin-ai-integration devin-ai-integration bot commented Mar 5, 2026

(AI-Triage PR) Add memory & resource metrics to Python CDK via DogStatsD

Summary

Implements Tier 1 of the memory/resource metrics initiative per airbytehq/airbyte-internal-issues#15943. Adds a new airbyte_cdk/metrics/ module that emits container memory metrics via DogStatsD during source read() operations.

New files:

  • airbyte_cdk/metrics/memory.py — Reads container memory from cgroup v2 → cgroup v1 → resource.getrusage() fallback chain
  • airbyte_cdk/metrics/metrics_client.py — MetricsClient class, singleton, and constants wrapping DogStatsD with graceful no-op when DD_AGENT_HOST is absent
  • airbyte_cdk/metrics/__init__.py — Thin re-exports only (follows repo convention for __init__.py)
  • unit_tests/metrics/test_memory.py — 15 tests covering cgroup v2, v1, and rusage fallback
  • unit_tests/metrics/test_metrics_client.py — 18 tests covering initialization, gauge emission, timing, tags, and singleton behavior
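For context, the fallback chain described for memory.py could be sketched roughly like this (a hypothetical illustration, not the PR's actual code; the cgroup paths are the standard mount points, and MemoryInfo here is a stand-in for the PR's dataclass):

```python
import resource
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

# Standard cgroup mount points; the file names in the PR may differ.
CGROUP_V2_USAGE = Path("/sys/fs/cgroup/memory.current")
CGROUP_V2_LIMIT = Path("/sys/fs/cgroup/memory.max")
CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")


@dataclass
class MemoryInfo:
    usage_bytes: int
    limit_bytes: Optional[int]  # None when no limit is enforced ("max" in cgroup v2)


def _read_int(path: Path) -> Optional[int]:
    """Read an integer from a cgroup file; None if unreadable or unlimited."""
    try:
        raw = path.read_text().strip()
    except OSError:
        return None
    return None if raw == "max" else int(raw)


def get_memory_info() -> MemoryInfo:
    # Try cgroup v2 first, then cgroup v1, then getrusage() as a last resort.
    for usage_path, limit_path in (
        (CGROUP_V2_USAGE, CGROUP_V2_LIMIT),
        (CGROUP_V1_USAGE, CGROUP_V1_LIMIT),
    ):
        usage = _read_int(usage_path)
        if usage is not None:
            return MemoryInfo(usage_bytes=usage, limit_bytes=_read_int(limit_path))
    rusage = resource.getrusage(resource.RUSAGE_SELF)
    # ru_maxrss is reported in bytes on macOS but kilobytes on Linux.
    multiplier = 1 if sys.platform == "darwin" else 1024
    return MemoryInfo(usage_bytes=rusage.ru_maxrss * multiplier, limit_bytes=None)
```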

Modified files:

  • pyproject.toml — Adds datadog as an optional dependency under a new metrics extra
  • poetry.lock — Regenerated to include the new datadog dependency
  • Dockerfile — Installs the metrics extra in the Docker image (two-step install to work around shell glob + [extras] syntax conflict)
  • airbyte_cdk/entrypoint.py — Integrates periodic metric emission into the read() loop (initial → every 30s via counter-based throttling → final snapshot in try/finally)

Metrics emitted: cdk.memory.usage_bytes, cdk.memory.limit_bytes, cdk.memory.usage_percent

All metric emission is opt-in: silent no-op unless DD_AGENT_HOST is set by the platform's ConnectorApmSupportHelper.
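Concretely, the opt-in guard amounts to something like the following (hypothetical sketch; the real client lives in airbyte_cdk/metrics/metrics_client.py, and MetricsClientSketch is an illustrative stand-in):

```python
import logging
import os

logger = logging.getLogger("airbyte.metrics")

DEFAULT_DOGSTATSD_PORT = 8125


class MetricsClientSketch:
    """Illustrative stand-in for the PR's MetricsClient (not the actual code)."""

    def __init__(self) -> None:
        self.enabled = False
        self._statsd = None

    def initialize(self) -> None:
        host = os.environ.get("DD_AGENT_HOST")
        if not host:
            # Silent no-op: the platform did not enable metrics for this run.
            logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
            return
        try:
            from datadog.dogstatsd import DogStatsd  # optional "metrics" extra
        except ImportError:
            logger.debug("datadog package not installed; metrics emission disabled")
            return
        # Defensive port parsing: invalid values fall back to the default.
        try:
            port = int(os.environ.get("DD_DOGSTATSD_PORT", str(DEFAULT_DOGSTATSD_PORT)))
        except ValueError:
            logger.warning(
                "Invalid DD_DOGSTATSD_PORT; falling back to %d", DEFAULT_DOGSTATSD_PORT
            )
            port = DEFAULT_DOGSTATSD_PORT
        self._statsd = DogStatsd(host=host, port=port)
        self.enabled = True
```

With DD_AGENT_HOST unset, initialize() returns before any datadog import is attempted, so the dependency stays optional.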

Updates since last revision

  • Moved MetricsClient out of __init__.py — Per reviewer feedback, MetricsClient class, singleton logic, and constants now live in airbyte_cdk/metrics/metrics_client.py. __init__.py is thin re-exports only (33 lines), following the repo convention seen in airbyte_cdk/models/__init__.py, airbyte_cdk/sources/__init__.py, etc. Import paths are unchanged.
  • Added reset_metrics_client() — Public API to reset the singleton, replacing direct mutation of _metrics_client in tests. Tests now use this function for cleanup instead of reaching into private module state.
  • should_emit() is now a pure query — Removed the _last_emission_time side-effect from should_emit(). Only emit_memory_metrics() advances the timestamp, eliminating the duplicate update when called via maybe_emit_memory_metrics().
  • Replaced sys.modules monkey-patching with pytest fixture — The test file's module-level sys.modules injection (which persisted for the entire test session) is replaced with a monkeypatch-based autouse fixture that automatically restores sys.modules after each test.
  • Counter-based throttling in read loop — maybe_emit_memory_metrics() is now called every 1000 records instead of on every record, avoiding a time.monotonic() syscall on every yielded message.
  • Dockerfile fixed — pip install "dist/*.whl[metrics]" failed because the quoted glob is passed literally to pip. Now uses a two-step install: pip install dist/*.whl && pip install "$(ls dist/*.whl)[metrics]".
  • Defensive DD_DOGSTATSD_PORT parsing — Invalid port values now log a warning and fall back to 8125 instead of crashing.
  • try/finally for final metrics snapshot — entrypoint.py read loop now emits a final memory snapshot even if the loop raises, ensuring we capture peak memory usage.

Earlier updates:

  • Removed tracemalloc / python_heap_bytes — Per reviewer feedback, get_python_heap_bytes() and METRIC_MEMORY_PYTHON_HEAP_BYTES have been removed. tracemalloc.start() hooks into CPython's allocator with ~10-30% runtime overhead, and was being activated silently for all production connectors. The three cgroup-based metrics are sufficient for container memory pressure visibility. A separate issue tracks adding tracemalloc as an opt-in feature behind CDK_TRACEMALLOC_ENABLED.
  • Fixed macOS ru_maxrss platform difference — sys.platform == "darwin" check added to handle ru_maxrss units (bytes on macOS, kilobytes on Linux).

Review & Testing Checklist for Human

  • entrypoint.py modification safety — This PR modifies the core read() loop. Any bug here affects all Python connectors. Review the integration points carefully:
    • Counter-based throttling means metrics could be delayed by up to 1000 records before the first periodic check (first emission is still immediate).
    • try/finally ensures final metrics snapshot even if the read loop raises, but verify this doesn't suppress exceptions or affect error handling.
  • Dockerfile two-step pip install — The pip install dist/*.whl && pip install "$(ls dist/*.whl)[metrics]" pattern works around shell glob + [extras] syntax conflict. Verify this is acceptable for the CDK Dockerfile and doesn't break any build workflows.
  • _statsd typed as Any — All calls to self._statsd.gauge() bypass MyPy checking. If the DogStatsd API changes in a future version, type errors will not be caught statically. Consider whether a TYPE_CHECKING-guarded conditional import of DogStatsd would be preferable.
  • Singleton lifecycle — get_metrics_client() returns a module-level singleton. A reset_metrics_client() function is now available for tests, but the singleton is never cleaned up in production. Verify this is acceptable for the connector execution model (one process per sync).
  • Error swallowing — gauge() and emit_memory_metrics() silently catch and log all exceptions. This prevents metrics failures from breaking syncs, but could mask real issues (e.g., auth failures, OOM during metric collection). Consider whether debug logs are sufficient or if warnings should be emitted for persistent failures.
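To make the error-swallowing trade-off concrete, the pattern under review boils down to something like this (hypothetical helper; the PR implements the equivalent inside MetricsClient.gauge() rather than as a free function):

```python
import logging

logger = logging.getLogger("airbyte.metrics")


def safe_gauge(statsd, metric: str, value: float, tags=None) -> bool:
    """Emit a gauge without ever letting a metrics failure break the sync.

    Returns True when the gauge was sent, False when it failed silently.
    """
    try:
        statsd.gauge(metric, value, tags=tags)
        return True
    except Exception:
        # Debug-level only: persistent failures stay invisible unless debug
        # logging is enabled, which is exactly the trade-off flagged above.
        logger.debug("Failed to emit metric %s", metric, exc_info=True)
        return False
```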

Recommended test plan:

  1. Install with poetry install --extras metrics and verify datadog is installed.
  2. Run unit tests (poetry run pytest unit_tests/metrics/ -v) to verify all 33 tests pass.
  3. Manually test a connector with DD_AGENT_HOST=localhost (and a local DogStatsD agent) to verify metrics appear in Datadog. Check that cdk.memory.* gauges are present with correct tags (connector, version, connection_id, workspace_id).
  4. Manually test a connector without DD_AGENT_HOST set and verify no errors/warnings are logged (graceful no-op behavior).
  5. Run a connector with DD_AGENT_HOST set but no DogStatsD agent listening, verify the sync completes successfully and only debug logs are emitted (no warnings/errors).
  6. Verify the Docker image builds successfully with the two-step pip install and includes the datadog package.
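As a lighter-weight alternative to step 3, a plain UDP socket can stand in for the DogStatsD agent so you can inspect the raw datagrams; gauges use the name:value|g|#tags wire format. The packet below is hand-crafted for illustration rather than produced by the datadog client:

```python
import socket

# Bind a throwaway UDP socket on localhost to stand in for the DogStatsD agent.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
_, port = server.getsockname()

# A gauge in DogStatsD wire format: <metric>:<value>|g|#<tag>,<tag>
packet = b"cdk.memory.usage_bytes:123456789|g|#connector:source-example,version:1.2.3"

client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(packet, ("127.0.0.1", port))

received, _ = server.recvfrom(4096)
print(received.decode())
client.close()
server.close()
```

Pointing DD_AGENT_HOST/DD_DOGSTATSD_PORT at such a socket shows exactly what a connector emits without any Datadog infrastructure.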

Notes

Implements Tier 1 of the metrics initiative per issue #15943:
- New airbyte_cdk/metrics/ module with DogStatsD client
- cgroup v2/v1 memory reading with rusage fallback
- Periodic metric emission during source read() operations
- Graceful no-op when DD_AGENT_HOST is not set
- 33 unit tests covering all code paths

Metrics emitted:
- cdk.memory.usage_bytes: Container memory usage
- cdk.memory.limit_bytes: Container memory limit
- cdk.memory.usage_percent: Usage/limit ratio

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

github-actions bot commented Mar 5, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772669295-add-memory-metrics-module#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772669295-add-memory-metrics-module

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

github-actions bot commented Mar 5, 2026

PyTest Results (Full)

3 926 tests   3 914 ✅  11m 10s ⏱️
    1 suites     12 💤
    1 files        0 ❌

Results for commit d951916.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Mar 5, 2026

PyTest Results (Fast)

3 923 tests  +54   3 911 ✅ +54   6m 47s ⏱️ -4s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit d951916. ± Comparison against base commit 7f41401.

♻️ This comment has been updated with latest results.

- Regenerate poetry.lock after adding datadog optional dependency
- Remove unused imports (CGROUP_V1_MEMORY_LIMIT, etc.) from test_memory.py
- Fix duplicate import style in test_metrics_client.py

Co-Authored-By: bot_apk <apk@cognition.ai>
@pnilan
Contributor

Review Feedback: Remove tracemalloc / python_heap_bytes from this PR

The three cgroup-based metrics (cdk.memory.usage_bytes, cdk.memory.limit_bytes, cdk.memory.usage_percent) are exactly what's needed here — they're free to collect (just reading a file from /sys/fs/cgroup/) and give the full picture of container memory pressure.

However, get_python_heap_bytes() and the cdk.memory.python_heap_bytes metric should be removed from this PR. Here's why:

  • tracemalloc.start() hooks into CPython's memory allocator and tracks every malloc/free call for the rest of the process. It's a debugging/profiling tool with meaningful runtime overhead (~10-30% depending on workload).
  • It gets activated silently whenever DD_AGENT_HOST is set, which will be all production connectors. There's no way to get the cheap cgroup metrics without also opting into this.
  • The metric it provides (Python-specific heap) is a subset of what cgroup usage_bytes already reports. It's useful for diagnosing where within Python memory is allocated, but that's a debugging investigation — not an ongoing production metric.

Action: Please remove get_python_heap_bytes() from memory.py, remove the METRIC_MEMORY_PYTHON_HEAP_BYTES gauge from MetricsClient.emit_memory_metrics(), and remove the associated tests.

A separate issue has been created to add tracemalloc-based heap metrics as an opt-in feature gated behind an explicit env var flag (e.g., CDK_TRACEMALLOC_ENABLED=true).
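For reference, the opt-in gating for that follow-up could look roughly like this (hypothetical sketch; CDK_TRACEMALLOC_ENABLED is the flag name suggested above, and the function names are illustrative):

```python
import os
import tracemalloc


def maybe_start_tracemalloc() -> bool:
    """Start tracemalloc only when explicitly enabled via env var.

    Returns True if tracing is active after the call. Hypothetical sketch
    for the follow-up issue, not code from this PR.
    """
    if os.environ.get("CDK_TRACEMALLOC_ENABLED", "").lower() != "true":
        return tracemalloc.is_tracing()
    if not tracemalloc.is_tracing():
        tracemalloc.start()  # hooks the allocator; ~10-30% overhead from here on
    return True


def python_heap_bytes() -> int:
    # Current traced heap size; 0 when tracing has never been started.
    current, _peak = tracemalloc.get_traced_memory()
    return current
```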

@devin-ai-integration
Contributor Author

Done in c8b0197 — removed get_python_heap_bytes(), METRIC_MEMORY_PYTHON_HEAP_BYTES, and associated tests. The PR now only emits the three cgroup-based metrics (usage_bytes, limit_bytes, usage_percent). All local checks pass (ruff, mypy, 33 tests in 0.14s).

@pnilan
Contributor

Nit: _read_rusage_memory() reports wrong values on macOS

In memory.py, the rusage fallback does:

usage_bytes = rusage.ru_maxrss * 1024

This assumes Linux, where ru_maxrss is in kilobytes. On macOS, ru_maxrss is already in bytes, so this will report ~1000x inflated values.

A simple fix:

import sys

if sys.platform == "darwin":
    usage_bytes = rusage.ru_maxrss
else:
    usage_bytes = rusage.ru_maxrss * 1024

Low priority since this fallback only triggers when cgroups aren't available (i.e., local dev, not production K8s pods), but easy to fix.

@devin-ai-integration
Contributor Author

Fixed in b208bd0 — added sys.platform == "darwin" check so ru_maxrss is treated as bytes on macOS and kilobytes on Linux. All local checks still pass (33 tests, ruff, mypy).

@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22736479691

@pnilan Patrick Nilan (pnilan) marked this pull request as ready for review March 5, 2026 21:00
Copilot AI review requested due to automatic review settings March 5, 2026 21:00
@devin-ai-integration
Contributor Author

The prerelease workflow's Validate PR title check is failing because the current title (AI-Triage PR) Add memory & resource metrics... doesn't follow conventional commits format. Could you update the PR title to something like:

feat: add memory & resource metrics to Python CDK via DogStatsD (AI-Triage PR, Tier 1)

The main CI's Validate PR title check passed on the original workflow, but the prerelease workflow triggered a second instance that enforces the conventional commit prefix.


Copilot AI left a comment


Pull request overview

Adds opt-in (via DD_AGENT_HOST) container memory telemetry to the Python CDK by introducing a small metrics layer that reads memory usage/limits (cgroups → rusage fallback) and emits DogStatsD gauges during source.read() execution, aligning with the Tier 1 memory/resource metrics initiative.

Changes:

  • Introduces airbyte_cdk.metrics with a MetricsClient wrapper around DogStatsD and a cgroup-based memory reader with fallbacks.
  • Integrates periodic memory metric emission into the core AirbyteEntrypoint.read() loop (initial / periodic / final).
  • Adds datadog as an optional dependency under a new metrics extra, plus unit tests for memory reading and metrics client behavior.

Reviewed changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 4 comments.

Summary per file:

  • airbyte_cdk/metrics/memory.py — Implements cgroup v2/v1 and resource.getrusage() fallback chain to produce MemoryInfo.
  • airbyte_cdk/metrics/__init__.py — Adds MetricsClient, metric constants, emission interval logic, and a module-level singleton accessor.
  • airbyte_cdk/entrypoint.py — Hooks metrics initialization and emission into the read() loop for all Python connectors.
  • pyproject.toml — Adds optional datadog dependency and metrics extra.
  • poetry.lock — Updates lockfile to include datadog and extra metadata.
  • unit_tests/metrics/test_memory.py — Adds unit tests for cgroup v2/v1 parsing and rusage fallback behavior.
  • unit_tests/metrics/test_metrics_client.py — Adds unit tests for client initialization, tags, gauge emission, timing/interval logic, and singleton behavior.
  • unit_tests/metrics/__init__.py — Adds package marker for the new metrics unit test package.


Comment on lines +279 to +289:

# Initialize and emit initial memory metrics
metrics_client = get_metrics_client()
metrics_client.initialize()
metrics_client.emit_memory_metrics()

# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    # Periodically emit memory metrics (every 30s by default)
    metrics_client.maybe_emit_memory_metrics()

Copilot AI Mar 5, 2026


read() emits an initial memory snapshot, but maybe_emit_memory_metrics() will also emit on the first loop iteration because MetricsClient._last_emission_time starts at 0.0 and should_emit() treats that as "interval elapsed". This results in two near-identical snapshots at the beginning of every sync. Consider making the initial emission go through maybe_emit_memory_metrics(interval_seconds=0) (so it updates the last emission time) or have emit_memory_metrics() update the last-emission timestamp when called directly.

    logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
    return

dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125"))

Copilot AI Mar 5, 2026


DD_DOGSTATSD_PORT is parsed with int(...) outside of any try/except. If the env var is set to a non-integer value, initialization will raise ValueError and can crash connector startup, which defeats the "graceful no-op" goal. Parse the port defensively (catch ValueError and either fall back to 8125 or disable metrics with a warning).

Suggested change:

-dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125"))
+port_str = os.environ.get("DD_DOGSTATSD_PORT")
+if not port_str:
+    dd_dogstatsd_port = 8125
+else:
+    try:
+        dd_dogstatsd_port = int(port_str)
+    except ValueError:
+        logger.warning(
+            "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125",
+            port_str,
+        )
+        dd_dogstatsd_port = 8125

Comment on lines +190 to +199:

def get_metrics_client() -> MetricsClient:
    """
    Get or create the module-level MetricsClient singleton.

    The client is initialized lazily on first access.
    """
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient()
    return _metrics_client

Copilot AI Mar 5, 2026


The get_metrics_client() docstring says "The client is initialized lazily on first access", but the function currently only constructs the singleton and does not call initialize(). Either update the docstring to match the behavior, or call initialize() inside get_metrics_client() (keeping it idempotent).

Comment on lines +286 to +291:

for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    # Periodically emit memory metrics (every 30s by default)
    metrics_client.maybe_emit_memory_metrics()
# Emit final memory metrics snapshot
metrics_client.emit_memory_metrics()

Copilot AI Mar 5, 2026


The "final memory metrics snapshot" is only emitted on the normal completion path. If self.source.read(...) raises (or the generator is otherwise aborted), the final snapshot won't run. If the intent is to always capture an end-of-sync measurement, wrap the read loop in a try/finally and emit the final snapshot in the finally block.

Suggested change:

-for message in self.source.read(self.logger, config, catalog, state):
-    yield self.handle_record_counts(message, stream_message_counter)
-    # Periodically emit memory metrics (every 30s by default)
-    metrics_client.maybe_emit_memory_metrics()
-# Emit final memory metrics snapshot
-metrics_client.emit_memory_metrics()
+try:
+    for message in self.source.read(self.logger, config, catalog, state):
+        yield self.handle_record_counts(message, stream_message_counter)
+        # Periodically emit memory metrics (every 30s by default)
+        metrics_client.maybe_emit_memory_metrics()
+finally:
+    # Emit final memory metrics snapshot
+    metrics_client.emit_memory_metrics()

- Defensive DD_DOGSTATSD_PORT parsing (catch ValueError, fallback to 8125)
- Wrap read loop in try/finally for final metrics snapshot on error
- Update emit_memory_metrics() to set _last_emission_time, avoiding
  duplicate initial snapshots
- Fix get_metrics_client() docstring to clarify caller must call initialize()

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Contributor Author

All four Copilot review comments addressed in 7e9881e:

  1. Defensive DD_DOGSTATSD_PORT parsing — Now catches ValueError and falls back to port 8125 with a warning log.
  2. try/finally for final metrics snapshot — Read loop is now wrapped so the final emit_memory_metrics() runs even if source.read() raises.
  3. Duplicate initial emission — emit_memory_metrics() now updates _last_emission_time, so the first maybe_emit_memory_metrics() in the loop won't re-emit immediately.
  4. get_metrics_client() docstring — Updated to clarify the caller must call initialize() separately.

@pnilan
Contributor

Missing: datadog package not installed in the Docker image

The datadog dependency is declared as an optional extra in pyproject.toml, but the Dockerfile doesn't install optional extras. The current build does:

poetry install --only main --no-interaction
pip install dist/*.whl

Neither of these will install the datadog package. At runtime, MetricsClient.initialize() will hit the ImportError branch and silently disable all metrics — making the entire feature a no-op in production.

Action: Add the following to the Dockerfile (after the pip install dist/*.whl line):

# Install datadog for DogStatsD metrics emission.
# No-op unless DD_AGENT_HOST is set at runtime.
RUN pip install "datadog>=0.49.0"

@devin-ai-integration
Contributor Author

Good catch — fixed in f410aba. Added RUN pip install "datadog>=0.49.0" to the Dockerfile after the pip install dist/*.whl line.

@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22737746087

@pnilan
Contributor

Two issues to address before merge

1. Dockerfile: Replace raw pip install with Poetry extras

The PR adds a hard-coded RUN pip install "datadog>=0.49.0" in the Dockerfile, but the metrics extra is already properly declared in pyproject.toml. This bypasses Poetry and creates a version drift risk — if the constraint in pyproject.toml changes, the Dockerfile won't follow.

The existing pattern in the Dockerfile uses Poetry for dependency management. Replace:

# Install datadog for DogStatsD metrics emission.
# No-op unless DD_AGENT_HOST is set at runtime.
RUN pip install "datadog>=0.49.0"

With installing the wheel using the extras:

RUN pip install "dist/*.whl[metrics]"

Or adjust the existing wheel install line to include the extra. This keeps pyproject.toml as the single source of truth for the datadog version constraint, consistent with how manifest-server handles ddtrace.

2. Add metrics to CI extras

The new metrics extra is not included in the [tool.airbyte_ci] section of pyproject.toml, which means CI won't install datadog during test runs. Add it here:

# pyproject.toml, line 219
poetry_extras = ["file-based", "vector-db-based", "manifest-server", "metrics"]

Without this, CI never installs datadog, and tests rely entirely on the sys.modules mock hack in test_metrics_client.py rather than validating against the real package.

@devin-ai-integration
Contributor Author

Both addressed in 0f44eaa:

  1. Dockerfile: Replaced raw pip install "datadog>=0.49.0" with pip install "dist/*.whl[metrics]" — pyproject.toml is now the single source of truth for the version constraint.
  2. CI extras: Added "metrics" to poetry_extras in [tool.airbyte_ci].

@pnilan
Contributor

Patrick Nilan (pnilan) commented Mar 6, 2026

Additional recommendations

Remove side effect from should_emit()

should_emit() updates _last_emission_time (line 243), and emit_memory_metrics() also updates it (line 218). When called via maybe_emit_memory_metrics(), the timestamp gets set twice. should_emit should be a pure query — move the timestamp update into emit_memory_metrics() only:

def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS) -> bool:
    return time.monotonic() - self._last_emission_time >= interval_seconds

def maybe_emit_memory_metrics(
    self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS
) -> None:
    if self.enabled and self.should_emit(interval_seconds):
        self.emit_memory_metrics()  # already sets _last_emission_time

Replace sys.modules monkey-patching with a pytest fixture

test_metrics_client.py permanently injects a mock datadog module into sys.modules at import time (lines 14-22). This persists for the entire test session — if any other test file imports datadog after this module loads, it gets the mock. Use a scoped fixture with proper cleanup instead:

@pytest.fixture(autouse=True)
def _mock_datadog(monkeypatch):
    mock_cls = MagicMock()
    mock_mod = types.ModuleType("datadog.dogstatsd")
    mock_mod.DogStatsd = mock_cls
    mock_datadog = types.ModuleType("datadog")
    mock_datadog.dogstatsd = mock_mod
    monkeypatch.setitem(sys.modules, "datadog", mock_datadog)
    monkeypatch.setitem(sys.modules, "datadog.dogstatsd", mock_mod)
    yield mock_cls

monkeypatch automatically restores sys.modules after the test, preventing pollution across the suite.

Reduce per-record overhead in the read loop

maybe_emit_memory_metrics() is called on every yielded record in the read loop. While time.monotonic() is cheap (~50ns), for high-volume connectors doing millions of records/sec it's unnecessary work. A simple counter avoids the syscall on most iterations:

_emit_check_counter = 0
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    _emit_check_counter += 1
    if _emit_check_counter >= 1000:
        _emit_check_counter = 0
        metrics_client.maybe_emit_memory_metrics()

This is low priority — the current approach works, but this is a trivial optimization for a hot path.

@devin-ai-integration
Contributor Author

devin-ai-integration bot commented Mar 6, 2026

All three addressed in 4b4fc96:

Pure should_emit(): Removed the _last_emission_time side effect. should_emit() is now a pure query; only emit_memory_metrics() advances the timestamp.

Replace sys.modules monkey-patching: Replaced the module-level sys.modules injection with a monkeypatch-based autouse fixture that restores sys.modules after each test.

Read loop counter: Added a _emit_check_counter that only calls maybe_emit_memory_metrics() every 1000 records, avoiding a time.monotonic() syscall on every yielded message.

Also fixed the Docker build — pip install "dist/*.whl[metrics]" failed because the shell passes the quoted glob literally to pip. Now uses pip install dist/*.whl && pip install "$(ls dist/*.whl)[metrics]" to resolve the filename first, then reinstall with the extras.

All 33 tests pass locally, ruff + mypy clean.

@pnilan
Contributor

Move MetricsClient out of __init__.py

The __init__.py currently contains 219 lines of class implementation. The repo convention is to keep __init__.py thin (just re-exports) and put implementation in dedicated modules — see airbyte_cdk/models/__init__.py, airbyte_cdk/sources/__init__.py, etc.

Recommend moving the MetricsClient class and singleton logic into airbyte_cdk/metrics/metrics_client.py, then re-exporting from __init__.py:

# airbyte_cdk/metrics/__init__.py
from airbyte_cdk.metrics.metrics_client import MetricsClient, get_metrics_client

__all__ = ["MetricsClient", "get_metrics_client"]

This keeps the import path (from airbyte_cdk.metrics import get_metrics_client) unchanged while following the existing codebase convention.

@pnilan
Contributor

Singleton with mutable global state and no reset mechanism

The get_metrics_client() singleton has no public API to reset it. Tests directly mutate the private module variable to work around this:

# test_metrics_client.py
metrics_module._metrics_client = None

This is brittle — if the variable is renamed or the singleton strategy changes, tests silently break. It also means test ordering matters: if one test initializes the client with DD_AGENT_HOST set, a later test that expects an uninitialized client could fail.

Two options:

Option A: Expose a reset function (minimal change)

def reset_metrics_client() -> None:
    """Reset the singleton. Intended for testing only."""
    global _metrics_client
    _metrics_client = None

Then tests use reset_metrics_client() instead of reaching into internals.

Option B: Don't use the singleton in tests at all (preferred)

Tests should just instantiate MetricsClient() directly and avoid the singleton entirely. The singleton is only needed in production code (entrypoint.py). If tests never call get_metrics_client(), the reset problem goes away. The one test that validates singleton behavior (TestGetMetricsClient) can use Option A's reset function for cleanup.

This is low priority — the current approach works, but either option improves test isolation.

@pnilan Patrick Nilan (pnilan) changed the title (AI-Triage PR) Add memory & resource metrics to Python CDK via DogStatsD (Tier 1) feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1) Mar 6, 2026
@devin-ai-integration
Contributor Author

Both addressed in dd958d5:

Move MetricsClient out of __init__.py: Moved the full class + singleton + constants into airbyte_cdk/metrics/metrics_client.py. __init__.py is now thin re-exports only (34 lines). Import paths (from airbyte_cdk.metrics import MetricsClient) are unchanged.

Singleton reset: Added reset_metrics_client() to the public API. The singleton test now uses it instead of reaching into _metrics_client directly. All other tests instantiate MetricsClient() directly per Option B.

All 33 tests pass, ruff + mypy clean.

@pnilan
Contributor

Drop cdk.memory.usage_percent — compute it in Datadog instead

METRIC_MEMORY_USAGE_PERCENT is just usage_bytes / limit_bytes. Emitting it as a separate gauge means an extra UDP packet every 30s per connector and an extra custom metric in Datadog (custom metrics are billed per unique metric + tag combination) — for no new information.

Datadog's query language handles this natively in dashboards and monitors:

a = avg:cdk.memory.usage_bytes{*}
b = avg:cdk.memory.limit_bytes{*}
formula = a / b

Recommend removing METRIC_MEMORY_USAGE_PERCENT and the associated gauge() call in emit_memory_metrics(), and just computing the ratio in the dashboard.

@devin-ai-integration
Contributor Author

Done in d951916 — removed METRIC_MEMORY_USAGE_PERCENT and the associated gauge() call. The MemoryInfo.usage_percent property is kept on the dataclass (it's free to compute and useful for logging), but the metric is no longer emitted. Two gauges now: cdk.memory.usage_bytes and cdk.memory.limit_bytes.
