feat: add memory & resource metrics to Python CDK via DogStatsD (Tier 1) #931
devin-ai-integration[bot] wants to merge 12 commits into `main` from `devin/1772669295-add-memory-metrics-module`
Conversation
Implements Tier 1 of the metrics initiative per issue #15943:

- New `airbyte_cdk/metrics/` module with DogStatsD client
- cgroup v2/v1 memory reading with rusage fallback
- Periodic metric emission during source `read()` operations
- Graceful no-op when `DD_AGENT_HOST` is not set
- 34 unit tests covering all code paths

Metrics emitted:

- `cdk.memory.usage_bytes`: Container memory usage
- `cdk.memory.limit_bytes`: Container memory limit
- `cdk.memory.usage_percent`: Usage/limit ratio
- `cdk.memory.python_heap_bytes`: Python heap via tracemalloc

Co-Authored-By: bot_apk <apk@cognition.ai>
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version — you can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772669295-add-memory-metrics-module#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772669295-add-memory-metrics-module
```

PR Slash Commands: Airbyte Maintainers can execute the following slash commands on your PR:
PyTest Results (Full): 3 926 tests, 3 914 ✅, 11m 10s ⏱️ — results for commit d951916. (This comment has been updated with the latest results.)
- Regenerate poetry.lock after adding datadog optional dependency
- Remove unused imports (CGROUP_V1_MEMORY_LIMIT, etc.) from test_memory.py
- Fix duplicate import style in test_metrics_client.py

Co-Authored-By: bot_apk <apk@cognition.ai>
Co-Authored-By: bot_apk <apk@cognition.ai>
…tatsd Co-Authored-By: bot_apk <apk@cognition.ai>
Review Feedback: Remove tracemalloc-based heap metrics
tracemalloc.start() has ~10-30% runtime overhead and was being activated silently for all production connectors. The cgroup-based metrics already provide container memory pressure visibility. tracemalloc-based heap metrics will be added separately behind an explicit opt-in env var flag. Co-Authored-By: bot_apk <apk@cognition.ai>
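The planned opt-in follow-up could gate the heap metric behind the env var flag the way this commit message describes — a minimal sketch, assuming the `CDK_TRACEMALLOC_ENABLED` flag mentioned later in this PR, with a function name mirroring the removed helper:

```python
import os
import tracemalloc
from typing import Optional


def get_python_heap_bytes() -> Optional[int]:
    """Return the current Python heap size in bytes, or None when disabled.

    tracemalloc hooks into CPython's allocator (~10-30% runtime overhead),
    so it only activates behind an explicit opt-in env var.
    """
    if os.environ.get("CDK_TRACEMALLOC_ENABLED", "").lower() not in ("1", "true"):
        return None
    if not tracemalloc.is_tracing():
        tracemalloc.start()
    current, _peak = tracemalloc.get_traced_memory()
    return current
```

With the flag unset the function is a pure no-op, so production connectors never pay the allocator-hook cost.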
Done in c8b0197 — removed `get_python_heap_bytes()` and the `cdk.memory.python_heap_bytes` metric.
Nit:
Co-Authored-By: bot_apk <apk@cognition.ai>
Fixed in b208bd0.
/prerelease
The prerelease workflow's … The main CI's …
Pull request overview
Adds opt-in (via DD_AGENT_HOST) container memory telemetry to the Python CDK by introducing a small metrics layer that reads memory usage/limits (cgroups → rusage fallback) and emits DogStatsD gauges during source.read() execution, aligning with the Tier 1 memory/resource metrics initiative.
Changes:

- Introduces `airbyte_cdk.metrics` with a `MetricsClient` wrapper around DogStatsD and a cgroup-based memory reader with fallbacks.
- Integrates periodic memory metric emission into the core `AirbyteEntrypoint.read()` loop (initial / periodic / final).
- Adds `datadog` as an optional dependency under a new `metrics` extra, plus unit tests for memory reading and metrics client behavior.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 4 comments.
Summary per file:

| File | Description |
|---|---|
| `airbyte_cdk/metrics/memory.py` | Implements cgroup v2/v1 and `resource.getrusage()` fallback chain to produce `MemoryInfo`. |
| `airbyte_cdk/metrics/__init__.py` | Adds `MetricsClient`, metric constants, emission interval logic, and a module-level singleton accessor. |
| `airbyte_cdk/entrypoint.py` | Hooks metrics initialization and emission into the `read()` loop for all Python connectors. |
| `pyproject.toml` | Adds optional `datadog` dependency and `metrics` extra. |
| `poetry.lock` | Updates lockfile to include `datadog` and extra metadata. |
| `unit_tests/metrics/test_memory.py` | Adds unit tests for cgroup v2/v1 parsing and rusage fallback behavior. |
| `unit_tests/metrics/test_metrics_client.py` | Adds unit tests for client initialization, tags, gauge emission, timing/interval logic, and singleton behavior. |
| `unit_tests/metrics/__init__.py` | Adds package marker for the new metrics unit test package. |
`airbyte_cdk/entrypoint.py` (outdated)
```python
# Initialize and emit initial memory metrics
metrics_client = get_metrics_client()
metrics_client.initialize()
metrics_client.emit_memory_metrics()

# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    # Periodically emit memory metrics (every 30s by default)
    metrics_client.maybe_emit_memory_metrics()
```
read() emits an initial memory snapshot, but maybe_emit_memory_metrics() will also emit on the first loop iteration because MetricsClient._last_emission_time starts at 0.0 and should_emit() treats that as "interval elapsed". This results in two near-identical snapshots at the beginning of every sync. Consider making the initial emission go through maybe_emit_memory_metrics(interval_seconds=0) (so it updates the last emission time) or have emit_memory_metrics() update the last-emission timestamp when called directly.
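The fix this comment suggests — one shared timestamp for both the initial and the periodic emission paths — can be sketched in isolation. The `IntervalGate` name is hypothetical; in the PR the equivalent state lives on `MetricsClient`:

```python
import time


class IntervalGate:
    """Tracks the last emission time so initial and periodic emissions share one clock."""

    def __init__(self, interval_seconds: float = 30.0) -> None:
        self._interval = interval_seconds
        # 0.0 means "never emitted"; should_emit() then reports the interval as elapsed.
        self._last_emission_time = 0.0

    def should_emit(self) -> bool:
        return (time.monotonic() - self._last_emission_time) >= self._interval

    def emit(self) -> None:
        # Advancing the timestamp here means a direct initial emit() suppresses
        # the immediate "interval elapsed" re-emit on the first loop iteration.
        self._last_emission_time = time.monotonic()
```

Because `emit()` always advances the timestamp, calling it once at sync start prevents the duplicate snapshot on the first `maybe_emit` check.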
`airbyte_cdk/metrics/__init__.py` (outdated)
```python
logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
return

dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125"))
```
DD_DOGSTATSD_PORT is parsed with int(...) outside of any try/except. If the env var is set to a non-integer value, initialization will raise ValueError and can crash connector startup, which defeats the "graceful no-op" goal. Parse the port defensively (catch ValueError and either fall back to 8125 or disable metrics with a warning).
Suggested change:

```python
port_str = os.environ.get("DD_DOGSTATSD_PORT")
if not port_str:
    dd_dogstatsd_port = 8125
else:
    try:
        dd_dogstatsd_port = int(port_str)
    except ValueError:
        logger.warning(
            "Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125",
            port_str,
        )
        dd_dogstatsd_port = 8125
```
`airbyte_cdk/metrics/__init__.py` (outdated)
```python
def get_metrics_client() -> MetricsClient:
    """
    Get or create the module-level MetricsClient singleton.

    The client is initialized lazily on first access.
    """
    global _metrics_client
    if _metrics_client is None:
        _metrics_client = MetricsClient()
    return _metrics_client
```
The get_metrics_client() docstring says "The client is initialized lazily on first access", but the function currently only constructs the singleton and does not call initialize(). Either update the docstring to match the behavior, or call initialize() inside get_metrics_client() (keeping it idempotent).
`airbyte_cdk/entrypoint.py` (outdated)
```python
for message in self.source.read(self.logger, config, catalog, state):
    yield self.handle_record_counts(message, stream_message_counter)
    # Periodically emit memory metrics (every 30s by default)
    metrics_client.maybe_emit_memory_metrics()
# Emit final memory metrics snapshot
metrics_client.emit_memory_metrics()
```
The "final memory metrics snapshot" is only emitted on the normal completion path. If self.source.read(...) raises (or the generator is otherwise aborted), the final snapshot won't run. If the intent is to always capture an end-of-sync measurement, wrap the read loop in a try/finally and emit the final snapshot in the finally block.
Suggested change:

```python
try:
    for message in self.source.read(self.logger, config, catalog, state):
        yield self.handle_record_counts(message, stream_message_counter)
        # Periodically emit memory metrics (every 30s by default)
        metrics_client.maybe_emit_memory_metrics()
finally:
    # Emit final memory metrics snapshot
    metrics_client.emit_memory_metrics()
```
- Defensive DD_DOGSTATSD_PORT parsing (catch ValueError, fall back to 8125)
- Wrap read loop in try/finally for final metrics snapshot on error
- Update emit_memory_metrics() to set _last_emission_time, avoiding duplicate initial snapshots
- Fix get_metrics_client() docstring to clarify caller must call initialize()

Co-Authored-By: bot_apk <apk@cognition.ai>
All four Copilot review comments addressed in 7e9881e.
Missing:
Co-Authored-By: bot_apk <apk@cognition.ai>
Good catch — fixed in f410aba.
/prerelease
Two issues to address before merge:

1. Dockerfile: Replace raw
…I extras Co-Authored-By: bot_apk <apk@cognition.ai>
Both addressed in 0f44eaa.
Additional recommendations: Remove the side effect from `should_emit()`
…ad loop counter, Docker build Co-Authored-By: bot_apk <apk@cognition.ai>
All three addressed in 4b4fc96:

- Pure `should_emit()`: removed the side effect; only `emit_memory_metrics()` advances the timestamp.
- Replaced the `sys.modules` monkey-patching in tests with a `monkeypatch`-based fixture.
- Read loop counter: added a counter so `maybe_emit_memory_metrics()` runs every 1000 records instead of on every message.

Also fixed the Docker build — now a two-step install so the `[metrics]` extra resolves. All 33 tests pass locally, ruff + mypy clean.
Move `MetricsClient` out of `__init__.py`
Singleton with mutable global state and no reset mechanism

The tests currently reset the singleton by mutating private module state:

```python
# test_metrics_client.py
metrics_module._metrics_client = None
```

This is brittle — if the variable is renamed or the singleton strategy changes, tests silently break. It also means test ordering matters: if one test initializes the client with …

Two options:

Option A: Expose a reset function (minimal change)

```python
def reset_metrics_client() -> None:
    """Reset the singleton. Intended for testing only."""
    global _metrics_client
    _metrics_client = None
```

Then tests use `reset_metrics_client()`.

Option B: Don't use the singleton in tests at all (preferred)

Tests should just instantiate `MetricsClient` directly.

This is low priority — the current approach works, but either option improves test isolation.
…client() Co-Authored-By: bot_apk <apk@cognition.ai>
Both addressed in dd958d5:

- Move: `MetricsClient`, singleton logic, and constants now live in `airbyte_cdk/metrics/metrics_client.py`; `__init__.py` is thin re-exports only.
- Singleton reset: added `reset_metrics_client()`; tests use it instead of mutating private state.

All 33 tests pass, ruff + mypy clean.
Drop
…adog instead Co-Authored-By: bot_apk <apk@cognition.ai>
Done in d951916.
(AI-Triage PR) Add memory & resource metrics to Python CDK via DogStatsD
Summary
Implements Tier 1 of the memory/resource metrics initiative per airbytehq/airbyte-internal-issues#15943. Adds a new `airbyte_cdk/metrics/` module that emits container memory metrics via DogStatsD during source `read()` operations.

New files:

- `airbyte_cdk/metrics/memory.py` — Reads container memory from cgroup v2 → cgroup v1 → `resource.getrusage()` fallback chain
- `airbyte_cdk/metrics/metrics_client.py` — `MetricsClient` class, singleton, and constants wrapping DogStatsD with graceful no-op when `DD_AGENT_HOST` is absent
- `airbyte_cdk/metrics/__init__.py` — Thin re-exports only (follows repo convention for `__init__.py`)
- `unit_tests/metrics/test_memory.py` — 15 tests covering cgroup v2, v1, and rusage fallback
- `unit_tests/metrics/test_metrics_client.py` — 18 tests covering initialization, gauge emission, timing, tags, and singleton behavior
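The cgroup side of the fallback chain described for `memory.py` can be sketched roughly as follows. The file paths are assumptions about typical cgroup mounts, and the real module also reads limits and falls back to `getrusage()` when neither file is readable:

```python
from pathlib import Path
from typing import Optional

# Hypothetical paths; actual layout can differ per container runtime.
_CGROUP_USAGE_SOURCES = (
    Path("/sys/fs/cgroup/memory.current"),                # cgroup v2
    Path("/sys/fs/cgroup/memory/memory.usage_in_bytes"),  # cgroup v1
)


def read_cgroup_memory_usage() -> Optional[int]:
    """Try cgroup v2 first, then v1; return None so callers can fall back to getrusage()."""
    for path in _CGROUP_USAGE_SOURCES:
        try:
            raw = path.read_text().strip()
        except OSError:
            continue  # file missing or unreadable on this host
        if raw.isdigit():
            return int(raw)
    return None
```

Returning `None` instead of raising keeps the caller's fallback chain simple and the metrics path exception-free.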
Modified files:

- `pyproject.toml` — Adds `datadog` as an optional dependency under a new `metrics` extra
- `poetry.lock` — Regenerated to include the new `datadog` dependency
- `Dockerfile` — Installs the `metrics` extra in the Docker image (two-step install to work around shell glob + `[extras]` syntax conflict)
- `airbyte_cdk/entrypoint.py` — Integrates periodic metric emission into the `read()` loop (initial → every 30s via counter-based throttling → final snapshot in `try/finally`)
Metrics emitted: `cdk.memory.usage_bytes`, `cdk.memory.limit_bytes`, `cdk.memory.usage_percent`
DD_AGENT_HOSTis set by the platform'sConnectorApmSupportHelper.Updates since last revision
Updates since last revision:

- Moved `MetricsClient` out of `__init__.py` — Per reviewer feedback, the `MetricsClient` class, singleton logic, and constants now live in `airbyte_cdk/metrics/metrics_client.py`. `__init__.py` is thin re-exports only (33 lines), following the repo convention seen in `airbyte_cdk/models/__init__.py`, `airbyte_cdk/sources/__init__.py`, etc. Import paths are unchanged.
- Added `reset_metrics_client()` — Public API to reset the singleton, replacing direct mutation of `_metrics_client` in tests. Tests now use this function for cleanup instead of reaching into private module state.
- `should_emit()` is now a pure query — Removed the `_last_emission_time` side effect from `should_emit()`. Only `emit_memory_metrics()` advances the timestamp, eliminating the duplicate update when called via `maybe_emit_memory_metrics()`.
- Replaced `sys.modules` monkey-patching with a pytest fixture — The test file's module-level `sys.modules` injection (which persisted for the entire test session) is replaced with a `monkeypatch`-based autouse fixture that automatically restores `sys.modules` after each test.
- Counter-based throttling — `maybe_emit_memory_metrics()` is now called every 1000 records instead of on every record, avoiding a `time.monotonic()` syscall on every yielded message.
- Fixed the Docker build — `pip install "dist/*.whl[metrics]"` failed because the quoted glob is passed literally to pip. Now uses a two-step install: `pip install dist/*.whl && pip install "$(ls dist/*.whl)[metrics]"`.
- Defensive `DD_DOGSTATSD_PORT` parsing — Invalid port values now log a warning and fall back to 8125 instead of crashing.
- `try/finally` for final metrics snapshot — The `entrypoint.py` read loop now emits a final memory snapshot even if the loop raises, ensuring we capture peak memory usage.
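The counter-based throttling described above amounts to consulting the clock only once per batch of yielded records — a hedged sketch, with the helper name and the 1000-record constant as assumptions mirroring the description:

```python
RECORDS_PER_METRICS_CHECK = 1000  # assumption: matches the "every 1000 records" throttle


def read_with_metrics(messages, maybe_emit):
    """Sketch: yield every message, but only run the time-based
    metrics check once per RECORDS_PER_METRICS_CHECK records."""
    counter = 0
    for message in messages:
        yield message
        counter += 1
        if counter >= RECORDS_PER_METRICS_CHECK:
            counter = 0
            maybe_emit()  # the time.monotonic() interval check happens here
```

This bounds the per-record overhead to one integer increment; the syscall-bearing interval check runs at most once per thousand messages.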
Earlier updates:

- Removed `tracemalloc`/`python_heap_bytes` — Per reviewer feedback, `get_python_heap_bytes()` and `METRIC_MEMORY_PYTHON_HEAP_BYTES` have been removed. `tracemalloc.start()` hooks into CPython's allocator with ~10-30% runtime overhead, and was being activated silently for all production connectors. The three cgroup-based metrics are sufficient for container memory pressure visibility. A separate issue tracks adding tracemalloc as an opt-in feature behind `CDK_TRACEMALLOC_ENABLED`.
- Handled the `ru_maxrss` platform difference — A `sys.platform == "darwin"` check was added to handle `ru_maxrss` units (bytes on macOS, kilobytes on Linux).
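The `ru_maxrss` platform difference reduces to a one-line unit conversion — a stdlib-only sketch of the fallback described above:

```python
import resource
import sys


def rusage_memory_bytes() -> int:
    """Peak RSS via getrusage(); ru_maxrss is bytes on macOS, kilobytes on Linux."""
    maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return maxrss if sys.platform == "darwin" else maxrss * 1024
```

Note this reports peak (not current) resident set size, which is why it sits last in the fallback chain behind the cgroup readers.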
Review & Testing Checklist for Human

- `entrypoint.py` modification safety — This PR modifies the core `read()` loop; any bug here affects all Python connectors. Review the integration points carefully: the `try/finally` ensures a final metrics snapshot even if the read loop raises, but verify this doesn't suppress exceptions or affect error handling.
- The `pip install dist/*.whl && pip install "$(ls dist/*.whl)[metrics]"` pattern works around the shell glob + `[extras]` syntax conflict. Verify this is acceptable for the CDK Dockerfile and doesn't break any build workflows.
- `_statsd` typed as `Any` — All calls to `self._statsd.gauge()` bypass MyPy checking. If the DogStatsd API changes in a future version, type errors will not be caught statically. Consider whether a `TYPE_CHECKING`-guarded conditional import of `DogStatsd` would be preferable.
- `get_metrics_client()` returns a module-level singleton. A `reset_metrics_client()` function is now available for tests, but the singleton is never cleaned up in production. Verify this is acceptable for the connector execution model (one process per sync).
- `gauge()` and `emit_memory_metrics()` silently catch and log all exceptions. This prevents metrics failures from breaking syncs, but could mask real issues (e.g., auth failures, OOM during metric collection). Consider whether debug logs are sufficient or if warnings should be emitted for persistent failures.
Recommended test plan:

- Run `poetry install --extras metrics` and verify `datadog` is installed.
- Run the unit tests (`poetry run pytest unit_tests/metrics/ -v`) to verify all 33 tests pass.
- Run a sync with `DD_AGENT_HOST=localhost` (and a local DogStatsD agent) to verify metrics appear in Datadog. Check that `cdk.memory.*` gauges are present with correct tags (`connector`, `version`, `connection_id`, `workspace_id`).
- Run a sync without `DD_AGENT_HOST` set and verify no errors/warnings are logged (graceful no-op behavior).
- With `DD_AGENT_HOST` set but no DogStatsD agent listening, verify the sync completes successfully and only debug logs are emitted (no warnings/errors).
- Verify behavior without the `datadog` package.

Notes