Skip to content

Commit 356f73d

Browse files
authored
Fix hotspots dask performance: 2-pass streaming with O(chunk_size) memory (#855)
Rewrite _hotspots_dask_numpy to eliminate the global barrier that forced materialization of the full intermediate convolution output. The old implementation created a task graph where every chunk's z-score depended on both the per-chunk convolution and global reductions over ALL chunks, making it infeasible for datasets larger than RAM.

New approach:
- Pass 1: eagerly compute global_mean/global_std (two scalars, single co-scheduled graph traversal via da.compute)
- Pass 2: fuse convolution + z-score + classification into one map_overlap call, so each chunk streams through independently

Also fixes: redundant astype calls, pointwise map_overlap misuse, separate nanmean/nanstd accumulation, and re-enables the zero-std check.
1 parent e1370ef commit 356f73d

File tree

1 file changed

+41
-23
lines changed

1 file changed

+41
-23
lines changed

xrspatial/focal.py

Lines changed: 41 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@
2727
class cupy(object):
2828
ndarray = False
2929

30-
from xrspatial.convolution import convolve_2d, custom_kernel
30+
from xrspatial.convolution import convolve_2d, custom_kernel, _convolve_2d_numpy
3131
from xrspatial.utils import ArrayTypeFunctionMapping, cuda_args, ngjit, not_implemented_func
3232
from xrspatial.dataset_support import supports_dataset
3333

@@ -938,32 +938,50 @@ def _hotspots_numpy(raster, kernel):
938938

939939

940940
def _hotspots_dask_numpy(raster, kernel):
941-
data = raster.data.astype(np.float32)
941+
data = raster.data
942+
if not np.issubdtype(data.dtype, np.floating):
943+
data = data.astype(np.float32)
944+
945+
# Pass 1: eagerly compute global statistics (two scalars).
946+
# This reads all chunks once, produces 16 bytes, then frees all
947+
# intermediate state -- no barrier that would force materialization
948+
# of the full convolution output.
949+
global_mean, global_std = da.compute(da.nanmean(data), da.nanstd(data))
950+
global_mean = np.float32(global_mean)
951+
global_std = np.float32(global_std)
942952

943-
# apply kernel to raster values
944-
mean_array = convolve_2d(data, kernel / kernel.sum())
945-
946-
# calculate z-scores
947-
global_mean = da.nanmean(data)
948-
global_std = da.nanstd(data)
949-
950-
# commented out to avoid early compute to check if global_std is zero
951-
# if global_std == 0:
952-
# raise ZeroDivisionError(
953-
# "Standard deviation of the input raster values is 0."
954-
# )
953+
if global_std == 0:
954+
raise ZeroDivisionError(
955+
"Standard deviation of the input raster values is 0."
956+
)
955957

956-
z_array = (mean_array - global_mean) / global_std
958+
norm_kernel = (kernel / kernel.sum()).astype(np.float32)
959+
pad_h = norm_kernel.shape[0] // 2
960+
pad_w = norm_kernel.shape[1] // 2
961+
962+
# Pass 2: fuse convolution + z-score + classification into one
963+
# map_overlap call. Each chunk reads source + halo, produces int8
964+
# output, and frees all intermediates immediately. No spill needed.
965+
_func = partial(
966+
_hotspots_chunk,
967+
kernel=norm_kernel,
968+
global_mean=global_mean,
969+
global_std=global_std,
970+
)
971+
out = data.map_overlap(
972+
_func,
973+
depth=(pad_h, pad_w),
974+
boundary=np.nan,
975+
meta=np.array((), dtype=np.int8),
976+
)
977+
return out
957978

958-
_func = partial(_calc_hotspots_numpy)
959-
pad_h = kernel.shape[0] // 2
960-
pad_w = kernel.shape[1] // 2
961979

962-
out = z_array.map_overlap(_func,
963-
depth=(pad_h, pad_w),
964-
boundary=np.nan,
965-
meta=np.array(()))
966-
return out
980+
def _hotspots_chunk(chunk, kernel, global_mean, global_std):
981+
"""Fused per-chunk: convolve -> z-score -> classify."""
982+
convolved = _convolve_2d_numpy(chunk, kernel)
983+
z = (convolved - global_mean) / global_std
984+
return _calc_hotspots_numpy(z)
967985

968986

969987
@nb.cuda.jit(device=True)

0 commit comments

Comments
 (0)