Skip to content

Commit ca8825a

Browse files
authored
Fix balanced_allocation OOM: lazy source extraction + memory guard (#1115)
* Add sweep-performance design spec

  Parallel subagent triage + ralph-loop workflow for auditing all xrspatial modules for performance bottlenecks, OOM risk under 30TB dask workloads, and backend-specific anti-patterns.

* Add sweep-performance implementation plan

  7 tasks covering command scaffold, module scoring, parallel subagent dispatch, report merging, ralph-loop generation, and smoke tests.

* Add sweep-performance slash command

* Fix balanced_allocation OOM: lazy source extraction + memory guard (#1114)

  - _extract_sources now uses da.unique() for dask arrays instead of materializing the full raster to find source IDs
  - Add memory guard before computing N cost surfaces: estimates N * array_bytes + overhead and raises MemoryError if it would exceed 80% of available RAM

* Add tests for balanced_allocation memory guard and lazy extraction (#1114)
1 parent 05798a8 commit ca8825a

File tree

2 files changed

+66
-6
lines changed

2 files changed

+66
-6
lines changed

xrspatial/balanced_allocation.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,23 @@ def _as_numpy(arr):
5252

5353

5454
def _extract_sources(raster, target_values):
55-
"""Return sorted array of unique source IDs from the raster."""
56-
data = _to_numpy(raster.data)
55+
"""Return sorted array of unique source IDs from the raster.
56+
57+
For dask arrays, uses ``da.unique`` (per-chunk reduction) so the full
58+
raster is never pulled into RAM just to discover source IDs.
59+
"""
5760
if len(target_values) > 0:
5861
ids = np.asarray(target_values, dtype=np.float64)
59-
else:
60-
mask = np.isfinite(data) & (data != 0)
61-
ids = np.unique(data[mask])
62-
return ids[np.isfinite(ids)]
62+
return ids[np.isfinite(ids)]
63+
64+
data = raster.data
65+
if da is not None and isinstance(data, da.Array):
66+
uniq = da.unique(data).compute() # small result array
67+
mask = np.isfinite(uniq) & (uniq != 0)
68+
return np.sort(uniq[mask])
69+
data_np = _to_numpy(data)
70+
mask = np.isfinite(data_np) & (data_np != 0)
71+
return np.unique(data_np[mask])
6372

6473

6574
def _make_single_source_raster(raster, source_id):
@@ -297,6 +306,25 @@ def balanced_allocation(
297306
return xr.DataArray(out.astype(np.float32), coords=raster.coords,
298307
dims=raster.dims, attrs=raster.attrs)
299308

309+
# Memory guard: we hold N cost surfaces + friction simultaneously.
310+
# Estimate total footprint before doing any expensive work.
311+
array_bytes = np.prod(raster.shape) * 8 # float64
312+
# N cost surfaces + friction + allocation + stacked intermediate
313+
total_estimate = array_bytes * (n_sources + 3)
314+
try:
315+
from xrspatial.zonal import _available_memory_bytes
316+
avail = _available_memory_bytes()
317+
except ImportError:
318+
avail = 2 * 1024**3
319+
if total_estimate > 0.8 * avail:
320+
raise MemoryError(
321+
f"balanced_allocation with {n_sources} sources needs "
322+
f"~{total_estimate / 1e9:.1f} GB ({n_sources} cost surfaces "
323+
f"+ friction + intermediates) but only ~{avail / 1e9:.1f} GB "
324+
f"available. Reduce the number of sources, downsample the "
325+
f"raster, or increase available memory."
326+
)
327+
300328
# Step 1: compute per-source cost-distance surfaces
301329
cost_surfaces = [] # list of raw data arrays (numpy/cupy/dask)
302330
for sid in source_ids:

xrspatial/tests/test_balanced_allocation.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,35 @@ def test_target_values():
268268
unique = set(np.unique(out[np.isfinite(out)]))
269269
assert 3.0 not in unique
270270
assert {1.0, 2.0} == unique
271+
272+
273+
@pytest.mark.skipif(da is None, reason="dask not installed")
def test_balanced_allocation_memory_guard():
    """Memory guard should raise before computing N cost surfaces."""
    from unittest.mock import patch

    src = np.zeros((100, 100))
    src[10, 10] = 1.0
    src[90, 90] = 2.0
    raster = _make_raster(src, backend='dask+numpy', chunks=(50, 50))
    friction = _make_raster(np.ones((100, 100)), backend='dask+numpy',
                            chunks=(50, 50))

    # Pretend only 1 KB of RAM is available so the guard must trip
    # before any cost surface is computed.
    with patch('xrspatial.zonal._available_memory_bytes', return_value=1024), \
            pytest.raises(MemoryError,
                          match="balanced_allocation with 2 sources"):
        balanced_allocation(raster, friction)
289+
290+
291+
@pytest.mark.skipif(da is None, reason="dask not installed")
def test_extract_sources_dask_no_materialize():
    """_extract_sources should use da.unique, not materialize the full array."""
    from xrspatial.balanced_allocation import _extract_sources

    arr = np.zeros((50, 50))
    arr[10, 10], arr[40, 40] = 1.0, 2.0
    dask_raster = _make_raster(arr, backend='dask+numpy', chunks=(25, 25))

    # Empty target_values forces the raster-scanning (dask) code path.
    found = _extract_sources(dask_raster, target_values=[])
    np.testing.assert_array_equal(found, [1.0, 2.0])

0 commit comments

Comments
 (0)