-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathtest_memory.py
More file actions
244 lines (197 loc) · 8.66 KB
/
test_memory.py
File metadata and controls
244 lines (197 loc) · 8.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#
"""Tests for airbyte_cdk.metrics.memory module."""
import tracemalloc
from pathlib import Path
from unittest.mock import patch
import pytest
from airbyte_cdk.metrics.memory import (
ENV_CDK_TRACEMALLOC_ENABLED,
MemoryInfo,
_read_cgroup_v1_memory,
_read_cgroup_v2_memory,
_read_rusage_memory,
get_memory_info,
get_python_heap_bytes,
)
class TestMemoryInfo:
def test_usage_percent_with_limit(self) -> None:
info = MemoryInfo(usage_bytes=500, limit_bytes=1000)
assert info.usage_percent == 0.5
def test_usage_percent_no_limit(self) -> None:
info = MemoryInfo(usage_bytes=500, limit_bytes=None)
assert info.usage_percent is None
def test_usage_percent_zero_limit(self) -> None:
info = MemoryInfo(usage_bytes=500, limit_bytes=0)
assert info.usage_percent is None
def test_frozen_dataclass(self) -> None:
info = MemoryInfo(usage_bytes=100, limit_bytes=200)
with pytest.raises(AttributeError):
info.usage_bytes = 300 # type: ignore[misc]
class TestCgroupV2Memory:
def test_reads_memory_current_and_max(self, tmp_path: Path) -> None:
current_file = tmp_path / "memory.current"
max_file = tmp_path / "memory.max"
current_file.write_text("104857600\n")
max_file.write_text("209715200\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", current_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", max_file),
):
info = _read_cgroup_v2_memory()
assert info is not None
assert info.usage_bytes == 104857600
assert info.limit_bytes == 209715200
assert info.usage_percent == pytest.approx(0.5)
def test_memory_max_is_max_string(self, tmp_path: Path) -> None:
"""When cgroup reports 'max', it means no limit is set."""
current_file = tmp_path / "memory.current"
max_file = tmp_path / "memory.max"
current_file.write_text("104857600\n")
max_file.write_text("max\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", current_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", max_file),
):
info = _read_cgroup_v2_memory()
assert info is not None
assert info.usage_bytes == 104857600
assert info.limit_bytes is None
assert info.usage_percent is None
def test_returns_none_when_files_missing(self) -> None:
with patch(
"airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent/path")
):
info = _read_cgroup_v2_memory()
assert info is None
def test_handles_invalid_content(self, tmp_path: Path) -> None:
current_file = tmp_path / "memory.current"
current_file.write_text("not_a_number\n")
with patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", current_file):
info = _read_cgroup_v2_memory()
assert info is None
class TestCgroupV1Memory:
def test_reads_usage_and_limit(self, tmp_path: Path) -> None:
usage_file = tmp_path / "memory.usage_in_bytes"
limit_file = tmp_path / "memory.limit_in_bytes"
usage_file.write_text("104857600\n")
limit_file.write_text("209715200\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_file),
):
info = _read_cgroup_v1_memory()
assert info is not None
assert info.usage_bytes == 104857600
assert info.limit_bytes == 209715200
def test_very_large_limit_treated_as_no_limit(self, tmp_path: Path) -> None:
"""Very large cgroup v1 limits (near 2^63) indicate no real limit."""
usage_file = tmp_path / "memory.usage_in_bytes"
limit_file = tmp_path / "memory.limit_in_bytes"
usage_file.write_text("104857600\n")
limit_file.write_text(f"{2**63}\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_file),
):
info = _read_cgroup_v1_memory()
assert info is not None
assert info.limit_bytes is None
def test_returns_none_when_files_missing(self) -> None:
with patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", Path("/nonexistent/path")):
info = _read_cgroup_v1_memory()
assert info is None
class TestRusageMemory:
def test_returns_memory_info(self) -> None:
info = _read_rusage_memory()
assert info.usage_bytes > 0
assert info.limit_bytes is None
class TestGetMemoryInfo:
def test_prefers_cgroup_v2(self, tmp_path: Path) -> None:
current_file = tmp_path / "memory.current"
max_file = tmp_path / "memory.max"
current_file.write_text("100\n")
max_file.write_text("200\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", current_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_MAX", max_file),
):
info = get_memory_info()
assert info.usage_bytes == 100
assert info.limit_bytes == 200
def test_falls_back_to_cgroup_v1(self, tmp_path: Path) -> None:
usage_file = tmp_path / "memory.usage_in_bytes"
limit_file = tmp_path / "memory.limit_in_bytes"
usage_file.write_text("300\n")
limit_file.write_text("600\n")
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent")),
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", usage_file),
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_LIMIT", limit_file),
):
info = get_memory_info()
assert info.usage_bytes == 300
assert info.limit_bytes == 600
def test_falls_back_to_rusage(self) -> None:
with (
patch("airbyte_cdk.metrics.memory.CGROUP_V2_MEMORY_CURRENT", Path("/nonexistent")),
patch("airbyte_cdk.metrics.memory.CGROUP_V1_MEMORY_USAGE", Path("/nonexistent")),
):
info = get_memory_info()
assert info.usage_bytes > 0
assert info.limit_bytes is None
class TestGetPythonHeapBytes:
"""Tests for the opt-in tracemalloc-based heap metric."""
def test_returns_none_when_env_var_not_set(self) -> None:
with patch.dict("os.environ", {}, clear=True):
result = get_python_heap_bytes()
assert result is None
def test_returns_none_when_env_var_is_empty(self) -> None:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: ""}):
result = get_python_heap_bytes()
assert result is None
def test_returns_none_when_env_var_is_false(self) -> None:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "false"}):
result = get_python_heap_bytes()
assert result is None
def test_returns_bytes_when_enabled_with_true(self) -> None:
was_tracing = tracemalloc.is_tracing()
try:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "true"}):
result = get_python_heap_bytes()
assert result is not None
assert isinstance(result, int)
assert result >= 0
finally:
if not was_tracing:
tracemalloc.stop()
def test_returns_bytes_when_enabled_with_1(self) -> None:
was_tracing = tracemalloc.is_tracing()
try:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "1"}):
result = get_python_heap_bytes()
assert result is not None
assert isinstance(result, int)
finally:
if not was_tracing:
tracemalloc.stop()
def test_returns_bytes_when_enabled_with_yes(self) -> None:
was_tracing = tracemalloc.is_tracing()
try:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "yes"}):
result = get_python_heap_bytes()
assert result is not None
assert isinstance(result, int)
finally:
if not was_tracing:
tracemalloc.stop()
def test_case_insensitive(self) -> None:
was_tracing = tracemalloc.is_tracing()
try:
with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "TRUE"}):
result = get_python_heap_bytes()
assert result is not None
finally:
if not was_tracing:
tracemalloc.stop()