|
| 1 | +import dask.array as da |
1 | 2 | import datashader as ds |
2 | 3 | import numpy as np |
3 | 4 | import pandas as pd |
|
6 | 7 |
|
7 | 8 | from xrspatial import viewshed |
8 | 9 | from xrspatial.tests.general_checks import general_output_checks |
| 10 | +from xrspatial.viewshed import INVISIBLE |
9 | 11 |
|
10 | 12 | from ..gpu_rtx import has_rtx |
11 | 13 |
|
@@ -159,3 +161,103 @@ def test_viewshed_flat(backend, observer_elev, target_elev): |
159 | 161 | # Should do better with viewshed gpu output angles. |
160 | 162 | mask = (v.data < 90) |
161 | 163 | np.testing.assert_allclose(v.data[mask], angle[mask], atol=0.03) |
| 164 | + |
| 165 | + |
| 166 | +# ------------------------------------------------------------------- |
| 167 | +# Dask backend tests |
| 168 | +# ------------------------------------------------------------------- |
| 169 | + |
| 170 | +@pytest.mark.parametrize("observer_elev", [5, 2]) |
| 171 | +@pytest.mark.parametrize("target_elev", [0, 1]) |
| 172 | +def test_viewshed_dask_flat(observer_elev, target_elev): |
| 173 | + """Flat terrain: dask should match analytical formula.""" |
| 174 | + x, y = 0, 0 |
| 175 | + ny, nx = 5, 4 |
| 176 | + arr = da.from_array(np.full((ny, nx), 1.3), chunks=(3, 2)) |
| 177 | + xs = np.arange(nx) * 0.5 |
| 178 | + ys = np.arange(ny) * 1.5 |
| 179 | + xarr = xa.DataArray(arr, coords=dict(x=xs, y=ys), dims=["y", "x"]) |
| 180 | + v = viewshed(xarr, x=x, y=y, |
| 181 | + observer_elev=observer_elev, target_elev=target_elev) |
| 182 | + result = v.values |
| 183 | + |
| 184 | + xs2, ys2 = np.meshgrid(xs, ys) |
| 185 | + d_vert = observer_elev - target_elev |
| 186 | + d_horz = np.sqrt((xs2 - x) ** 2 + (ys2 - y) ** 2) |
| 187 | + expected = np.rad2deg(np.arctan2(d_horz, d_vert)) |
| 188 | + expected[0, 0] = result[0, 0] # observer cell |
| 189 | + np.testing.assert_allclose(result, expected) |
| 190 | + |
| 191 | + |
| 192 | +def test_viewshed_dask_matches_numpy(): |
| 193 | + """Dask should closely match numpy R2 sweep on small varied terrain.""" |
| 194 | + np.random.seed(42) |
| 195 | + ny, nx = 12, 10 |
| 196 | + terrain = np.random.uniform(0, 5, (ny, nx)) |
| 197 | + xs = np.arange(nx, dtype=float) |
| 198 | + ys = np.arange(ny, dtype=float) |
| 199 | + |
| 200 | + # numpy reference |
| 201 | + raster_np = xa.DataArray(terrain.copy(), coords=dict(x=xs, y=ys), |
| 202 | + dims=["y", "x"]) |
| 203 | + v_np = viewshed(raster_np, x=5.0, y=6.0, observer_elev=10) |
| 204 | + |
| 205 | + # dask (will use Tier B — full compute + R2 — on this small grid) |
| 206 | + raster_da = xa.DataArray( |
| 207 | + da.from_array(terrain.copy(), chunks=(4, 5)), |
| 208 | + coords=dict(x=xs, y=ys), dims=["y", "x"]) |
| 209 | + v_da = viewshed(raster_da, x=5.0, y=6.0, observer_elev=10) |
| 210 | + |
| 211 | + np.testing.assert_allclose(v_da.values, v_np.values) |
| 212 | + |
| 213 | + |
| 214 | +def test_viewshed_dask_max_distance(): |
| 215 | + """max_distance should produce partial viewshed within radius.""" |
| 216 | + ny, nx = 20, 20 |
| 217 | + arr_np = np.zeros((ny, nx)) |
| 218 | + xs = np.arange(nx, dtype=float) |
| 219 | + ys = np.arange(ny, dtype=float) |
| 220 | + |
| 221 | + raster_da = xa.DataArray( |
| 222 | + da.from_array(arr_np, chunks=(10, 10)), |
| 223 | + coords=dict(x=xs, y=ys), dims=["y", "x"]) |
| 224 | + v = viewshed(raster_da, x=10.0, y=10.0, |
| 225 | + observer_elev=5, max_distance=5.0) |
| 226 | + result = v.values |
| 227 | + |
| 228 | + # Observer cell is visible |
| 229 | + assert result[10, 10] == 180.0 |
| 230 | + |
| 231 | + # Cells far beyond max_distance should be INVISIBLE |
| 232 | + assert result[0, 0] == INVISIBLE |
| 233 | + assert result[19, 19] == INVISIBLE |
| 234 | + |
| 235 | + # Cells within max_distance should be visible (flat terrain, observer up) |
| 236 | + assert result[10, 12] > INVISIBLE # 2 cells away |
| 237 | + assert result[8, 10] > INVISIBLE # 2 cells away |
| 238 | + |
| 239 | + |
| 240 | +def test_viewshed_dask_distance_sweep(): |
| 241 | + """Force the distance-sweep path (Tier C) and verify flat terrain.""" |
| 242 | + from unittest.mock import patch |
| 243 | + |
| 244 | + ny, nx = 10, 10 |
| 245 | + arr_np = np.full((ny, nx), 0.0) |
| 246 | + xs = np.arange(nx, dtype=float) |
| 247 | + ys = np.arange(ny, dtype=float) |
| 248 | + |
| 249 | + raster_da = xa.DataArray( |
| 250 | + da.from_array(arr_np, chunks=(5, 5)), |
| 251 | + coords=dict(x=xs, y=ys), dims=["y", "x"]) |
| 252 | + |
| 253 | + # R2 needs 280*10*10=28000 bytes; Tier B requires <50% of avail. |
| 254 | + # Output grid is 10*10*8=800 bytes; memory guard requires <80% of avail. |
| 255 | + # 10000 bytes: skips Tier B (28000 > 5000) and passes guard (800 < 8000). |
| 256 | + with patch('xrspatial.viewshed._available_memory_bytes', |
| 257 | + return_value=10_000): |
| 258 | + v = viewshed(raster_da, x=5.0, y=5.0, observer_elev=5) |
| 259 | + |
| 260 | + result = v.values |
| 261 | + assert result[5, 5] == 180.0 |
| 262 | + # All cells on flat terrain should be visible |
| 263 | + assert (result > INVISIBLE).all() |
0 commit comments