64 changes: 56 additions & 8 deletions xrspatial/geotiff/_gpu_decode.py
@@ -2453,12 +2453,31 @@ class _DeflateCompOpts(ctypes.Structure):
         return None
     max_cs = max_comp_size.value
 
-    # Allocate compressed output buffers on device
-    d_comp_bufs = [cupy.empty(max_cs, dtype=cupy.uint8) for _ in range(n_tiles)]
+    # Allocate one contiguous device buffer of size ``n_tiles *
+    # max_cs`` and view ``n_tiles`` per-tile slabs into it. This
+    # mirrors the single-buffer pattern already used by the decode
+    # path in ``_try_nvcomp_from_device_bufs`` (#1659) and the GDS
+    # fallback in ``gpu_decode_tiles_from_file`` (#1552). Per-tile
+    # ``cupy.empty`` calls each round-trip through the memory pool
+    # and add up to milliseconds of overhead for thousand-tile
+    # writes; one allocation + pointer arithmetic skips that
+    # bookkeeping. The slab views are non-overlapping so nvCOMP
+    # can write into them in parallel via the per-tile pointer
+    # array. See issue #1712.
+    _check_gpu_memory(n_tiles * max_cs,
+                      what="nvCOMP compressed-output buffer")
+    d_comp_pool = cupy.empty(n_tiles * max_cs, dtype=cupy.uint8)
+    comp_base_ptr = int(d_comp_pool.data.ptr)
+    d_comp_bufs = [
+        d_comp_pool[i * max_cs:(i + 1) * max_cs]
+        for i in range(n_tiles)
+    ]
+    comp_ptrs_host = np.arange(n_tiles, dtype=np.uint64) * np.uint64(max_cs)
+    comp_ptrs_host += np.uint64(comp_base_ptr)
 
     # Build pointer and size arrays
     d_uncomp_ptrs = cupy.array([b.data.ptr for b in d_tile_bufs], dtype=cupy.uint64)
-    d_comp_ptrs = cupy.array([b.data.ptr for b in d_comp_bufs], dtype=cupy.uint64)
+    d_comp_ptrs = cupy.asarray(comp_ptrs_host)
     d_uncomp_sizes = cupy.full(n_tiles, tile_bytes, dtype=cupy.uint64)
     d_comp_sizes = cupy.empty(n_tiles, dtype=cupy.uint64)
 
@@ -2518,18 +2537,47 @@ class _DeflateCompOpts(ctypes.Structure):
             adler_checksums[i] = zlib.adler32(
                 host_view[i * tile_bytes:(i + 1) * tile_bytes])
 
-    # Read compressed sizes and data back to CPU
+    # Read compressed sizes and data back to CPU.
+    #
+    # Previously this loop ran one ``.get()`` per tile, which
+    # serialised on the default stream and burned a per-DMA setup
+    # cost (issue #1712, same pattern as the decode-side fix in
+    # #1552). Instead, hoist the variable-size slabs into a
+    # contiguous buffer with one ``cupy.concatenate``, do one D2H
+    # transfer, then slice the host buffer per tile.
     comp_sizes = d_comp_sizes.get().astype(int)
+    if n_tiles == 0:
+        return []
+    # Each tile's compressed payload lives in the first
+    # ``comp_sizes[i]`` bytes of its ``max_cs``-sized slab. Build a
+    # list of trimmed views and concatenate into one packed device
+    # buffer; the host then sees one DMA-sized region.
+    trimmed = [
+        d_comp_bufs[i][:int(comp_sizes[i])] for i in range(n_tiles)
+    ]
+    # The concat allocates a fresh device buffer of ``sum(comp_sizes)``
+    # bytes -- a peak-VRAM bump on top of the ``n_tiles * max_cs``
+    # pool above. Guard it explicitly so an OOM here surfaces with
+    # the same message style as ``_batched_d2h_to_bytes`` rather
+    # than silently triggering the broad except below.
+    _check_gpu_memory(int(sum(comp_sizes)),
+                      what="nvCOMP batch staging buffer")
+    d_comp_concat = cupy.concatenate(trimmed)
+    host_concat = bytes(d_comp_concat.get())
+    # Walk host_concat by per-tile offsets to peel out each tile's
+    # compressed bytes. Cumulative-sum on the host avoids a second
+    # device round-trip.
+    offsets = np.zeros(n_tiles + 1, dtype=np.int64)
+    np.cumsum(comp_sizes, out=offsets[1:])
 
     result = []
     for i in range(n_tiles):
-        cs = int(comp_sizes[i])
-        raw = d_comp_bufs[i][:cs].get().tobytes()
+        raw = host_concat[offsets[i]:offsets[i + 1]]
         if adler_checksums is not None:
             # Wrap raw deflate in zlib format: header + data + adler32
             checksum = struct.pack('>I', adler_checksums[i] & 0xFFFFFFFF)
             raw = b'\x78\x9c' + raw + checksum
 
         result.append(raw)
 
     return result
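The two hunks above combine three moves: one pooled allocation, host-side pointer arithmetic, and a single batched D2H readback. Below is a minimal standalone sketch of the same pattern using only public CuPy/NumPy calls; the toy sizes and the stand-in comp_sizes array are illustrative, not taken from the writer, which gets its real sizes back from nvCOMP:

import cupy
import numpy as np

n_tiles, max_cs = 4, 256  # toy values; the real ones come from nvCOMP

# One contiguous pool replaces n_tiles separate cupy.empty calls.
d_pool = cupy.empty(n_tiles * max_cs, dtype=cupy.uint8)

# Per-tile output pointers derived with host-side arithmetic: no
# device round-trips, one H2D copy when the array is handed over.
ptrs = (np.arange(n_tiles, dtype=np.uint64) * np.uint64(max_cs)
        + np.uint64(d_pool.data.ptr))
d_ptrs = cupy.asarray(ptrs)

# ... the compressor writes each tile into its slab and reports the
# true compressed sizes; this stand-in array plays that role here ...
comp_sizes = np.array([100, 37, 200, 64], dtype=np.int64)

# Batched D2H: trim each slab to its true size, concatenate once on
# device, transfer once.
trimmed = [d_pool[i * max_cs:i * max_cs + int(comp_sizes[i])]
           for i in range(n_tiles)]
host = bytes(cupy.concatenate(trimmed).get())

# Host-side cumulative offsets peel the packed buffer apart with no
# further device traffic.
offsets = np.zeros(n_tiles + 1, dtype=np.int64)
np.cumsum(comp_sizes, out=offsets[1:])
tiles = [host[offsets[i]:offsets[i + 1]] for i in range(n_tiles)]
assert [len(t) for t in tiles] == comp_sizes.tolist()

Because the slab views never overlap, handing the pointer array to the batch API lets every tile be written concurrently, and the single concatenate-and-get replaces n_tiles separate DMA transfers.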
157 changes: 157 additions & 0 deletions xrspatial/geotiff/tests/test_nvcomp_batch_compress_batched_1712.py
@@ -0,0 +1,157 @@
"""Coverage for the batched ``_nvcomp_batch_compress`` (#1712).

The pre-fix function allocated compressed-output device buffers one
``cupy.empty`` per tile and then read each tile back to host with one
``.get()`` per tile. Both patterns serialised on the default CUDA
stream and dominated run time for large-N writes. The fix folds both into a
single contiguous device allocation + a single batched D2H concat-and-
``.get()``, matching the patterns already in use on the decode side
(#1552, #1659).

These tests pin the new shape and confirm the deflate / zstd GPU write
paths still round-trip end-to-end.
"""
from __future__ import annotations

import importlib.util
import inspect
import os
import tempfile

import numpy as np
import pytest
import xarray as xr

try:
import cupy
_HAS_CUPY = True
except Exception:
_HAS_CUPY = False


def _gpu_available() -> bool:
"""Match the geotiff-test convention: cupy import AND working CUDA.

A host can have cupy installed without a usable CUDA runtime (no
driver, no device visible, container misconfig), and in that case
every test that calls into the GPU writer would fail rather than
skip. ``cupy.cuda.is_available()`` is the cheap probe.
"""
if importlib.util.find_spec("cupy") is None:
return False
try:
import cupy
return bool(cupy.cuda.is_available())
except Exception:
return False


_HAS_GPU = _gpu_available()

# nvCOMP is the entry point that exercises this code path.
from xrspatial.geotiff import _gpu_decode

needs_cupy = pytest.mark.skipif(
not _HAS_GPU, reason="cupy + CUDA required"
)


# ----------------------------------------------------------------------
# Source-level structural assertions -- run on any host with the source
# available, no GPU required.
# ----------------------------------------------------------------------

def test_no_per_tile_cupy_empty_in_compressed_pool():
"""The per-tile cupy.empty list comprehension is gone (#1712).

The fix replaced it with a single contiguous allocation. Catch any
regression that brings the loop back.
"""
source = inspect.getsource(_gpu_decode._nvcomp_batch_compress)
assert "cupy.empty(max_cs, dtype=cupy.uint8) for _ in range" not in source, (
"_nvcomp_batch_compress regressed to per-tile cupy.empty "
"allocations for the compressed output pool. See #1712."
)


def test_no_per_tile_get_in_result_loop():
"""The per-tile ``d_comp_bufs[i][:cs].get().tobytes()`` is gone (#1712).

The fix replaced it with one concat + one ``.get()``. Catch any
regression that brings the per-tile pattern back.
"""
source = inspect.getsource(_gpu_decode._nvcomp_batch_compress)
# The exact string the prior loop used:
bad_fragment = "d_comp_bufs[i][:cs].get().tobytes()"
assert bad_fragment not in source, (
"_nvcomp_batch_compress regressed to per-tile .get().tobytes() "
"D2H readback. See #1712."
)


# ----------------------------------------------------------------------
# End-to-end behaviour: GPU write + read round-trip stays correct
# ----------------------------------------------------------------------

@needs_cupy
@pytest.mark.parametrize("compression", ["deflate", "zstd"])
def test_gpu_write_roundtrip_after_batched_compress(compression):
"""GPU compress path round-trips uncorrupted for deflate + zstd.

Catches the most likely regression mode: any off-by-one in the
cumulative-sum offsets used to slice the host-side concatenated
buffer would scramble tile order, which a round-trip equality
check picks up immediately.
"""
from xrspatial.geotiff import write_geotiff_gpu, open_geotiff

rng = np.random.default_rng(seed=1712)
arr_cpu = rng.random((512, 512), dtype=np.float32)
arr_gpu = cupy.asarray(arr_cpu)
darr = xr.DataArray(arr_gpu, dims=["y", "x"])

with tempfile.TemporaryDirectory(prefix="nvcomp_batch_1712_") as td:
path = os.path.join(td, f"roundtrip_{compression}.tif")
try:
write_geotiff_gpu(
darr, path,
compression=compression,
tiled=True,
tile_size=64,
)
except RuntimeError as e:
# nvCOMP may be unavailable in this environment; the writer
# falls back to CPU and that path doesn't exercise the
# change. Skip rather than fail.
pytest.skip(f"nvCOMP unavailable for {compression}: {e}")

back = open_geotiff(path)
np.testing.assert_allclose(back.values, arr_cpu, rtol=0, atol=0)


@needs_cupy
def test_gpu_write_zero_tile_edge_case():
"""A 0-tile compress returns an empty list without indexing into None.

The cumulative-sum / concat path must short-circuit before
``cupy.concatenate`` (which would raise on an empty list). The
pre-fix loop simply iterated zero times, so the contract is the
same empty-list output.
"""
# Direct call into the internal function with n_tiles=0 is
# awkward because it needs a libnvCOMP handle and matching opts.
# Instead, exercise the public writer with a tiny single-tile
# input and confirm the fast path does not crash. Real n_tiles==0
# never occurs via the writer (every image has at least one tile).
from xrspatial.geotiff import write_geotiff_gpu, open_geotiff
arr_gpu = cupy.zeros((32, 32), dtype=cupy.float32)
darr = xr.DataArray(arr_gpu, dims=["y", "x"])
with tempfile.TemporaryDirectory(prefix="nvcomp_batch_1712_") as td:
path = os.path.join(td, "tiny.tif")
try:
write_geotiff_gpu(darr, path, compression="zstd",
tiled=True, tile_size=32)
except RuntimeError as e:
pytest.skip(f"nvCOMP unavailable: {e}")
back = open_geotiff(path)
assert back.shape == (32, 32)
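One detail from the writer hunk worth a standalone check is the b'\x78\x9c' framing: as the in-line comment there notes, nvCOMP's deflate output is a raw DEFLATE stream, while TIFF deflate tiles carry zlib framing (2-byte header plus big-endian Adler-32 trailer). The wrapping can be sanity-checked on the CPU alone; this sketch emulates the raw stream with zlib.compressobj(wbits=-15) and uses a made-up payload:

import struct
import zlib

payload = b"tile bytes " * 400  # made-up stand-in for one tile

# Emulate a raw DEFLATE stream (no zlib header/trailer) on the CPU;
# negative wbits selects raw mode in the standard library.
co = zlib.compressobj(wbits=-15)
raw = co.compress(payload) + co.flush()

# Re-frame exactly as the writer hunk does: 2-byte zlib header, raw
# body, big-endian Adler-32 of the *uncompressed* data.
wrapped = (b'\x78\x9c' + raw
           + struct.pack('>I', zlib.adler32(payload) & 0xFFFFFFFF))

# Any standard zlib decoder now accepts the re-framed stream.
assert zlib.decompress(wrapped) == payload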