Skip to content

Commit a5b7544

Browse files
committed
add memory benchmark
1 parent fa2863f commit a5b7544

File tree

1 file changed

+280
-0
lines changed

1 file changed

+280
-0
lines changed
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Memory benchmarks for manifest cache efficiency.
18+
19+
These benchmarks reproduce the manifest cache memory issue described in:
20+
https://github.com/apache/iceberg-python/issues/2325
21+
22+
The issue: When caching manifest lists as tuples, overlapping ManifestFile objects
23+
are duplicated across cache entries, causing O(N²) memory growth instead of O(N).
24+
25+
Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark
26+
"""
27+
28+
import gc
29+
import tracemalloc
30+
from datetime import datetime, timezone
31+
32+
import pyarrow as pa
33+
import pytest
34+
35+
from pyiceberg.catalog.memory import InMemoryCatalog
36+
from pyiceberg.manifest import _manifest_cache
37+
38+
39+
def generate_test_dataframe(n_rows: int = 100) -> pa.Table:
    """Generate a PyArrow table for testing, similar to the issue's example.

    Args:
        n_rows: Number of rows to generate. The default (100) keeps the
            benchmarks fast; increase it for more realistic workloads.

    Returns:
        A PyArrow table with event-like columns (strings, UTC timestamps,
        an integer id, and a per-row reference string).
    """
    # Capture the timestamp once so both timestamp columns are identical
    # and the table content does not depend on call timing mid-build.
    now = datetime.now(timezone.utc)

    return pa.table(
        {
            "event_type": ["playback"] * n_rows,
            "event_origin": ["origin1"] * n_rows,
            "event_send_at": [now] * n_rows,
            "event_saved_at": [now] * n_rows,
            "id": list(range(n_rows)),
            "reference_id": [f"ref-{i}" for i in range(n_rows)],
        }
    )
53+
54+
55+
@pytest.fixture
def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
    """Provide a fresh in-memory catalog rooted in a temporary warehouse dir."""
    # Each test gets its own warehouse directory so runs never share state.
    warehouse_path = str(tmp_path_factory.mktemp("warehouse"))
    catalog = InMemoryCatalog("memory_test", warehouse=f"file://{warehouse_path}")
    # Tests create their tables under the "default" namespace.
    catalog.create_namespace("default")
    return catalog
62+
63+
64+
@pytest.fixture(autouse=True)
def clear_caches() -> None:
    """Reset manifest-cache state ahead of every test in this module.

    Runs automatically (autouse) so each benchmark starts from an empty
    cache and a freshly collected heap, keeping measurements comparable.
    """
    _manifest_cache.clear()
    gc.collect()
69+
70+
71+
@pytest.mark.benchmark
def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
    """Benchmark memory growth of manifest cache during repeated appends.

    This test reproduces the issue from GitHub #2325 where each append creates
    a new manifest list entry in the cache, causing memory to grow.

    With the old caching strategy (tuple per manifest list), memory grew as O(N²).
    With the new strategy (individual ManifestFile objects), memory grows as O(N).

    Args:
        memory_catalog: Fresh in-memory catalog (from the fixture above).
    """
    df = generate_test_dataframe()
    table = memory_catalog.create_table("default.memory_test", schema=df.schema)

    tracemalloc.start()

    num_iterations = 50
    memory_samples: list[tuple[int, int, int]] = []  # (iteration, current_memory, cache_size)

    print("\n--- Manifest Cache Memory Growth Benchmark ---")
    print(f"Running {num_iterations} append operations...")

    try:
        for i in range(num_iterations):
            table.append(df)

            # Sample memory at intervals
            if (i + 1) % 10 == 0:
                current, _ = tracemalloc.get_traced_memory()
                cache_size = len(_manifest_cache)

                memory_samples.append((i + 1, current, cache_size))
                print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
    finally:
        # Always stop tracing, even if an append raises — otherwise tracemalloc
        # stays enabled and skews measurements in every later test.
        tracemalloc.stop()

    # Analyze memory growth between the first and last sample.
    if len(memory_samples) >= 2:
        first_memory = memory_samples[0][1]
        last_memory = memory_samples[-1][1]
        memory_growth = last_memory - first_memory
        growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0])

        print("\nResults:")
        print(f" Initial memory: {first_memory / 1024:.1f} KB")
        print(f" Final memory: {last_memory / 1024:.1f} KB")
        print(f" Total growth: {memory_growth / 1024:.1f} KB")
        print(f" Growth per iteration: {growth_per_iteration:.1f} bytes")
        print(f" Final cache size: {memory_samples[-1][2]} entries")

        # With efficient caching, growth should be roughly linear (O(N))
        # rather than quadratic (O(N²)) as it was before.
        # Memory growth includes ManifestFile objects, metadata, and other overhead;
        # we expect about 5-10 KB per iteration for typical workloads, so the
        # 15 KB bound below allows headroom while still catching O(N²) regressions.
        assert growth_per_iteration < 15000, (
            f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. "
            "This may indicate the O(N²) cache inefficiency is present."
        )
128+
129+
130+
@pytest.mark.benchmark
def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None:
    """Test that clearing the cache allows memory to be reclaimed.

    This test verifies that when we clear the manifest cache, the associated
    memory can be garbage collected.

    Args:
        memory_catalog: Fresh in-memory catalog (from the fixture above).
    """
    df = generate_test_dataframe()
    table = memory_catalog.create_table("default.gc_test", schema=df.schema)

    tracemalloc.start()
    try:
        print("\n--- Memory After GC Benchmark ---")

        # Phase 1: Fill the cache
        print("Phase 1: Filling cache with 20 appends...")
        for _ in range(20):
            table.append(df)

        gc.collect()
        before_clear_memory, _ = tracemalloc.get_traced_memory()
        cache_size_before = len(_manifest_cache)
        print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
        print(f" Cache size: {cache_size_before}")

        # Phase 2: Clear cache and GC
        print("\nPhase 2: Clearing cache and running GC...")
        _manifest_cache.clear()
        gc.collect()
        gc.collect()  # Multiple GC passes for thorough cleanup

        after_clear_memory, _ = tracemalloc.get_traced_memory()
        print(f" Memory after clear: {after_clear_memory / 1024:.1f} KB")
        print(f" Memory reclaimed: {(before_clear_memory - after_clear_memory) / 1024:.1f} KB")
    finally:
        # Guarantee tracing is disabled even if an append fails mid-benchmark,
        # so tracemalloc state never leaks into subsequent tests.
        tracemalloc.stop()

    # Informational summary only — this benchmark reports rather than asserts
    # a reclaim threshold, since the exact amount is allocator-dependent.
    memory_reclaimed = before_clear_memory - after_clear_memory
    print("\nResults:")
    print(f" Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB")
170+
171+
172+
@pytest.mark.benchmark
def test_manifest_cache_deduplication_efficiency() -> None:
    """Benchmark the efficiency of the per-ManifestFile caching strategy.

    This test verifies that when multiple manifest lists share the same
    ManifestFile objects, they are properly deduplicated in the cache.
    """
    # Imports are local to keep heavyweight pyiceberg internals out of module
    # scope; this benchmark drives the manifest read/write API directly
    # rather than going through a catalog/table like the tests above.
    from tempfile import TemporaryDirectory

    from pyiceberg.io.pyarrow import PyArrowFileIO
    from pyiceberg.manifest import (
        DataFile,
        DataFileContent,
        FileFormat,
        ManifestEntry,
        ManifestEntryStatus,
        read_manifest_list,
        write_manifest,
        write_manifest_list,
    )
    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
    from pyiceberg.schema import Schema
    from pyiceberg.typedef import Record
    from pyiceberg.types import IntegerType, NestedField

    io = PyArrowFileIO()

    print("\n--- Manifest Cache Deduplication Benchmark ---")

    with TemporaryDirectory() as tmp_dir:
        # Minimal single-column schema; content of the data files is irrelevant
        # here — only the manifest/manifest-list bookkeeping matters.
        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
        spec = UNPARTITIONED_PARTITION_SPEC

        # Create N manifest files, each holding one ADDED data-file entry.
        num_manifests = 20
        manifest_files = []

        print(f"Creating {num_manifests} manifest files...")
        for i in range(num_manifests):
            manifest_path = f"{tmp_dir}/manifest_{i}.avro"
            with write_manifest(
                format_version=2,
                spec=spec,
                schema=schema,
                output_file=io.new_output(manifest_path),
                snapshot_id=i + 1,
                avro_compression="null",
            ) as writer:
                # The referenced parquet path is never written — the manifest
                # only records metadata about it.
                data_file = DataFile.from_args(
                    content=DataFileContent.DATA,
                    file_path=f"{tmp_dir}/data_{i}.parquet",
                    file_format=FileFormat.PARQUET,
                    partition=Record(),
                    record_count=100,
                    file_size_in_bytes=1000,
                )
                writer.add_entry(
                    ManifestEntry.from_args(
                        status=ManifestEntryStatus.ADDED,
                        snapshot_id=i + 1,
                        data_file=data_file,
                    )
                )
            # to_manifest_file() is called after the writer context closes,
            # once the manifest file is fully flushed to disk.
            manifest_files.append(writer.to_manifest_file())

        # Create multiple manifest lists with overlapping manifest files.
        # List i contains manifest files 0 through i, so the same ManifestFile
        # objects appear in many lists — the worst case for a per-list cache.
        num_lists = 10
        print(f"Creating {num_lists} manifest lists with overlapping manifests...")

        # Start from an empty cache so the count below reflects only this test.
        _manifest_cache.clear()

        for i in range(num_lists):
            list_path = f"{tmp_dir}/manifest-list_{i}.avro"
            manifests_to_include = manifest_files[: i + 1]

            with write_manifest_list(
                format_version=2,
                output_file=io.new_output(list_path),
                snapshot_id=i + 1,
                parent_snapshot_id=i if i > 0 else None,
                sequence_number=i + 1,
                avro_compression="null",
            ) as list_writer:
                list_writer.add_manifests(manifests_to_include)

            # Read the manifest list (this populates the cache); list() forces
            # full consumption in case read_manifest_list is lazy.
            input_file = io.new_input(list_path)
            list(read_manifest_list(input_file))

        # Analyze cache efficiency
        cache_entries = len(_manifest_cache)

        print("\nResults:")
        print(f" Manifest lists created: {num_lists}")
        print(f" Total unique manifest files: {num_manifests}")
        print(f" Cache entries: {cache_entries}")

        # With efficient per-ManifestFile caching, we should have at most
        # num_manifests entries (one per unique manifest path), not
        # sum(1..num_lists) entries as with the old strategy.
        print(f"\n Expected cache entries (efficient): <= {num_manifests}")
        print(f" Actual cache entries: {cache_entries}")

        # The cache should be efficient - one entry per unique manifest path.
        # NOTE(review): the bound allows num_manifests + num_lists rather than
        # the strict num_manifests; presumably this gives headroom for per-list
        # bookkeeping entries — confirm against the cache implementation.
        assert cache_entries <= num_manifests + num_lists, (
            f"Cache has {cache_entries} entries, expected at most {num_manifests + num_lists}. "
            "The cache may not be deduplicating properly."
        )

0 commit comments

Comments
 (0)