Skip to content

Commit f7f4c4e

Browse files
Add: Unit tests for heapq operation complexity verification
New test file: tests/test_heapq_complexity.py Tests verify documented time complexity for heapq: - O(n): heapify - O(log n): heappush, heappop, heappushpop, heapreplace - O(N log k): nlargest, nsmallest, merge - Also includes heap invariant correctness tests Co-Authored-By: Amp <amp@ampcode.com>
1 parent ed8fb46 commit f7f4c4e

1 file changed

Lines changed: 279 additions & 0 deletions

File tree

tests/test_heapq_complexity.py

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
"""Tests to verify documented time complexity of heapq module operations.
2+
3+
These tests use timing measurements to verify that operations scale
4+
according to their documented complexity.
5+
"""
6+
7+
import heapq
8+
import math
9+
import time
10+
from typing import Any, Callable
11+
12+
13+
def measure_time(func: Callable[[], Any], iterations: int = 100) -> float:
14+
"""Measure average time for a function over multiple iterations."""
15+
start = time.perf_counter()
16+
for _ in range(iterations):
17+
func()
18+
end = time.perf_counter()
19+
return (end - start) / iterations
20+
21+
22+
def is_constant_time(
23+
small_time: float, large_time: float, tolerance: float = 3.0
24+
) -> bool:
25+
"""Check if two times are within tolerance (suggesting O(1))."""
26+
if small_time == 0:
27+
return large_time < 1e-6
28+
ratio = large_time / small_time
29+
return ratio < tolerance
30+
31+
32+
def is_linear_time(
33+
small_time: float,
34+
large_time: float,
35+
size_ratio: float,
36+
tolerance: float = 3.0,
37+
) -> bool:
38+
"""Check if time scales linearly with size."""
39+
if small_time == 0:
40+
return True
41+
ratio = large_time / small_time
42+
expected_ratio = size_ratio
43+
return ratio < expected_ratio * tolerance
44+
45+
46+
def is_logarithmic_time(
47+
small_time: float,
48+
large_time: float,
49+
small_size: int,
50+
large_size: int,
51+
tolerance: float = 3.0,
52+
) -> bool:
53+
"""Check if time scales logarithmically with size."""
54+
if small_time == 0:
55+
return True
56+
ratio = large_time / small_time
57+
expected_ratio = math.log2(large_size) / math.log2(small_size)
58+
return ratio < expected_ratio * tolerance
59+
60+
61+
class TestHeapqComplexity:
62+
"""Test heapq operation complexities as documented in docs/stdlib/heapq.md."""
63+
64+
SMALL_SIZE = 1_000
65+
LARGE_SIZE = 100_000
66+
SIZE_RATIO = LARGE_SIZE / SMALL_SIZE
67+
68+
def test_heapify_is_on(self) -> None:
69+
"""heapify() should be O(n)."""
70+
small_list = list(range(self.SMALL_SIZE, 0, -1))
71+
large_list = list(range(self.LARGE_SIZE, 0, -1))
72+
73+
def heapify_small() -> None:
74+
lst = small_list.copy()
75+
heapq.heapify(lst)
76+
77+
def heapify_large() -> None:
78+
lst = large_list.copy()
79+
heapq.heapify(lst)
80+
81+
small_time = measure_time(heapify_small, iterations=50)
82+
large_time = measure_time(heapify_large, iterations=50)
83+
84+
assert is_linear_time(small_time, large_time, self.SIZE_RATIO), (
85+
f"heapify() doesn't appear linear: {small_time:.2e}s vs {large_time:.2e}s"
86+
)
87+
88+
def test_heappush_is_ologn(self) -> None:
89+
"""heappush() should be O(log n)."""
90+
small_heap = list(range(self.SMALL_SIZE))
91+
large_heap = list(range(self.LARGE_SIZE))
92+
heapq.heapify(small_heap)
93+
heapq.heapify(large_heap)
94+
95+
def push_small() -> None:
96+
heapq.heappush(small_heap, 0)
97+
heapq.heappop(small_heap)
98+
99+
def push_large() -> None:
100+
heapq.heappush(large_heap, 0)
101+
heapq.heappop(large_heap)
102+
103+
small_time = measure_time(push_small)
104+
large_time = measure_time(push_large)
105+
106+
assert is_logarithmic_time(
107+
small_time, large_time, self.SMALL_SIZE, self.LARGE_SIZE
108+
), f"heappush() doesn't appear O(log n): {small_time:.2e}s vs {large_time:.2e}s"
109+
110+
def test_heappop_is_ologn(self) -> None:
111+
"""heappop() should be O(log n)."""
112+
small_heap = list(range(self.SMALL_SIZE))
113+
large_heap = list(range(self.LARGE_SIZE))
114+
heapq.heapify(small_heap)
115+
heapq.heapify(large_heap)
116+
117+
def pop_small() -> None:
118+
val = heapq.heappop(small_heap)
119+
heapq.heappush(small_heap, val)
120+
121+
def pop_large() -> None:
122+
val = heapq.heappop(large_heap)
123+
heapq.heappush(large_heap, val)
124+
125+
small_time = measure_time(pop_small)
126+
large_time = measure_time(pop_large)
127+
128+
assert is_logarithmic_time(
129+
small_time, large_time, self.SMALL_SIZE, self.LARGE_SIZE
130+
), f"heappop() doesn't appear O(log n): {small_time:.2e}s vs {large_time:.2e}s"
131+
132+
def test_heappushpop_is_ologn(self) -> None:
133+
"""heappushpop() should be O(log n)."""
134+
small_heap = list(range(self.SMALL_SIZE))
135+
large_heap = list(range(self.LARGE_SIZE))
136+
heapq.heapify(small_heap)
137+
heapq.heapify(large_heap)
138+
139+
def pushpop_small() -> None:
140+
val = heapq.heappushpop(small_heap, -1)
141+
heapq.heappush(small_heap, val)
142+
143+
def pushpop_large() -> None:
144+
val = heapq.heappushpop(large_heap, -1)
145+
heapq.heappush(large_heap, val)
146+
147+
small_time = measure_time(pushpop_small)
148+
large_time = measure_time(pushpop_large)
149+
150+
assert is_logarithmic_time(
151+
small_time, large_time, self.SMALL_SIZE, self.LARGE_SIZE
152+
), (
153+
f"heappushpop() doesn't appear O(log n): "
154+
f"{small_time:.2e}s vs {large_time:.2e}s"
155+
)
156+
157+
def test_heapreplace_is_ologn(self) -> None:
158+
"""heapreplace() should be O(log n)."""
159+
small_heap = list(range(self.SMALL_SIZE))
160+
large_heap = list(range(self.LARGE_SIZE))
161+
heapq.heapify(small_heap)
162+
heapq.heapify(large_heap)
163+
164+
def replace_small() -> None:
165+
val = heapq.heapreplace(small_heap, -1)
166+
heapq.heappush(small_heap, val)
167+
heapq.heappop(small_heap)
168+
169+
def replace_large() -> None:
170+
val = heapq.heapreplace(large_heap, -1)
171+
heapq.heappush(large_heap, val)
172+
heapq.heappop(large_heap)
173+
174+
small_time = measure_time(replace_small)
175+
large_time = measure_time(replace_large)
176+
177+
assert is_logarithmic_time(
178+
small_time, large_time, self.SMALL_SIZE, self.LARGE_SIZE
179+
), (
180+
f"heapreplace() doesn't appear O(log n): "
181+
f"{small_time:.2e}s vs {large_time:.2e}s"
182+
)
183+
184+
def test_nlargest_scales_with_n(self) -> None:
185+
"""nlargest(k, iterable) should be O(N log k) where N = iterable length."""
186+
small_data = list(range(self.SMALL_SIZE))
187+
large_data = list(range(self.LARGE_SIZE))
188+
k = 10
189+
190+
small_time = measure_time(lambda: heapq.nlargest(k, small_data), iterations=50)
191+
large_time = measure_time(lambda: heapq.nlargest(k, large_data), iterations=50)
192+
193+
assert is_linear_time(small_time, large_time, self.SIZE_RATIO), (
194+
f"nlargest() doesn't scale linearly with N: "
195+
f"{small_time:.2e}s vs {large_time:.2e}s"
196+
)
197+
198+
def test_nsmallest_scales_with_n(self) -> None:
199+
"""nsmallest(k, iterable) should be O(N log k) where N = iterable length."""
200+
small_data = list(range(self.SMALL_SIZE))
201+
large_data = list(range(self.LARGE_SIZE))
202+
k = 10
203+
204+
small_time = measure_time(
205+
lambda: heapq.nsmallest(k, small_data), iterations=50
206+
)
207+
large_time = measure_time(
208+
lambda: heapq.nsmallest(k, large_data), iterations=50
209+
)
210+
211+
assert is_linear_time(small_time, large_time, self.SIZE_RATIO), (
212+
f"nsmallest() doesn't scale linearly with N: "
213+
f"{small_time:.2e}s vs {large_time:.2e}s"
214+
)
215+
216+
def test_nlargest_scales_with_k(self) -> None:
217+
"""nlargest with larger k should be slower (O(N log k))."""
218+
data = list(range(self.LARGE_SIZE))
219+
220+
small_k_time = measure_time(lambda: heapq.nlargest(10, data), iterations=20)
221+
large_k_time = measure_time(lambda: heapq.nlargest(1000, data), iterations=20)
222+
223+
assert large_k_time > small_k_time, (
224+
f"nlargest() should be slower with larger k: "
225+
f"k=10: {small_k_time:.2e}s vs k=1000: {large_k_time:.2e}s"
226+
)
227+
228+
def test_merge_scales_with_total_items(self) -> None:
229+
"""merge() should scale with total items O(n log k)."""
230+
small_lists = [list(range(i, i + 100)) for i in range(10)]
231+
large_lists = [list(range(i, i + 10000)) for i in range(10)]
232+
233+
def merge_small() -> None:
234+
list(heapq.merge(*small_lists))
235+
236+
def merge_large() -> None:
237+
list(heapq.merge(*large_lists))
238+
239+
small_time = measure_time(merge_small, iterations=50)
240+
large_time = measure_time(merge_large, iterations=50)
241+
242+
assert is_linear_time(small_time, large_time, 100), (
243+
f"merge() doesn't scale linearly with total items: "
244+
f"{small_time:.2e}s vs {large_time:.2e}s"
245+
)
246+
247+
def test_heap_maintains_invariant(self) -> None:
248+
"""Verify heap property is maintained after operations."""
249+
import random
250+
251+
data = list(range(1000))
252+
random.shuffle(data)
253+
254+
heap: list[int] = []
255+
for item in data:
256+
heapq.heappush(heap, item)
257+
258+
sorted_result = []
259+
while heap:
260+
sorted_result.append(heapq.heappop(heap))
261+
262+
assert sorted_result == sorted(data), "Heap did not maintain sorted order"
263+
264+
def test_heapify_produces_valid_heap(self) -> None:
265+
"""Verify heapify produces valid min-heap."""
266+
import random
267+
268+
data = list(range(1000))
269+
random.shuffle(data)
270+
271+
heapq.heapify(data)
272+
273+
for i in range(len(data)):
274+
left = 2 * i + 1
275+
right = 2 * i + 2
276+
if left < len(data):
277+
assert data[i] <= data[left], f"Heap violated at {i} vs left {left}"
278+
if right < len(data):
279+
assert data[i] <= data[right], f"Heap violated at {i} vs right {right}"

0 commit comments

Comments
 (0)