-
Notifications
You must be signed in to change notification settings - Fork 86
Expand file tree
/
Copy pathsink_d8.py
More file actions
276 lines (221 loc) · 7.56 KB
/
sink_d8.py
File metadata and controls
276 lines (221 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Sink identification: find and label depression cells in a D8 flow direction grid.
Identifies cells with direction code 0 (pit/flat with no downhill neighbor)
and labels connected groups using 8-connected BFS.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
from numba import cuda
try:
import cupy
except ImportError:
class cupy: # type: ignore[no-redef]
ndarray = False
try:
import dask.array as da
except ImportError:
da = None
from xrspatial.utils import (
_validate_raster,
cuda_args,
has_cuda_and_cupy,
is_cupy_array,
is_dask_cupy,
ngjit,
)
from xrspatial.dataset_support import supports_dataset
# =====================================================================
# CPU kernel
# =====================================================================
@ngjit
def _sink_cpu(flow_dir, h, w, row_off, col_off, total_w):
"""8-connected BFS flood-fill CCL for sink cells (code 0).
Labels each connected group of code-0 cells with a unique ID
based on position: (row_off + r) * total_w + (col_off + c) + 1.
"""
labels = np.empty((h, w), dtype=np.float64)
labels[:] = np.nan
dy = np.array([-1, -1, -1, 0, 0, 1, 1, 1])
dx = np.array([-1, 0, 1, -1, 1, -1, 0, 1])
queue_r = np.empty(h * w, dtype=np.int64)
queue_c = np.empty(h * w, dtype=np.int64)
for r in range(h):
for c in range(w):
v = flow_dir[r, c]
if v != v: # NaN
continue
if v != 0.0:
continue
if labels[r, c] == labels[r, c]: # already labeled
continue
label = float((row_off + r) * total_w + (col_off + c) + 1)
labels[r, c] = label
head = np.int64(0)
tail = np.int64(0)
queue_r[tail] = r
queue_c[tail] = c
tail += 1
while head < tail:
cr = queue_r[head]
cc = queue_c[head]
head += 1
for k in range(8):
nr = cr + dy[k]
nc = cc + dx[k]
if nr < 0 or nr >= h or nc < 0 or nc >= w:
continue
nv = flow_dir[nr, nc]
if nv != nv:
continue
if nv != 0.0:
continue
if labels[nr, nc] == labels[nr, nc]:
continue
labels[nr, nc] = label
queue_r[tail] = nr
queue_c[tail] = nc
tail += 1
return labels
# =====================================================================
# GPU kernels
# =====================================================================
@cuda.jit
def _sink_init_gpu(flow_dir, labels, H, W):
"""Pits (code 0) get position-based ID, others get 0."""
i, j = cuda.grid(2)
if i >= H or j >= W:
return
v = flow_dir[i, j]
if v != v: # NaN
labels[i, j] = 0.0
return
if v == 0.0:
labels[i, j] = float(i * W + j + 1)
else:
labels[i, j] = 0.0
@cuda.jit
def _sink_propagate_gpu(labels, changed, H, W):
"""Min-label propagation: each sink cell takes minimum neighbor label."""
i, j = cuda.grid(2)
if i >= H or j >= W:
return
my_label = labels[i, j]
if my_label <= 0.0:
return
min_label = my_label
for k in range(8):
if k == 0:
dy, dx = -1, -1
elif k == 1:
dy, dx = -1, 0
elif k == 2:
dy, dx = -1, 1
elif k == 3:
dy, dx = 0, -1
elif k == 4:
dy, dx = 0, 1
elif k == 5:
dy, dx = 1, -1
elif k == 6:
dy, dx = 1, 0
else:
dy, dx = 1, 1
ni = i + dy
nj = j + dx
if ni < 0 or ni >= H or nj < 0 or nj >= W:
continue
nb = labels[ni, nj]
if nb > 0.0 and nb < min_label:
min_label = nb
if min_label < my_label:
labels[i, j] = min_label
cuda.atomic.add(changed, 0, 1)
def _sink_cupy(flow_dir_data):
"""GPU driver for sink identification."""
import cupy as cp
H, W = flow_dir_data.shape
flow_dir_f64 = flow_dir_data.astype(cp.float64)
labels = cp.zeros((H, W), dtype=cp.float64)
changed = cp.zeros(1, dtype=cp.int32)
griddim, blockdim = cuda_args((H, W))
_sink_init_gpu[griddim, blockdim](flow_dir_f64, labels, H, W)
max_iter = max(H, W)
for _ in range(max_iter):
changed[0] = 0
_sink_propagate_gpu[griddim, blockdim](labels, changed, H, W)
if int(changed[0]) == 0:
break
return cp.where(labels > 0, labels, cp.nan)
# =====================================================================
# Backend wrappers
# =====================================================================
def _run_numpy(data):
h, w = data.shape
return _sink_cpu(data.astype(np.float64), h, w, 0, 0, w)
def _run_dask_numpy(data):
total_w = data.shape[1]
def _tile_fn(block, block_info=None):
if block_info is None or 0 not in block_info:
return np.full(block.shape, np.nan, dtype=np.float64)
row_off = block_info[0]['array-location'][0][0]
col_off = block_info[0]['array-location'][1][0]
h, w = block.shape
return _sink_cpu(np.asarray(block, dtype=np.float64),
h, w, row_off, col_off, total_w)
return da.map_blocks(
_tile_fn, data,
dtype=np.float64,
meta=np.array((), dtype=np.float64),
)
def _run_dask_cupy(data):
"""Dask+CuPy: convert to numpy dask, run CPU path, convert back."""
import cupy as cp
data_np = data.map_blocks(
lambda b: b.get(), dtype=data.dtype,
meta=np.array((), dtype=data.dtype),
)
result = _run_dask_numpy(data_np)
return result.map_blocks(
cp.asarray, dtype=result.dtype,
meta=cp.array((), dtype=result.dtype),
)
# =====================================================================
# Public API
# =====================================================================
@supports_dataset
def sink_d8(flow_dir: xr.DataArray,
name: str = 'sink') -> xr.DataArray:
"""Identify and label depression cells in a D8 flow direction grid.
Finds cells with direction code 0 (pit/flat with no downhill
neighbor) and labels connected groups using 8-connected
component labeling.
Parameters
----------
flow_dir : xarray.DataArray or xr.Dataset
2D D8 flow direction grid
(codes 0/1/2/4/8/16/32/64/128; NaN for nodata).
name : str, default='sink'
Name of output DataArray.
Returns
-------
xarray.DataArray or xr.Dataset
2D float64 array where each sink cell is labeled with a unique
group ID. Non-sink cells and NaN cells are NaN.
"""
_validate_raster(flow_dir, func_name='sink', name='flow_dir')
data = flow_dir.data
if isinstance(data, np.ndarray):
out = _run_numpy(data)
elif has_cuda_and_cupy() and is_cupy_array(data):
out = _sink_cupy(data)
elif has_cuda_and_cupy() and is_dask_cupy(flow_dir):
out = _run_dask_cupy(data)
elif da is not None and isinstance(data, da.Array):
out = _run_dask_numpy(data)
else:
raise TypeError(f"Unsupported array type: {type(data)}")
return xr.DataArray(out,
name=name,
coords=flow_dir.coords,
dims=flow_dir.dims,
attrs=flow_dir.attrs)