Skip to content

Commit 7e9881e

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: address Copilot review comments
- Defensive DD_DOGSTATSD_PORT parsing (catch ValueError, fallback to 8125) - Wrap read loop in try/finally for final metrics snapshot on error - Update emit_memory_metrics() to set _last_emission_time, avoiding duplicate initial snapshots - Fix get_metrics_client() docstring to clarify caller must call initialize() Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent b208bd0 commit 7e9881e

2 files changed

Lines changed: 30 additions & 8 deletions

File tree

airbyte_cdk/entrypoint.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,14 @@ def read(
283283

284284
# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
285285
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
286-
for message in self.source.read(self.logger, config, catalog, state):
287-
yield self.handle_record_counts(message, stream_message_counter)
288-
# Periodically emit memory metrics (every 30s by default)
289-
metrics_client.maybe_emit_memory_metrics()
290-
# Emit final memory metrics snapshot
291-
metrics_client.emit_memory_metrics()
286+
try:
287+
for message in self.source.read(self.logger, config, catalog, state):
288+
yield self.handle_record_counts(message, stream_message_counter)
289+
# Periodically emit memory metrics (every 30s by default)
290+
metrics_client.maybe_emit_memory_metrics()
291+
finally:
292+
# Emit final memory metrics snapshot (runs even if the read loop raises)
293+
metrics_client.emit_memory_metrics()
292294
for message in self._emit_queued_messages(self.source):
293295
yield self.handle_record_counts(message, stream_message_counter)
294296

airbyte_cdk/metrics/__init__.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,18 @@ def initialize(self) -> None:
5656
logger.debug("DD_AGENT_HOST not set; metrics emission disabled")
5757
return
5858

59-
dd_dogstatsd_port = int(os.environ.get("DD_DOGSTATSD_PORT", "8125"))
59+
port_str = os.environ.get("DD_DOGSTATSD_PORT")
60+
if not port_str:
61+
dd_dogstatsd_port = 8125
62+
else:
63+
try:
64+
dd_dogstatsd_port = int(port_str)
65+
except ValueError:
66+
logger.warning(
67+
"Invalid DD_DOGSTATSD_PORT value %r; falling back to default port 8125",
68+
port_str,
69+
)
70+
dd_dogstatsd_port = 8125
6071

6172
try:
6273
from datadog.dogstatsd import DogStatsd
@@ -139,10 +150,16 @@ def emit_memory_metrics(self) -> None:
139150
- cdk.memory.usage_bytes: Current container memory usage
140151
- cdk.memory.limit_bytes: Container memory limit (if known)
141152
- cdk.memory.usage_percent: Usage/limit ratio (if limit is known)
153+
154+
Also updates the last-emission timestamp so that subsequent calls to
155+
``should_emit`` / ``maybe_emit_memory_metrics`` respect the interval.
142156
"""
143157
if not self.enabled:
144158
return
145159

160+
# Update the last-emission timestamp to avoid duplicate snapshots
161+
self._last_emission_time = time.monotonic()
162+
146163
try:
147164
info: MemoryInfo = get_memory_info()
148165

@@ -191,7 +208,10 @@ def get_metrics_client() -> MetricsClient:
191208
"""
192209
Get or create the module-level MetricsClient singleton.
193210
194-
The client is initialized lazily on first access.
211+
Note: The caller is responsible for calling ``initialize()`` on the
212+
returned client before emitting metrics. Construction and initialization
213+
are separate so that the caller controls *when* environment variables
214+
are read and the DogStatsD connection is established.
195215
"""
196216
global _metrics_client
197217
if _metrics_client is None:

0 commit comments

Comments
 (0)