Skip to content

Commit df4898d

Browse files
refactor(memory-monitor): humanize error/log byte values and log configured thresholds on init
- Add _format_bytes helper that renders byte counts as GB/MB/KB with 2 decimal places (decimal units). - Replace raw byte values in the fail-fast internal_message and the two critical-but-not-raising log paths with _format_bytes(...). - Emit a one-shot INFO log on MemoryMonitor instantiation reporting the configured thresholds and check intervals. - Clarify docstrings that the tightened high-pressure check interval overrides the configurable check_interval (default 5000), addressing Copilot review feedback. - Tests: parametrized _format_bytes coverage, instantiation-log assertion, and assertions that the internal_message uses the humanized format. Co-Authored-By: patrick.nilan@airbyte.io <patrick.nilan@airbyte.io>
1 parent f5738e5 commit df4898d

2 files changed

Lines changed: 98 additions & 9 deletions

File tree

airbyte_cdk/utils/memory_monitor.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@
2525
# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency)
2626
_PROC_SELF_STATUS = Path("/proc/self/status")
2727

28+
29+
def _format_bytes(num_bytes: int) -> str:
30+
"""Render a byte count as a short human-readable string with 2 decimals.
31+
32+
Uses decimal units (GB = 10^9, MB = 10^6, KB = 10^3) so that raw cgroup
33+
byte values render close to the way operators describe container limits
34+
(e.g. a 2_147_483_648-byte limit renders as ``2.15 GB`` rather than the
35+
binary ``2.00 GiB``). Values below 1 KB are rendered as plain bytes.
36+
"""
37+
if num_bytes >= 1_000_000_000:
38+
return f"{num_bytes / 1_000_000_000:.2f} GB"
39+
if num_bytes >= 1_000_000:
40+
return f"{num_bytes / 1_000_000:.2f} MB"
41+
if num_bytes >= 1_000:
42+
return f"{num_bytes / 1_000:.2f} KB"
43+
return f"{num_bytes} B"
44+
45+
2846
# Raise AirbyteTracedException when BOTH conditions are met:
2947
# 1. cgroup usage >= critical threshold
3048
# 2. anonymous memory >= anon-share threshold of *current cgroup usage*
@@ -104,8 +122,8 @@ class MemoryMonitor:
104122
transitions, not periodic sampling
105123
106124
**High-pressure polling:** Once cgroup usage first crosses 90%, the check
107-
interval permanently tightens from 5000 to 100 messages to narrow the race
108-
window near OOM.
125+
interval permanently tightens from the configured ``check_interval``
126+
(default 5000) to 100 messages to narrow the race window near OOM.
109127
110128
**Fail-fast:** Raises ``AirbyteTracedException`` with
111129
``FailureType.system_error`` when *both*:
@@ -138,6 +156,16 @@ def __init__(
138156
self._probed = False
139157
self._high_pressure_mode = False
140158
self._critical_logged = False
159+
logger.info(
160+
"MemoryMonitor instantiated with critical threshold: %d%%, "
161+
"anon share of usage threshold: %d%%, high-pressure threshold: %d%%, "
162+
"check interval: %d messages (tightens to %d under high pressure).",
163+
int(_CRITICAL_THRESHOLD * 100),
164+
int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100),
165+
int(_HIGH_PRESSURE_THRESHOLD * 100),
166+
self._check_interval,
167+
_HIGH_PRESSURE_CHECK_INTERVAL,
168+
)
141169

