Skip to content

Commit 4b4fc96

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: address reviewer feedback - pure should_emit, pytest fixture, read loop counter, Docker build
Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 0f44eaa commit 4b4fc96

4 files changed

Lines changed: 55 additions & 42 deletions

File tree

Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ COPY dist/*.whl ./dist/
1717
RUN poetry config virtualenvs.create false \
1818
&& poetry install --only main --no-interaction --no-ansi || true
1919

20-
# Build and install the package (with metrics extra for DogStatsD support)
21-
RUN pip install "dist/*.whl[metrics]"
20+
# Build and install the package with the metrics extra (for DogStatsD support).
21+
# Two-step install: first install the wheel, then reinstall with extras using
22+
# the resolved filename (shell glob + [extras] syntax don't mix in a single arg).
23+
RUN pip install dist/*.whl \
24+
&& pip install "$(ls dist/*.whl)[metrics]"
2225

2326
# Recreate the original structure
2427
RUN mkdir -p source_declarative_manifest \

airbyte_cdk/entrypoint.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,16 @@ def read(
283283

284284
# The Airbyte protocol dictates that counts be expressed as float/double to better protect against integer overflows
285285
stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float)
286+
_emit_check_counter = 0
286287
try:
287288
for message in self.source.read(self.logger, config, catalog, state):
288289
yield self.handle_record_counts(message, stream_message_counter)
289-
# Periodically emit memory metrics (every 30s by default)
290-
metrics_client.maybe_emit_memory_metrics()
290+
# Check for metric emission every 1000 records to avoid
291+
# a time.monotonic() syscall on every single record.
292+
_emit_check_counter += 1
293+
if _emit_check_counter >= 1000:
294+
_emit_check_counter = 0
295+
metrics_client.maybe_emit_memory_metrics()
291296
finally:
292297
# Emit final memory metrics snapshot (runs even if the read loop raises)
293298
metrics_client.emit_memory_metrics()

airbyte_cdk/metrics/__init__.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,9 @@ def should_emit(self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECOND
180180
Check if enough time has passed since the last emission to emit again.
181181
182182
Returns True if at least interval_seconds have elapsed since the last emission.
183+
This is a pure query — it does not update any internal state.
183184
"""
184-
now = time.monotonic()
185-
if now - self._last_emission_time >= interval_seconds:
186-
self._last_emission_time = now
187-
return True
188-
return False
185+
return time.monotonic() - self._last_emission_time >= interval_seconds
189186

190187
def maybe_emit_memory_metrics(
191188
self, interval_seconds: float = DEFAULT_EMISSION_INTERVAL_SECONDS

unit_tests/metrics/test_metrics_client.py

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,32 +13,36 @@
1313

1414
from airbyte_cdk.metrics.memory import MemoryInfo
1515

16-
# Ensure mock datadog module is available for tests regardless of whether
17-
# the optional `datadog` package is installed.
18-
_mock_dogstatsd_cls = MagicMock()
19-
20-
if "datadog" not in sys.modules:
21-
_mock_datadog = types.ModuleType("datadog")
22-
_mock_dogstatsd_mod = types.ModuleType("datadog.dogstatsd")
23-
_mock_dogstatsd_mod.DogStatsd = _mock_dogstatsd_cls # type: ignore[attr-defined]
24-
_mock_datadog.dogstatsd = _mock_dogstatsd_mod # type: ignore[attr-defined]
25-
sys.modules["datadog"] = _mock_datadog
26-
sys.modules["datadog.dogstatsd"] = _mock_dogstatsd_mod
16+
17+
@pytest.fixture(autouse=True)
18+
def _mock_datadog(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
19+
"""Provide a mock datadog module so tests work regardless of whether the
20+
optional ``datadog`` package is installed. ``monkeypatch`` automatically
21+
restores ``sys.modules`` after each test, preventing cross-test pollution."""
22+
mock_cls = MagicMock()
23+
mock_mod = types.ModuleType("datadog.dogstatsd")
24+
mock_mod.DogStatsd = mock_cls # type: ignore[attr-defined]
25+
mock_datadog = types.ModuleType("datadog")
26+
mock_datadog.dogstatsd = mock_mod # type: ignore[attr-defined]
27+
monkeypatch.setitem(sys.modules, "datadog", mock_datadog)
28+
monkeypatch.setitem(sys.modules, "datadog.dogstatsd", mock_mod)
29+
return mock_cls
30+
2731

2832
import airbyte_cdk.metrics as metrics_module # noqa: E402
2933
from airbyte_cdk.metrics import MetricsClient, get_metrics_client # noqa: E402
3034

3135

32-
def _make_enabled_client() -> tuple[MetricsClient, MagicMock]:
36+
def _make_enabled_client(mock_dogstatsd_cls: MagicMock) -> tuple[MetricsClient, MagicMock]:
3337
"""Helper to create an enabled MetricsClient with a mock DogStatsd instance."""
3438
mock_instance = MagicMock()
35-
_mock_dogstatsd_cls.reset_mock()
36-
_mock_dogstatsd_cls.return_value = mock_instance
39+
mock_dogstatsd_cls.reset_mock()
40+
mock_dogstatsd_cls.return_value = mock_instance
3741

3842
client = MetricsClient()
3943
with (
4044
patch.dict("os.environ", {"DD_AGENT_HOST": "localhost"}, clear=True),
41-
patch("datadog.dogstatsd.DogStatsd", _mock_dogstatsd_cls),
45+
patch("datadog.dogstatsd.DogStatsd", mock_dogstatsd_cls),
4246
):
4347
client.initialize()
4448
return client, mock_instance
@@ -51,8 +55,8 @@ def test_disabled_when_dd_agent_host_not_set(self) -> None:
5155
client.initialize()
5256
assert not client.enabled
5357

54-
def test_enabled_when_dd_agent_host_set(self) -> None:
55-
client, _ = _make_enabled_client()
58+
def test_enabled_when_dd_agent_host_set(self, _mock_datadog: MagicMock) -> None:
59+
client, _ = _make_enabled_client(_mock_datadog)
5660
assert client.enabled
5761

5862
def test_initialize_idempotent(self) -> None:
@@ -73,10 +77,10 @@ def test_disabled_when_datadog_import_fails(self) -> None:
7377

7478

7579
class TestMetricsClientTags:
76-
def test_builds_tags_from_env(self) -> None:
80+
def test_builds_tags_from_env(self, _mock_datadog: MagicMock) -> None:
7781
mock_instance = MagicMock()
78-
_mock_dogstatsd_cls.reset_mock()
79-
_mock_dogstatsd_cls.return_value = mock_instance
82+
_mock_datadog.reset_mock()
83+
_mock_datadog.return_value = mock_instance
8084

8185
client = MetricsClient()
8286
env = {
@@ -88,7 +92,7 @@ def test_builds_tags_from_env(self) -> None:
8892
}
8993
with (
9094
patch.dict("os.environ", env, clear=True),
91-
patch("datadog.dogstatsd.DogStatsd", _mock_dogstatsd_cls),
95+
patch("datadog.dogstatsd.DogStatsd", _mock_datadog),
9296
):
9397
client.initialize()
9498

@@ -104,30 +108,30 @@ def test_gauge_noop_when_disabled(self) -> None:
104108
# Should not raise even when not initialized
105109
client.gauge("test.metric", 42.0)
106110

107-
def test_gauge_emits_when_enabled(self) -> None:
108-
client, mock_instance = _make_enabled_client()
111+
def test_gauge_emits_when_enabled(self, _mock_datadog: MagicMock) -> None:
112+
client, mock_instance = _make_enabled_client(_mock_datadog)
109113

110114
client.gauge("test.metric", 42.0)
111115
mock_instance.gauge.assert_called_once_with("test.metric", 42.0, tags=client._tags)
112116

113-
def test_gauge_with_extra_tags(self) -> None:
114-
client, mock_instance = _make_enabled_client()
117+
def test_gauge_with_extra_tags(self, _mock_datadog: MagicMock) -> None:
118+
client, mock_instance = _make_enabled_client(_mock_datadog)
115119

116120
client.gauge("test.metric", 42.0, extra_tags=["stream:users"])
117121
call_tags = mock_instance.gauge.call_args[1]["tags"]
118122
assert "stream:users" in call_tags
119123

120-
def test_gauge_swallows_exceptions(self) -> None:
121-
client, mock_instance = _make_enabled_client()
124+
def test_gauge_swallows_exceptions(self, _mock_datadog: MagicMock) -> None:
125+
client, mock_instance = _make_enabled_client(_mock_datadog)
122126
mock_instance.gauge.side_effect = Exception("network error")
123127

124128
# Should not raise
125129
client.gauge("test.metric", 42.0)
126130

127131

128132
class TestEmitMemoryMetrics:
129-
def test_emits_all_metrics_when_enabled(self) -> None:
130-
client, mock_instance = _make_enabled_client()
133+
def test_emits_all_metrics_when_enabled(self, _mock_datadog: MagicMock) -> None:
134+
client, mock_instance = _make_enabled_client(_mock_datadog)
131135

132136
mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000)
133137
with patch("airbyte_cdk.metrics.get_memory_info", return_value=mock_info):
@@ -138,8 +142,8 @@ def test_emits_all_metrics_when_enabled(self) -> None:
138142
assert gauge_calls["cdk.memory.limit_bytes"] == 200_000_000.0
139143
assert gauge_calls["cdk.memory.usage_percent"] == pytest.approx(0.5)
140144

141-
def test_skips_limit_when_unknown(self) -> None:
142-
client, mock_instance = _make_enabled_client()
145+
def test_skips_limit_when_unknown(self, _mock_datadog: MagicMock) -> None:
146+
client, mock_instance = _make_enabled_client(_mock_datadog)
143147

144148
mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=None)
145149
with patch("airbyte_cdk.metrics.get_memory_info", return_value=mock_info):
@@ -164,18 +168,22 @@ def test_emits_on_first_call(self) -> None:
164168
def test_does_not_emit_before_interval(self) -> None:
165169
client = MetricsClient()
166170
assert client.should_emit(interval_seconds=30.0)
171+
# Manually advance the timestamp (emit_memory_metrics is a no-op
172+
# when the client is disabled, so set it directly).
173+
client._last_emission_time = time.monotonic()
167174
assert not client.should_emit(interval_seconds=30.0)
168175

169176
def test_emits_after_interval(self) -> None:
170177
client = MetricsClient()
171178
assert client.should_emit(interval_seconds=0.01)
179+
client._last_emission_time = time.monotonic()
172180
time.sleep(0.02)
173181
assert client.should_emit(interval_seconds=0.01)
174182

175183

176184
class TestMaybeEmitMemoryMetrics:
177-
def test_emits_on_interval(self) -> None:
178-
client, mock_instance = _make_enabled_client()
185+
def test_emits_on_interval(self, _mock_datadog: MagicMock) -> None:
186+
client, mock_instance = _make_enabled_client(_mock_datadog)
179187

180188
mock_info = MemoryInfo(usage_bytes=100, limit_bytes=200)
181189
with patch("airbyte_cdk.metrics.get_memory_info", return_value=mock_info):

0 commit comments

Comments
 (0)