Skip to content

Commit 4b599fc

Browse files
authored
Added Foundational Hydrology Tools (#921)
* Add D8 flow direction module: foundational hydrology operation Implements D8 flow direction using ESRI power-of-2 encoding (compatible with GDAL/ArcGIS) across all four backends (numpy, cupy, dask+numpy, dask+cupy). Follows the slope.py pattern with explicit cellsize- parameterized backend wrappers. New files: - xrspatial/flow_direction.py — CPU kernel (@ngjit), GPU device/global kernels, four backend wrappers, public API with @supports_dataset - xrspatial/tests/test_flow_direction.py — 66 tests (flat, cardinal, diagonal, known bowl, NaN handling, cross-backend, boundary modes, dtype acceptance, dataset support, cellsize effect, valid codes) - benchmarks/benchmarks/flow_direction.py — ASV benchmark Modified files: - xrspatial/__init__.py — export flow_direction - xrspatial/accessor.py — add Hydrology section to both accessors - README.md — add Hydrology section with Flow Direction entry * Add stream order module: Strahler and Shreve ordering of drainage networks * Add BoundaryStore: memmap-backed boundary strip storage for dask tile sweeps * Add D8 flow accumulation module with iterative dask tile sweep * Add depression fill module using Planchon-Darboux iterative flooding * Add sink identification module with connected-component labeling * Add watershed delineation and drainage basin labeling module * Add hydrology table entries to README, fix minor docstring wording * Add standalone basin module, fix BoundaryStore temp file leaks Extract basin delineation from watershed.py into its own basin.py module with a basin() public function. The old basins() stays as a backward-compatible wrapper. Add basin to __init__.py and both xarray accessors. Add BoundarySnapshot to _boundary_store.py -- a read-only in-memory copy of converged boundary strips. BoundaryStore.snapshot() copies strip data to plain numpy arrays and closes the underlying memmap temp files. Apply this pattern across all hydrology dask backends (flow_accumulation, watershed, fill, stream_order) so temp directories are cleaned up before the lazy dask result is returned. Previously, BoundaryStore objects captured in map_blocks closures kept temp files alive indefinitely. * Add stream link segmentation module with all four backends Assigns unique position-based IDs to each stream segment between junctions, headwaters, and outlets. Supports numpy, cupy, dask+numpy, and dask+cupy backends. * Add snap pour point module with all four backends Moves each pour point to the highest flow-accumulation cell within a circular search radius so that watershed delineation starts from the actual drainage channel. Dask backend extracts sparse pour points chunk-by-chunk (map_blocks flag pass + selective load) to keep memory bounded regardless of grid size. * Add flow path tracing module with all four backends * Add D-infinity flow direction module with all four backends * Fix flow_path Dask OOM: vectorized chunk grouping, numpy buffers, adaptive cache Replace three memory/performance bottlenecks in _flow_path_dask: - Path tracing (phase 3): growable numpy buffers (~24 bytes/cell) instead of list-of-tuples (~164 bytes/cell) - Output assembly (phase 4): pre-group cells by chunk via vectorized searchsorted + stable argsort, then O(1) dict lookup per block instead of O(n_chunks * n_cells) linear scan - LRU cache: adaptive sizing capped at ~512 MB instead of fixed 32 entries regardless of chunk size * Add flow length, TWI, and HAND modules with tests and notebook * Add benchmarks for flow_length, TWI, and HAND
1 parent 80e073f commit 4b599fc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+14006
-2
lines changed

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,22 @@ In the GIS world, rasters are used for representing continuous phenomena (e.g. e
239239

240240
-----------
241241

242+
### **Hydrology**
243+
244+
| Name | Description | NumPy xr.DataArray | Dask xr.DataArray | CuPy GPU xr.DataArray | Dask GPU xr.DataArray |
245+
|:----------:|:------------|:----------------------:|:--------------------:|:-------------------:|:------:|
246+
| [Flow Direction (D8)](xrspatial/flow_direction.py) | Computes D8 flow direction from each cell toward the steepest downhill neighbor | ✅️ | ✅️ | ✅️ | ✅️ |
247+
| [Flow Direction (Dinf)](xrspatial/flow_direction_dinf.py) | Computes D-infinity flow direction as a continuous angle toward the steepest downslope facet | ✅️ | ✅️ | ✅️ | ✅️ |
248+
| [Flow Accumulation (D8)](xrspatial/flow_accumulation.py) | Counts upstream cells draining through each cell in a D8 flow direction grid | ✅️ | ✅️ | ✅️ | ✅️ |
249+
| [Watershed](xrspatial/watershed.py) | Labels each cell with the pour point it drains to via D8 flow direction | ✅️ | ✅️ | ✅️ | ✅️ |
250+
| [Basins](xrspatial/watershed.py) | Delineates drainage basins by labeling each cell with its outlet ID | ✅️ | ✅️ | ✅️ | ✅️ |
251+
| [Stream Order](xrspatial/stream_order.py) | Assigns Strahler or Shreve stream order to cells in a drainage network | ✅️ | ✅️ | ✅️ | ✅️ |
252+
| [Stream Link](xrspatial/stream_link.py) | Assigns unique IDs to each stream segment between junctions | ✅️ | ✅️ | ✅️ | ✅️ |
253+
| [Snap Pour Point](xrspatial/snap_pour_point.py) | Snaps pour points to the highest-accumulation cell within a search radius | ✅️ | ✅️ | ✅️ | ✅️ |
254+
| [Flow Path](xrspatial/flow_path.py) | Traces downstream flow paths from start points through a D8 direction grid | ✅️ | ✅️ | ✅️ | ✅️ |
255+
256+
-----------
257+
242258
### **Zonal**
243259

244260
| Name | Description | NumPy xr.DataArray | Dask xr.DataArray | CuPy GPU xr.DataArray | Dask GPU xr.DataArray |
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from xrspatial import flow_direction, flow_accumulation
2+
3+
from .common import Benchmarking, get_xr_dataarray
4+
5+
6+
class FlowAccumulation:
7+
params = ([100, 300, 1000], ["numpy", "dask"])
8+
param_names = ("nx", "type")
9+
10+
def setup(self, nx, type):
11+
ny = nx // 2
12+
elev = get_xr_dataarray((ny, nx), type)
13+
# Compute flow_dir from elevation (materialise for benchmarking)
14+
self.flow_dir = flow_direction(elev)
15+
if hasattr(self.flow_dir.data, 'compute'):
16+
self.flow_dir.data = self.flow_dir.data.compute()
17+
# Re-chunk for dask benchmark
18+
if type == "dask":
19+
import dask.array as da
20+
self.flow_dir.data = da.from_array(
21+
self.flow_dir.data,
22+
chunks=(max(1, ny // 2), max(1, nx // 2)),
23+
)
24+
25+
def time_flow_accumulation(self, nx, type):
26+
result = flow_accumulation(self.flow_dir)
27+
if hasattr(result.data, 'compute'):
28+
result.data.compute()
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from xrspatial import flow_direction
2+
3+
from .common import Benchmarking
4+
5+
6+
class FlowDirection(Benchmarking):
7+
def __init__(self):
8+
super().__init__(func=flow_direction)
9+
10+
def time_flow_direction(self, nx, type):
11+
return self.time(nx, type)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from xrspatial import flow_direction, flow_length
2+
3+
from .common import get_xr_dataarray
4+
5+
6+
class FlowLength:
7+
params = ([100, 300, 1000], ["numpy", "dask"])
8+
param_names = ("nx", "type")
9+
10+
def setup(self, nx, type):
11+
ny = nx // 2
12+
elev = get_xr_dataarray((ny, nx), "numpy")
13+
self.flow_dir = flow_direction(elev)
14+
fd_data = self.flow_dir.data
15+
16+
if type == "dask":
17+
import dask.array as da
18+
self.flow_dir.data = da.from_array(
19+
fd_data,
20+
chunks=(max(1, ny // 2), max(1, nx // 2)),
21+
)
22+
23+
def time_flow_length_downstream(self, nx, type):
24+
result = flow_length(self.flow_dir, direction='downstream')
25+
if hasattr(result.data, 'compute'):
26+
result.data.compute()
27+
28+
def time_flow_length_upstream(self, nx, type):
29+
result = flow_length(self.flow_dir, direction='upstream')
30+
if hasattr(result.data, 'compute'):
31+
result.data.compute()

benchmarks/benchmarks/hand.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from xrspatial import flow_direction, flow_accumulation, hand
2+
3+
from .common import get_xr_dataarray
4+
5+
6+
class HAND:
7+
params = ([100, 300, 1000], ["numpy", "dask"])
8+
param_names = ("nx", "type")
9+
10+
def setup(self, nx, type):
11+
ny = nx // 2
12+
self.elev = get_xr_dataarray((ny, nx), "numpy")
13+
flow_dir = flow_direction(self.elev)
14+
self.flow_dir = flow_dir
15+
self.flow_accum = flow_accumulation(flow_dir)
16+
17+
if type == "dask":
18+
import dask.array as da
19+
chunks = (max(1, ny // 2), max(1, nx // 2))
20+
fd_data = self.flow_dir.data
21+
fa_data = self.flow_accum.data
22+
elev_data = self.elev.data
23+
24+
self.flow_dir.data = da.from_array(fd_data, chunks=chunks)
25+
self.flow_accum.data = da.from_array(fa_data, chunks=chunks)
26+
self.elev.data = da.from_array(elev_data, chunks=chunks)
27+
28+
def time_hand(self, nx, type):
29+
result = hand(self.flow_dir, self.flow_accum, self.elev)
30+
if hasattr(result.data, 'compute'):
31+
result.data.compute()

benchmarks/benchmarks/twi.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from xrspatial import flow_direction, flow_accumulation, slope, twi
2+
3+
from .common import get_xr_dataarray
4+
5+
6+
class TWI:
7+
params = ([100, 300, 1000], ["numpy", "dask"])
8+
param_names = ("nx", "type")
9+
10+
def setup(self, nx, type):
11+
ny = nx // 2
12+
elev = get_xr_dataarray((ny, nx), "numpy")
13+
flow_dir = flow_direction(elev)
14+
self.flow_accum = flow_accumulation(flow_dir)
15+
self.slope_agg = slope(elev)
16+
17+
if type == "dask":
18+
import dask.array as da
19+
chunks = (max(1, ny // 2), max(1, nx // 2))
20+
self.flow_accum.data = da.from_array(
21+
self.flow_accum.data, chunks=chunks,
22+
)
23+
self.slope_agg.data = da.from_array(
24+
self.slope_agg.data, chunks=chunks,
25+
)
26+
27+
def time_twi(self, nx, type):
28+
result = twi(self.flow_accum, self.slope_agg)
29+
if hasattr(result.data, 'compute'):
30+
result.data.compute()

benchmarks/benchmarks/watershed.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from xrspatial import flow_direction, watershed, basins
2+
3+
from .common import get_xr_dataarray
4+
5+
import numpy as np
6+
7+
8+
class Watershed:
9+
params = ([100, 300, 1000], ["numpy", "dask"])
10+
param_names = ("nx", "type")
11+
12+
def setup(self, nx, type):
13+
ny = nx // 2
14+
elev = get_xr_dataarray((ny, nx), "numpy")
15+
self.flow_dir = flow_direction(elev)
16+
fd_data = self.flow_dir.data
17+
18+
# Create pour_points at pits (code 0)
19+
pp = np.full_like(fd_data, np.nan)
20+
pit_mask = fd_data == 0
21+
pp[pit_mask] = np.arange(1, pit_mask.sum() + 1, dtype=np.float64)
22+
23+
if type == "dask":
24+
import dask.array as da
25+
self.flow_dir.data = da.from_array(
26+
fd_data,
27+
chunks=(max(1, ny // 2), max(1, nx // 2)),
28+
)
29+
self.pour_points = self.flow_dir.copy()
30+
self.pour_points.data = da.from_array(
31+
pp,
32+
chunks=(max(1, ny // 2), max(1, nx // 2)),
33+
)
34+
else:
35+
self.pour_points = self.flow_dir.copy()
36+
self.pour_points.data = pp
37+
38+
def time_watershed(self, nx, type):
39+
result = watershed(self.flow_dir, self.pour_points)
40+
if hasattr(result.data, 'compute'):
41+
result.data.compute()
42+
43+
44+
class Basins:
45+
params = ([100, 300, 1000], ["numpy", "dask"])
46+
param_names = ("nx", "type")
47+
48+
def setup(self, nx, type):
49+
ny = nx // 2
50+
elev = get_xr_dataarray((ny, nx), "numpy")
51+
self.flow_dir = flow_direction(elev)
52+
fd_data = self.flow_dir.data
53+
54+
if type == "dask":
55+
import dask.array as da
56+
self.flow_dir.data = da.from_array(
57+
fd_data,
58+
chunks=(max(1, ny // 2), max(1, nx // 2)),
59+
)
60+
61+
def time_basins(self, nx, type):
62+
result = basins(self.flow_dir)
63+
if hasattr(result.data, 'compute'):
64+
result.data.compute()

examples/user_guide/11_Hydrology.ipynb

Lines changed: 938 additions & 0 deletions
Large diffs are not rendered by default.

xrspatial/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@
1414
from xrspatial.classify import reclassify # noqa
1515
from xrspatial.curvature import curvature # noqa
1616
from xrspatial.emerging_hotspots import emerging_hotspots # noqa
17+
from xrspatial.fill import fill # noqa
18+
from xrspatial.flow_accumulation import flow_accumulation # noqa
19+
from xrspatial.flow_direction import flow_direction # noqa
20+
from xrspatial.flow_direction_dinf import flow_direction_dinf # noqa
21+
from xrspatial.flow_length import flow_length # noqa
22+
from xrspatial.flow_path import flow_path # noqa
1723
from xrspatial.focal import mean # noqa
24+
from xrspatial.hand import hand # noqa
1825
from xrspatial.hillshade import hillshade # noqa
1926
from xrspatial.mahalanobis import mahalanobis # noqa
2027
from xrspatial.multispectral import arvi # noqa
@@ -31,6 +38,10 @@
3138
from xrspatial.proximity import great_circle_distance # noqa
3239
from xrspatial.proximity import manhattan_distance # noqa
3340
from xrspatial.proximity import proximity # noqa
41+
from xrspatial.sink import sink # noqa
42+
from xrspatial.snap_pour_point import snap_pour_point # noqa
43+
from xrspatial.stream_link import stream_link # noqa
44+
from xrspatial.stream_order import stream_order # noqa
3445
from xrspatial.slope import slope # noqa
3546
from xrspatial.surface_distance import surface_allocation # noqa
3647
from xrspatial.surface_distance import surface_direction # noqa
@@ -39,8 +50,12 @@
3950
from xrspatial.terrain_metrics import roughness # noqa
4051
from xrspatial.terrain_metrics import tpi # noqa
4152
from xrspatial.terrain_metrics import tri # noqa
53+
from xrspatial.twi import twi # noqa
4254
from xrspatial.polygonize import polygonize # noqa
4355
from xrspatial.viewshed import viewshed # noqa
56+
from xrspatial.basin import basin # noqa
57+
from xrspatial.watershed import basins # noqa
58+
from xrspatial.watershed import watershed # noqa
4459
from xrspatial.zonal import apply as zonal_apply # noqa
4560
from xrspatial.zonal import crop # noqa
4661
from xrspatial.zonal import trim # noqa

0 commit comments

Comments
 (0)