Skip to content

Commit 7cad73b

Browse files
authored
Add native Dask+CuPy backends for hydrology core functions (#952) (#966)
Replace CPU fallback with native GPU tile kernels for flow_accumulation, watershed, basin, stream_order, stream_link, and snap_pour_point when running on Dask+CuPy arrays. Each function now runs its existing CUDA kernels per-tile with seed injection at tile boundaries, keeping data GPU-resident throughout the iterative tile sweep. Also adds a native CUDA kernel for snap_pour_point's single-GPU CuPy path.
1 parent f08cb0f commit 7cad73b

13 files changed

+1377
-118
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,13 @@ In the GIS world, rasters are used for representing continuous phenomena (e.g. e
286286
| [Flow Direction (D8)](xrspatial/flow_direction.py) | Computes D8 flow direction from each cell toward the steepest downhill neighbor | ✅️ | ✅️ | ✅️ | ✅️ |
287287
| [Flow Direction (Dinf)](xrspatial/flow_direction_dinf.py) | Computes D-infinity flow direction as a continuous angle toward the steepest downslope facet | ✅️ | ✅️ | ✅️ | ✅️ |
288288
| [Flow Direction (MFD)](xrspatial/flow_direction_mfd.py) | Partitions flow to all downslope neighbors with an adaptive exponent (Qin et al. 2007) | ✅️ | ✅️ | ✅️ | ✅️ |
289-
| [Flow Accumulation (D8)](xrspatial/flow_accumulation.py) | Counts upstream cells draining through each cell in a D8 flow direction grid | ✅️ | ✅️ | ✅️ | 🔄 |
289+
| [Flow Accumulation (D8)](xrspatial/flow_accumulation.py) | Counts upstream cells draining through each cell in a D8 flow direction grid | ✅️ | ✅️ | ✅️ | ✅️ |
290290
| [Flow Accumulation (MFD)](xrspatial/flow_accumulation_mfd.py) | Accumulates upstream area through all MFD flow paths weighted by directional fractions | ✅️ | ✅️ | ✅️ | 🔄 |
291-
| [Watershed](xrspatial/watershed.py) | Labels each cell with the pour point it drains to via D8 flow direction | ✅️ | ✅️ | ✅️ | 🔄 |
292-
| [Basins](xrspatial/watershed.py) | Delineates drainage basins by labeling each cell with its outlet ID | ✅️ | ✅️ | ✅️ | 🔄 |
293-
| [Stream Order](xrspatial/stream_order.py) | Assigns Strahler or Shreve stream order to cells in a drainage network | ✅️ | ✅️ | ✅️ | 🔄 |
294-
| [Stream Link](xrspatial/stream_link.py) | Assigns unique IDs to each stream segment between junctions | ✅️ | ✅️ | ✅️ | 🔄 |
295-
| [Snap Pour Point](xrspatial/snap_pour_point.py) | Snaps pour points to the highest-accumulation cell within a search radius | ✅️ | ✅️ | 🔄 | 🔄 |
291+
| [Watershed](xrspatial/watershed.py) | Labels each cell with the pour point it drains to via D8 flow direction | ✅️ | ✅️ | ✅️ | ✅️ |
292+
| [Basins](xrspatial/watershed.py) | Delineates drainage basins by labeling each cell with its outlet ID | ✅️ | ✅️ | ✅️ | ✅️ |
293+
| [Stream Order](xrspatial/stream_order.py) | Assigns Strahler or Shreve stream order to cells in a drainage network | ✅️ | ✅️ | ✅️ | ✅️ |
294+
| [Stream Link](xrspatial/stream_link.py) | Assigns unique IDs to each stream segment between junctions | ✅️ | ✅️ | ✅️ | ✅️ |
295+
| [Snap Pour Point](xrspatial/snap_pour_point.py) | Snaps pour points to the highest-accumulation cell within a search radius | ✅️ | ✅️ | ✅️ | ✅️ |
296296
| [Flow Path](xrspatial/flow_path.py) | Traces downstream flow paths from start points through a D8 direction grid | ✅️ | ✅️ | 🔄 | 🔄 |
297297
| [HAND](xrspatial/hand.py) | Computes Height Above Nearest Drainage by tracing D8 flow to the nearest stream cell | ✅️ | ✅️ | 🔄 | 🔄 |
298298
| [TWI](xrspatial/twi.py) | Topographic Wetness Index: ln(specific catchment area / tan(slope)) | ✅️ | ✅️ | ✅️ | 🔄 |

xrspatial/basin.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -298,18 +298,33 @@ def _basins_make_pp_block(flow_dir_block, block_info=None):
298298

299299

300300
def _basins_dask_cupy(flow_dir_da):
301-
"""Dask+CuPy basins: convert to numpy, run CPU iterative, convert back."""
301+
"""Dask+CuPy basins: native GPU via watershed infrastructure."""
302302
import cupy as cp
303+
from xrspatial.watershed import _watershed_dask_cupy
303304

304-
flow_dir_np = flow_dir_da.map_blocks(
305-
lambda b: b.get(), dtype=flow_dir_da.dtype,
306-
meta=np.array((), dtype=flow_dir_da.dtype),
307-
)
308-
result = _basins_dask_iterative(flow_dir_np)
309-
return result.map_blocks(
310-
cp.asarray, dtype=result.dtype,
311-
meta=cp.array((), dtype=result.dtype),
312-
)
305+
chunks_y = flow_dir_da.chunks[0]
306+
chunks_x = flow_dir_da.chunks[1]
307+
total_h = sum(chunks_y)
308+
total_w = sum(chunks_x)
309+
310+
def _basins_make_pp_block(flow_dir_block, block_info=None):
311+
if block_info is None or 0 not in block_info:
312+
return cp.full(flow_dir_block.shape, cp.nan, dtype=cp.float64)
313+
row_off = block_info[0]['array-location'][0][0]
314+
col_off = block_info[0]['array-location'][1][0]
315+
h, w = flow_dir_block.shape
316+
chunk_np = flow_dir_block.get() if hasattr(flow_dir_block, 'get') \
317+
else np.asarray(flow_dir_block)
318+
chunk_np = np.asarray(chunk_np, dtype=np.float64)
319+
pp = _basins_init_labels(chunk_np, h, w, total_h, total_w,
320+
row_off, col_off)
321+
return cp.asarray(np.where(pp >= 0, pp, np.nan))
322+
323+
pour_points_da = da.map_blocks(
324+
_basins_make_pp_block, flow_dir_da,
325+
dtype=np.float64, meta=cp.array((), dtype=cp.float64))
326+
327+
return _watershed_dask_cupy(flow_dir_da, pour_points_da)
313328

314329

315330
# =====================================================================

0 commit comments

Comments
 (0)