diff --git a/docs/release-notes/0.13.0.md b/docs/release-notes/0.13.0.md new file mode 100644 index 00000000..abcbc59a --- /dev/null +++ b/docs/release-notes/0.13.0.md @@ -0,0 +1,15 @@ +### 0.13.0 {small}`the-future` + +```{rubric} Features +``` +* Add support for aggregate operations on CSC matrices, Fortran-ordered arrays, and Dask with sparse CSR and dense matrices {pr}`395` {smaller}`S Dicks` + +```{rubric} Performance +``` + +```{rubric} Bug fixes +``` + + +```{rubric} Misc +``` diff --git a/docs/release-notes/index.md b/docs/release-notes/index.md index ab603767..059730b1 100644 --- a/docs/release-notes/index.md +++ b/docs/release-notes/index.md @@ -2,6 +2,10 @@ # Release notes +## Version 0.13.0 +```{include} /release-notes/0.13.0.md +``` + ## Version 0.12.0 ```{include} /release-notes/0.12.7.md ``` diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index aa2729fe..cad33ba4 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -8,18 +8,22 @@ ) import cupy as cp -import numpy as np from anndata import AnnData from cupyx.scipy import sparse as cp_sparse from scanpy._utils import _resolve_axis from scanpy.get._aggregated import _combine_categories +from rapids_singlecell._compat import ( + DaskArray, + _meta_dense, +) from rapids_singlecell.get import _check_mask from rapids_singlecell.preprocessing._utils import _check_gpu_X if TYPE_CHECKING: from collections.abc import Collection, Iterable + import numpy as np import pandas as pd from numpy.typing import NDArray @@ -52,7 +56,7 @@ def __init__( ) -> None: self.mask = mask self.groupby = cp.array(groupby.codes, dtype=cp.int32) - self.n_cells = cp.array(np.bincount(groupby.codes), dtype=cp.float64).reshape( + self.n_cells = cp.array(cp.bincount(self.groupby), dtype=cp.float64).reshape( -1, 1 ) self.data = data @@ -66,6 +70,108 @@ def _get_mask(self): else: return cp.ones(self.data.shape[0], dtype=bool) + def count_mean_var_dask(self, dof: int = 1, split_every: int = 2): + """ + This function is used to calculate the sum, mean, and variance of the data matrix. + It automatically detects sparse vs dense matrices and uses the appropriate + CUDA kernel for aggregation. + """ + import dask.array as da + + assert dof >= 0 + from ._kernels._aggr_kernels import ( + _get_aggr_dense_kernel_C, + _get_aggr_sparse_kernel, + ) + + if isinstance(self.data._meta, cp.ndarray): + kernel = _get_aggr_dense_kernel_C(self.data.dtype) + is_sparse = False + else: + kernel = _get_aggr_sparse_kernel(self.data.dtype) + is_sparse = True + + kernel.compile() + n_groups = self.n_cells.shape[0] + + def __aggregate_dask(X_part, mask_part, groupby_part): + out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) + threads_per_block = 512 + + if is_sparse: + # Sparse matrix kernel parameters + grid = (X_part.shape[0],) + kernel_args = ( + X_part.indptr, + X_part.indices, + X_part.data, + ) + else: + # Dense matrix kernel parameters + N = X_part.shape[0] * X_part.shape[1] + + blocks = min( + (N + threads_per_block - 1) // threads_per_block, + cp.cuda.Device().attributes["MultiProcessorCount"] * 8, + ) + grid = (blocks,) + kernel_args = (X_part,) + + kernel( + grid, + (threads_per_block,), + ( + *kernel_args, + out, + groupby_part, + mask_part, + X_part.shape[0], + X_part.shape[1], + n_groups, + ), + ) + return out + + # Prepare Dask arrays + mask = self._get_mask() + mask_dask = da.from_array( + mask, chunks=(self.data.chunks[0]), meta=_meta_dense(mask.dtype) + ) + groupby_dask = da.from_array( + self.groupby, + chunks=(self.data.chunks[0]), + meta=_meta_dense(self.groupby.dtype), + ) + + # Apply aggregation across all blocks + out = da.map_blocks( + __aggregate_dask, + self.data, + mask_dask[..., None], + groupby_dask[..., None], + meta=cp.empty([], dtype=cp.float64), + dtype=cp.float64, + new_axis=(1, 2), + chunks=( + (1,) * self.data.blocks.size, + (3,), + (n_groups,), + (self.data.shape[1],), + ), + ) + + # Compute final aggregated results + out = out.sum(axis=0, split_every=split_every).compute() + sums, counts, sq_sums = out[0], out[1], out[2] + + # Calculate statistics + counts = counts.astype(cp.int32) + means = sums / self.n_cells + var = sq_sums / self.n_cells - cp.power(means, 2) + var *= self.n_cells / (self.n_cells - dof) + + return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} + def count_mean_var_sparse(self, dof: int = 1): """ This function is used to calculate the sum, mean, and variance of the sparse data matrix. @@ -73,17 +179,22 @@ def count_mean_var_sparse(self, dof: int = 1): """ assert dof >= 0 - from ._kernels._aggr_kernels import _get_aggr_sparse_kernel + from ._kernels._aggr_kernels import ( + _get_aggr_sparse_kernel, + _get_aggr_sparse_kernel_csc, + ) + + out = cp.zeros( + (3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 + ) + block = (512,) if self.data.format == "csc": - self.data = self.data.tocsr() - means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) - block = (128,) - grid = (self.data.shape[0],) - aggr_kernel = _get_aggr_sparse_kernel(self.data.dtype) + grid = (self.data.shape[1],) + aggr_kernel = _get_aggr_sparse_kernel_csc(self.data.dtype) + else: + grid = (self.data.shape[0],) + aggr_kernel = _get_aggr_sparse_kernel(self.data.dtype) mask = self._get_mask() aggr_kernel( grid, @@ -92,23 +203,24 @@ def count_mean_var_sparse(self, dof: int = 1): self.data.indptr, self.data.indices, self.data.data, - counts, - sums, - means, - var, + out, self.groupby, - self.n_cells, mask, self.data.shape[0], self.data.shape[1], + self.n_cells.shape[0], ), ) - - var = var - cp.power(means, 2) + sums, counts, sq_sums = out[0, :], out[1, :], out[2, :] + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) + means = sums / self.n_cells + var = sq_sums / self.n_cells - means**2 var *= self.n_cells / (self.n_cells - dof) results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var} - return results def count_mean_var_sparse_sparse(self, funcs, dof: int = 1): @@ -275,34 +387,44 @@ def count_mean_var_dense(self, dof: int = 1): """ assert dof >= 0 - from ._kernels._aggr_kernels import _get_aggr_dense_kernel + from ._kernels._aggr_kernels import ( + _get_aggr_dense_kernel_C, + _get_aggr_dense_kernel_F, + ) - means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) - block = (128,) - grid = (self.data.shape[0],) - aggr_kernel = _get_aggr_dense_kernel(self.data.dtype) + out = cp.zeros((3, self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) + + N = self.data.shape[0] * self.data.shape[1] + threads_per_block = 512 + blocks = min( + (N + threads_per_block - 1) // threads_per_block, + cp.cuda.Device().attributes["MultiProcessorCount"] * 8, + ) + if self.data.flags.c_contiguous: + aggr_kernel = _get_aggr_dense_kernel_C(self.data.dtype) + else: + aggr_kernel = _get_aggr_dense_kernel_F(self.data.dtype) mask = self._get_mask() aggr_kernel( - grid, - block, + (blocks,), + (threads_per_block,), ( - self.data.data, - counts, - sums, - means, - var, + self.data, + out, self.groupby, - self.n_cells, mask, self.data.shape[0], self.data.shape[1], + self.n_cells.shape[0], ), ) - - var = var - cp.power(means, 2) + sums, counts, sq_sums = out[0], out[1], out[2] + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) + means = sums / self.n_cells + var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var} @@ -322,6 +444,7 @@ def aggregate( obsm: str | None = None, varm: str | None = None, return_sparse: bool = False, + **kwargs, ) -> AnnData: """\ Aggregate data matrix based on some categorical grouping. @@ -416,11 +539,9 @@ def aggregate( elif axis == 1: # i.e., all of `varm`, `obsm`, `layers` are None so we use `X` which must be transposed data = data.T - _check_gpu_X(data) + _check_gpu_X(data, allow_dask=True) dim_df = getattr(adata, axis_name) categorical, new_label_df = _combine_categories(dim_df, by) - # Actual computation - groupby = Aggregate(groupby=categorical, data=data, mask=mask) funcs = set([func] if isinstance(func, str) else func) @@ -429,6 +550,15 @@ def aggregate( if isinstance(data, cp.ndarray): result = groupby.count_mean_var_dense(dof) + elif isinstance(data, DaskArray): + if "split_every" in kwargs: + assert isinstance(kwargs["split_every"], int) + assert kwargs["split_every"] > 0 + split_every = kwargs["split_every"] + else: + split_every = 2 + result = groupby.count_mean_var_dask(dof, split_every=split_every) + else: if return_sparse: result = groupby.count_mean_var_sparse_sparse(funcs, dof) diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index a3e342de..cda671d1 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -4,26 +4,52 @@ sparse_dense_aggr_kernel = r""" (const int *indptr, const int *index,const {0} *data, - int* counts,double* sums, double* means, double* vars, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ - int cell = blockIdx.x; + double* out, int* cats,bool* mask, + size_t n_cells, size_t n_genes, size_t n_groups){ + size_t cell = blockIdx.x; if(cell >= n_cells || !mask[cell]){ return; } int cell_start = indptr[cell]; int cell_end = indptr[cell+1]; - int group = cats[cell]; - double major = numcells[group]; + size_t group = (size_t)cats[cell]; for (int gene = cell_start+threadIdx.x; gene= n_genes){ + return; + } + int gene_start = indptr[gene]; + int gene_end = indptr[gene+1]; + + for (int cell_idx = gene_start+threadIdx.x; cell_idx= n_cells || !mask[cell]){ - return; +dense_aggr_kernel_C = r""" + (const {0} *data, double* out, + int* cats, bool* mask, size_t n_cells, size_t n_genes, size_t n_groups){ + + size_t i = blockIdx.x * blockDim.x + threadIdx.x; + size_t stride = gridDim.x * blockDim.x; + size_t N = n_cells * n_genes; + while (i < N){ + if (i >= N) return; + size_t cell = i / n_genes; + size_t gene = i % n_genes; + if(mask[cell]){ + size_t group = (size_t) cats[cell]; + + double value = (double)data[cell * n_genes + gene]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } + } + i += stride; } +} +""" - int group = cats[cell]; - double major = numcells[group]; - - for (int gene = threadIdx.x; gene < n_genes; gene += blockDim.x){ - double value = (double)data[cell * n_genes + gene]; - if (value != 0){ - atomicAdd(&sums[group * n_genes + gene], value); - atomicAdd(&counts[group * n_genes + gene], 1); - atomicAdd(&means[group * n_genes + gene], value / major); - atomicAdd(&vars[group * n_genes + gene], value * value / major);; +dense_aggr_kernel_F = r""" + (const {0} *data, double* out, + int* cats, bool* mask, size_t n_cells, size_t n_genes, size_t n_groups){ + size_t i = blockIdx.x * blockDim.x + threadIdx.x; + size_t stride = gridDim.x * blockDim.x; + size_t N = n_cells * n_genes; + while (i < N){ + if (i >= N) return; + size_t cell = i % n_cells; + size_t gene = i / n_cells; + if(mask[cell]){ + size_t group = (size_t) cats[cell]; + + double value = (double)data[gene * n_cells + cell]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } } + i += stride; } } """ @@ -94,6 +148,12 @@ def _get_aggr_sparse_kernel(dtype): ) +def _get_aggr_sparse_kernel_csc(dtype): + return cuda_kernel_factory( + sparse_dense_aggr_kernel_csc, (dtype,), "sparse_dense_aggr_kernel_csc" + ) + + def _get_aggr_sparse_sparse_kernel(dtype): return cuda_kernel_factory( sparse_sparse_aggr_kernel, (dtype,), "sparse_sparse_aggr_kernel" @@ -104,5 +164,9 @@ def _get_sparse_var_kernel(dtype): return cuda_kernel_factory(sparse_var_kernel, (dtype,), "sparse_var_kernel") -def _get_aggr_dense_kernel(dtype): - return cuda_kernel_factory(dense_aggr_kernel, (dtype,), "dense_aggr_kernel") +def _get_aggr_dense_kernel_C(dtype): + return cuda_kernel_factory(dense_aggr_kernel_C, (dtype,), "dense_aggr_kernel_C") + + +def _get_aggr_dense_kernel_F(dtype): + return cuda_kernel_factory(dense_aggr_kernel_F, (dtype,), "dense_aggr_kernel_F") diff --git a/src/rapids_singlecell/preprocessing/_pca.py b/src/rapids_singlecell/preprocessing/_pca.py index 063fa99a..e2e8491b 100644 --- a/src/rapids_singlecell/preprocessing/_pca.py +++ b/src/rapids_singlecell/preprocessing/_pca.py @@ -126,7 +126,9 @@ def pca( X = _get_obs_rep(adata, layer=layer) - mask_var_param, mask_var = _handle_mask_var(adata, mask_var, use_highly_variable) + mask_var_param, mask_var = _handle_mask_var( + adata, mask_var, use_highly_variable=use_highly_variable + ) del use_highly_variable X = X[:, mask_var] if mask_var is not None else X diff --git a/tests/dask/test_dask_aggr.py b/tests/dask/test_dask_aggr.py new file mode 100644 index 00000000..19acb9b8 --- /dev/null +++ b/tests/dask/test_dask_aggr.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import cupy as cp +import numpy as np +import pytest +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k_processed + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +@pytest.mark.parametrize("use_mask", [True, False]) +def test_dask_aggr(client, data_kind, use_mask): + adata_1 = pbmc3k_processed() + adata_2 = pbmc3k_processed() + + if data_kind == "sparse": + adata_1 = adata_1.raw.to_adata() + adata_2 = adata_2.raw.to_adata() + adata_1.X = cusparse.csr_matrix(adata_1.X.astype(np.float64)) + adata_2.X = as_sparse_cupy_dask_array(adata_2.X.astype(np.float64)) + elif data_kind == "dense": + adata_1.X = cp.array(adata_1.X.astype(np.float64)) + adata_2.X = as_dense_cupy_dask_array(adata_2.X.astype(np.float64)) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + if use_mask: + mask = adata_1.obs.louvain == "Megakaryocytes" + else: + mask = None + + out_in_memory = rsc.get.aggregate( + adata_2, + by="louvain", + func=["sum", "mean", "var", "count_nonzero"], + mask=mask, + ) + out_dask = rsc.get.aggregate( + adata_1, + by="louvain", + func=["sum", "mean", "var", "count_nonzero"], + mask=mask, + ) + for i in range(4): + c = ["sum", "mean", "var", "count_nonzero"][i] + a = out_in_memory.layers[c] + b = out_dask.layers[c] + cp.testing.assert_allclose(a, b) diff --git a/tests/test_aggregated.py b/tests/test_aggregated.py index fa0d3b13..0ab430ee 100644 --- a/tests/test_aggregated.py +++ b/tests/test_aggregated.py @@ -343,23 +343,45 @@ def test_factors(): cp.testing.assert_array_equal(res.layers["sum"], adata.X) -def test_sparse_vs_dense(): +@pytest.mark.parametrize("metric", ["sum", "mean", "var", "count_nonzero"]) +def test_sparse_vs_dense(metric): adata = pbmc3k_processed().raw.to_adata() rsc.get.anndata_to_GPU(adata) mask = adata.obs.louvain == "Megakaryocytes" - rsc_get = rsc.get.aggregate( + rsc_get = rsc.get.aggregate(adata, by="louvain", func=metric, mask=mask) + rsc_get_sparse = rsc.get.aggregate( + adata, + by="louvain", + func=metric, + mask=mask, + return_sparse=True, + ) + + a = rsc_get_sparse.layers[metric].toarray() + b = rsc_get.layers[metric] + cp.testing.assert_allclose(a, b) + + +def test_c_contiguous_vs_fortran_contiguous(): + adata = pbmc3k_processed().raw.to_adata() + adata = adata[:, :1000].copy() + adata.X = adata.X.toarray() + rsc.get.anndata_to_GPU(adata) + adata.X = cp.asfortranarray(adata.X) + mask = adata.obs.louvain == "Megakaryocytes" + rsc_get_F = rsc.get.aggregate( adata, by="louvain", func=["sum", "mean", "var", "count_nonzero"], mask=mask ) - rsc_get_sparse = rsc.get.aggregate( + adata.X = cp.ascontiguousarray(adata.X) + rsc_get_C = rsc.get.aggregate( adata, by="louvain", func=["sum", "mean", "var", "count_nonzero"], mask=mask, - return_sparse=True, ) for i in range(4): c = ["sum", "mean", "var", "count_nonzero"][i] - a = rsc_get_sparse.layers[c].toarray() - b = rsc_get.layers[c] + a = rsc_get_C.layers[c] + b = rsc_get_F.layers[c] cp.testing.assert_allclose(a, b)