142170
def _probe_cgroup(self) -> None:
143171
"""Detect which cgroup version (if any) is available.
@@ -216,7 +244,8 @@ def check_memory_usage(self) -> None:
216244
Intended to be called on every message. The monitor internally tracks
217245
a message counter and only reads cgroup files every ``check_interval``
218246
messages (default 5000). Once usage crosses 90%, the interval tightens
219-
to 100 messages for the remainder of the sync.
247+
to 100 messages for the remainder of the sync regardless of the
248+
configured ``check_interval``.
220249
221250
Logging is event-based (one-shot on state transitions), not periodic.
222251
@@ -240,8 +269,6 @@ def check_memory_usage(self) -> None:
240269
usage_bytes, limit_bytes = memory_info
241270
usage_ratio = usage_bytes / limit_bytes
242271
usage_percent = int(usage_ratio * 100)
243-
usage_gb = usage_bytes / (1024**3)
244-
limit_gb = limit_bytes / (1024**3)
245272

246273
if usage_ratio >= _HIGH_PRESSURE_THRESHOLD and not self._high_pressure_mode:
247274
self._high_pressure_mode = True
@@ -262,8 +289,9 @@ def check_memory_usage(self) -> None:
262289
raise AirbyteTracedException(
263290
message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).",
264291
internal_message=(
265-
f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). "
266-
f"Anonymous memory ({anon_source}): {anon_bytes} bytes "
292+
f"Cgroup memory: {_format_bytes(usage_bytes)} / "
293+
f"{_format_bytes(limit_bytes)} ({usage_percent}%). "
294+
f"Anonymous memory ({anon_source}): {_format_bytes(anon_bytes)} "
267295
f"({int(anon_share * 100)}% of current cgroup usage). "
268296
f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, "
269297
f"anon share of usage >= {int(_ANON_SHARE_OF_USAGE_THRESHOLD * 100)}%."
@@ -273,13 +301,17 @@ def check_memory_usage(self) -> None:
273301
elif not self._critical_logged:
274302
self._critical_logged = True
275303
logger.info(
276-
"Cgroup usage crossed %d%% but anonymous memory is only %d%% of current cgroup usage; not raising.",
304+
"Cgroup usage crossed %d%% (%s of %s) but anonymous memory is only %d%% of current cgroup usage; not raising.",
277305
int(_CRITICAL_THRESHOLD * 100),
306+
_format_bytes(usage_bytes),
307+
_format_bytes(limit_bytes),
278308
int(anon_share * 100),
279309
)
280310
elif not self._critical_logged:
281311
self._critical_logged = True
282312
logger.warning(
283-
"Cgroup usage crossed %d%% but anonymous memory signal unavailable; skipping fail-fast.",
313+
"Cgroup usage crossed %d%% (%s of %s) but anonymous memory signal unavailable; skipping fail-fast.",
284314
int(_CRITICAL_THRESHOLD * 100),
315+
_format_bytes(usage_bytes),
316+
_format_bytes(limit_bytes),
285317
)

unit_tests/utils/test_memory_monitor.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
_CGROUP_V2_STAT,
1818
_PROC_SELF_STATUS,
1919
MemoryMonitor,
20+
_format_bytes,
2021
_read_cgroup_v2_anon_bytes,
2122
_read_process_anon_rss_bytes,
2223
)
@@ -88,6 +89,46 @@ def test_check_interval_negative_raises() -> None:
8889
MemoryMonitor(check_interval=-1)
8990

9091

92+
def test_init_logs_configured_thresholds(caplog: pytest.LogCaptureFixture) -> None:
93+
"""Instantiation should emit one INFO line with the configured thresholds and intervals."""
94+
with caplog.at_level(logging.INFO, logger="airbyte"):
95+
MemoryMonitor(check_interval=1234)
96+
init_logs = [r for r in caplog.records if "MemoryMonitor instantiated" in r.message]
97+
assert len(init_logs) == 1
98+
msg = init_logs[0].message
99+
assert "critical threshold: 95%" in msg
100+
assert "anon share of usage threshold: 85%" in msg
101+
assert "high-pressure threshold: 90%" in msg
102+
assert "check interval: 1234 messages" in msg
103+
assert "tightens to 100 under high pressure" in msg
104+
105+
106+
# ---------------------------------------------------------------------------
107+
# _format_bytes — unit tests
108+
# ---------------------------------------------------------------------------
109+
110+
111+
@pytest.mark.parametrize(
112+
("num_bytes", "expected"),
113+
[
114+
(0, "0 B"),
115+
(999, "999 B"),
116+
(1_000, "1.00 KB"),
117+
(1_500, "1.50 KB"),
118+
(999_999, "1000.00 KB"),
119+
(1_000_000, "1.00 MB"),
120+
(960_000_000, "960.00 MB"),
121+
(999_999_999, "1000.00 MB"),
122+
(1_000_000_000, "1.00 GB"),
123+
(2_109_915_136, "2.11 GB"),
124+
(2_147_483_648, "2.15 GB"),
125+
],
126+
)
127+
def test_format_bytes(num_bytes: int, expected: str) -> None:
128+
"""`_format_bytes` renders byte counts with 2 decimals using decimal units."""
129+
assert _format_bytes(num_bytes) == expected
130+
131+
91132
# ---------------------------------------------------------------------------
92133
# check_memory_usage — no-op paths
93134
# ---------------------------------------------------------------------------
@@ -96,6 +137,7 @@ def test_check_interval_negative_raises() -> None:
96137
def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None:
97138
"""check_memory_usage should be a no-op when cgroup is unavailable."""
98139
monitor = MemoryMonitor()
140+
caplog.clear() # discard the one-shot instantiation log
99141
with (
100142
caplog.at_level(logging.WARNING, logger="airbyte"),
101143
patch.object(Path, "exists", return_value=False),
@@ -107,6 +149,7 @@ def test_noop_when_no_cgroup(caplog: pytest.LogCaptureFixture) -> None:
107149
def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None:
108150
"""When cgroup v2 memory.max is 'max' (unlimited), should be a no-op."""
109151
monitor = MemoryMonitor(check_interval=1)
152+
caplog.clear()
110153
with (
111154
caplog.at_level(logging.WARNING, logger="airbyte"),
112155
patch.object(Path, "exists", _v2_exists),
@@ -119,6 +162,7 @@ def test_noop_when_limit_is_max(caplog: pytest.LogCaptureFixture) -> None:
119162
def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None:
120163
"""When cgroup limit file contains '0', should be a no-op."""
121164
monitor = MemoryMonitor(check_interval=1)
165+
caplog.clear()
122166
with (
123167
caplog.at_level(logging.WARNING, logger="airbyte"),
124168
patch.object(Path, "exists", _v2_exists),
@@ -136,6 +180,7 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None:
136180
def test_no_log_below_threshold(caplog: pytest.LogCaptureFixture) -> None:
137181
"""No log should be emitted when usage is below the 90% high-pressure threshold."""
138182
monitor = MemoryMonitor(check_interval=1)
183+
caplog.clear()
139184
with (
140185
caplog.at_level(logging.DEBUG, logger="airbyte"),
141186
patch.object(Path, "exists", _v2_exists),
@@ -182,6 +227,7 @@ def mock_read_text(self: Path) -> str:
182227
def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixture) -> None:
183228
"""Monitor should only check cgroup files every check_interval messages."""
184229
monitor = MemoryMonitor(check_interval=5000)
230+
caplog.clear()
185231
with (
186232
caplog.at_level(logging.INFO, logger="airbyte"),
187233
patch.object(Path, "exists", _v2_exists),
@@ -205,6 +251,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur
205251
def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
206252
"""Malformed cgroup files should not crash the sync."""
207253
monitor = MemoryMonitor(check_interval=1)
254+
caplog.clear()
208255
with (
209256
caplog.at_level(logging.WARNING, logger="airbyte"),
210257
patch.object(Path, "exists", _v2_exists),
@@ -217,6 +264,7 @@ def test_malformed_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixt
217264
def test_empty_cgroup_file_degrades_gracefully(caplog: pytest.LogCaptureFixture) -> None:
218265
"""Empty cgroup file content should not crash the sync."""
219266
monitor = MemoryMonitor(check_interval=1)
267+
caplog.clear()
220268
with (
221269
caplog.at_level(logging.WARNING, logger="airbyte"),
222270
patch.object(Path, "exists", _v2_exists),
@@ -233,6 +281,7 @@ def mock_read_text(self: Path) -> str:
233281
raise OSError("Permission denied")
234282

235283
monitor = MemoryMonitor(check_interval=1)
284+
caplog.clear()
236285
with (
237286
caplog.at_level(logging.WARNING, logger="airbyte"),
238287
patch.object(Path, "exists", _v2_exists),
@@ -342,6 +391,14 @@ def test_raises_when_cgroup_critical_and_anon_share_of_usage_above_threshold() -
342391
assert "critical threshold" in (exc_info.value.message or "")
343392
assert "96%" in (exc_info.value.message or "")
344393
assert "anon share of usage" in (exc_info.value.internal_message or "")
394+
# Human-readable byte formatting: 960 MB usage, 1.00 GB limit, 840 MB anon.
395+
internal = exc_info.value.internal_message or ""
396+
assert "960.00 MB" in internal
397+
assert "1.00 GB" in internal
398+
assert "840.00 MB" in internal
399+
# Raw byte counts should no longer appear in the message.
400+
assert "960000000" not in internal
401+
assert "1000000000" not in internal
345402

346403

347404
def test_no_raise_when_cgroup_critical_but_anon_share_of_usage_below_threshold(

0 commit comments

Comments
 (0)