From 86175d5ba133396ca28476400f27927ec2719f7f Mon Sep 17 00:00:00 2001 From: Intron7 Date: Wed, 25 Jun 2025 10:17:15 +0200 Subject: [PATCH 01/19] add csc kernel --- src/rapids_singlecell/get/_aggregated.py | 17 ++++++---- .../get/_kernels/_aggr_kernels.py | 34 +++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index aa2729fe..eb5359be 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -73,17 +73,22 @@ def count_mean_var_sparse(self, dof: int = 1): """ assert dof >= 0 - from ._kernels._aggr_kernels import _get_aggr_sparse_kernel + from ._kernels._aggr_kernels import ( + _get_aggr_sparse_kernel, + _get_aggr_sparse_kernel_csc, + ) - if self.data.format == "csc": - self.data = self.data.tocsr() means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) - block = (128,) - grid = (self.data.shape[0],) - aggr_kernel = _get_aggr_sparse_kernel(self.data.dtype) + block = (512,) + if self.data.format == "csc": + grid = (self.data.shape[1],) + aggr_kernel = _get_aggr_sparse_kernel_csc(self.data.dtype) + else: + grid = (self.data.shape[0],) + aggr_kernel = _get_aggr_sparse_kernel(self.data.dtype) mask = self._get_mask() aggr_kernel( grid, diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index a3e342de..144f1ca1 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -24,6 +24,34 @@ } """ + +sparse_dense_aggr_kernel_csc = r""" + (const int *indptr, const int *index,const {0} *data, + int* counts,double* sums, double* means, double* vars, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ + int gene = blockIdx.x; + if(gene >= n_genes){ + return; + } + int gene_start = indptr[gene]; + int gene_end = indptr[gene+1]; + + for (int cell_idx = gene_start+threadIdx.x; cell_idx Date: Thu, 26 Jun 2025 05:29:02 -0700 Subject: [PATCH 02/19] add F-continous --- src/rapids_singlecell/get/_aggregated.py | 17 +++-- .../get/_kernels/_aggr_kernels.py | 63 ++++++++++++++----- tests/test_aggregated.py | 23 +++++++ 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index eb5359be..311a30cb 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -280,19 +280,24 @@ def count_mean_var_dense(self, dof: int = 1): """ assert dof >= 0 - from ._kernels._aggr_kernels import _get_aggr_dense_kernel + from ._kernels._aggr_kernels import _get_aggr_dense_kernel_C, _get_aggr_dense_kernel_F means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) - block = (128,) - grid = (self.data.shape[0],) - aggr_kernel = _get_aggr_dense_kernel(self.data.dtype) + N = self.data.shape[0] * self.data.shape[1] + threads_per_block = 256 + blocks = (N + threads_per_block - 1) // threads_per_block + + if self.data.flags.c_contiguous: + aggr_kernel = _get_aggr_dense_kernel_C(self.data.dtype) + else: + aggr_kernel = _get_aggr_dense_kernel_F(self.data.dtype) mask = self._get_mask() aggr_kernel( - grid, - block, + (blocks,), + (threads_per_block,), ( self.data.data, counts, diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index 144f1ca1..73137e9f 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -92,30 +92,60 @@ """ -dense_aggr_kernel = r""" +dense_aggr_kernel_C = r""" (const {0} *data, int* counts, double* sums, double* means, double* vars, - int* cats, double* numcells, bool* mask, int n_cells, int n_genes){ - int cell = blockIdx.x; - if(cell >= n_cells || !mask[cell]){ + int* cats, double* numcells, bool* mask, size_t n_cells, size_t n_genes){ + + size_t i = blockIdx.x * blockDim.x + threadIdx.x; + size_t N = n_cells * n_genes; + if (i >= N) return; + size_t cell = i / n_genes; + size_t gene = i % n_genes; + if(!mask[cell]){ return; } - int group = cats[cell]; + size_t group = (size_t)cats[cell]; double major = numcells[group]; - for (int gene = threadIdx.x; gene < n_genes; gene += blockDim.x){ - double value = (double)data[cell * n_genes + gene]; - if (value != 0){ - atomicAdd(&sums[group * n_genes + gene], value); - atomicAdd(&counts[group * n_genes + gene], 1); - atomicAdd(&means[group * n_genes + gene], value / major); - atomicAdd(&vars[group * n_genes + gene], value * value / major);; - } + double value = (double)data[cell * n_genes + gene]; + if (value != 0){ + atomicAdd(&sums[group * n_genes + gene], value); + atomicAdd(&counts[group * n_genes + gene], 1); + atomicAdd(&means[group * n_genes + gene], value / major); + atomicAdd(&vars[group * n_genes + gene], value * value / major); + } +} +""" + +dense_aggr_kernel_F = r""" + (const {0} *data, int* counts, double* sums, double* means, double* vars, + int* cats, double* numcells, bool* mask, size_t n_cells, size_t n_genes){ + + size_t i = blockIdx.x * blockDim.x + threadIdx.x; + size_t N = n_cells * n_genes; + if (i >= N) return; + size_t cell = i % n_cells; + size_t gene = i / n_cells; + if(!mask[cell]){ + return; + } + + size_t group = (size_t) cats[cell]; + double major = numcells[group]; + + double value = (double)data[gene * n_cells + cell]; + if (value != 0){ + atomicAdd(&sums[group * n_genes + gene], value); + atomicAdd(&counts[group * n_genes + gene], 1); + atomicAdd(&means[group * n_genes + gene], value / major); + atomicAdd(&vars[group * n_genes + gene], value * value / major); } } """ + def _get_aggr_sparse_kernel(dtype): return cuda_kernel_factory( sparse_dense_aggr_kernel, (dtype,), "sparse_dense_aggr_kernel" @@ -138,5 +168,8 @@ def _get_sparse_var_kernel(dtype): return cuda_kernel_factory(sparse_var_kernel, (dtype,), "sparse_var_kernel") -def _get_aggr_dense_kernel(dtype): - return cuda_kernel_factory(dense_aggr_kernel, (dtype,), "dense_aggr_kernel") +def _get_aggr_dense_kernel_C(dtype): + return cuda_kernel_factory(dense_aggr_kernel_C, (dtype,), "dense_aggr_kernel_C") + +def _get_aggr_dense_kernel_F(dtype): + return cuda_kernel_factory(dense_aggr_kernel_F, (dtype,), "dense_aggr_kernel_F") diff --git a/tests/test_aggregated.py b/tests/test_aggregated.py index fa0d3b13..bab880a2 100644 --- a/tests/test_aggregated.py +++ b/tests/test_aggregated.py @@ -363,3 +363,26 @@ def test_sparse_vs_dense(): a = rsc_get_sparse.layers[c].toarray() b = rsc_get.layers[c] cp.testing.assert_allclose(a, b) + +def test_c_contiguous_vs_fortran_contiguous(): + adata = pbmc3k_processed() + rsc.get.anndata_to_GPU(adata) + adata.X = cp.asfortranarray(adata.X) + mask = adata.obs.louvain == "Megakaryocytes" + rsc_get_F = rsc.get.aggregate( + adata, by="louvain", func=["sum", "mean", "var", "count_nonzero"], mask=mask + ) + adata.X = cp.ascontiguousarray(adata.X) + rsc_get_C = rsc.get.aggregate( + adata, + by="louvain", + func=["sum", "mean", "var", "count_nonzero"], + mask=mask, + return_sparse=True, + ) + + for i in range(4): + c = ["sum", "mean", "var", "count_nonzero"][i] + a = rsc_get_C.layers[c] + b = rsc_get_F.layers[c] + cp.testing.assert_allclose(a, b) From a0c26363c95ac361efc86cea4e0752f4e47d9a94 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Mon, 30 Jun 2025 15:47:37 +0200 Subject: [PATCH 03/19] add dask test --- src/rapids_singlecell/get/_aggregated.py | 95 +++++++++++++++++++++++- 1 file changed, 91 insertions(+), 4 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index 311a30cb..c1f3f4d6 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -8,18 +8,22 @@ ) import cupy as cp -import numpy as np from anndata import AnnData from cupyx.scipy import sparse as cp_sparse from scanpy._utils import _resolve_axis from scanpy.get._aggregated import _combine_categories +from rapids_singlecell._compat import ( + DaskArray, + _meta_dense, +) from rapids_singlecell.get import _check_mask from rapids_singlecell.preprocessing._utils import _check_gpu_X if TYPE_CHECKING: from collections.abc import Collection, Iterable + import numpy as np import pandas as pd from numpy.typing import NDArray @@ -52,7 +56,7 @@ def __init__( ) -> None: self.mask = mask self.groupby = cp.array(groupby.codes, dtype=cp.int32) - self.n_cells = cp.array(np.bincount(groupby.codes), dtype=cp.float64).reshape( + self.n_cells = cp.array(cp.bincount(self.groupby), dtype=cp.float64).reshape( -1, 1 ) self.data = data @@ -66,6 +70,84 @@ def _get_mask(self): else: return cp.ones(self.data.shape[0], dtype=bool) + def count_mean_var_sparse_dask(self, dof: int = 1): + """ + This function is used to calculate the sum, mean, and variance of the sparse data matrix. + It uses a custom cuda-kernel to perform the aggregation. + """ + import dask + import dask.array as da + + assert dof >= 0 + from ._kernels._aggr_kernels import ( + _get_aggr_sparse_kernel, + ) + + kernel = _get_aggr_sparse_kernel(self.data.dtype) + kernel.compile() + + def __aggregate_dask(X_part, mask_part, groupby_part): + means = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) + var = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) + sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) + counts = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.int32) + block = (512,) + grid = (self.data.shape[0],) + kernel( + grid, + block, + ( + X_part.indptr, + X_part.indices, + X_part.data, + counts, + sums, + means, + var, + groupby_part, + self.n_cells, + mask_part, + X_part.shape[0], + X_part.shape[1], + ), + ) + counts = counts.astype(cp.float64) + out = cp.stack([means, var, sums, counts], axis=0) + return out + + mask = self._get_mask() + mask_dask = da.from_array( + mask, chunks=(self.data.chunks[0],), meta=_meta_dense(mask.dtype) + ) + groupby_dask = da.from_array( + self.groupby, + chunks=(self.data.chunks[0],), + meta=_meta_dense(self.groupby.dtype), + ) + out = da.blockwise( + __aggregate_dask, + "ij", + self.data, + "ij", + mask_dask, + "i", + groupby_dask, + "i", + meta=_meta_dense(self.data.dtype), + ) + out = out.sum(axis=0) + means = out[0] + var = out[1] + sums = out[2] + counts = out[3] + means, var, sums, counts = dask.compute(means, var, sums, counts) + print(means.shape, var.shape, sums.shape, counts.shape) + counts = counts.astype(cp.int32) + var = var - cp.power(means, 2) + var *= self.n_cells / (self.n_cells - dof) + + return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} + def count_mean_var_sparse(self, dof: int = 1): """ This function is used to calculate the sum, mean, and variance of the sparse data matrix. @@ -280,7 +362,10 @@ def count_mean_var_dense(self, dof: int = 1): """ assert dof >= 0 - from ._kernels._aggr_kernels import _get_aggr_dense_kernel_C, _get_aggr_dense_kernel_F + from ._kernels._aggr_kernels import ( + _get_aggr_dense_kernel_C, + _get_aggr_dense_kernel_F, + ) means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) @@ -426,7 +511,7 @@ def aggregate( elif axis == 1: # i.e., all of `varm`, `obsm`, `layers` are None so we use `X` which must be transposed data = data.T - _check_gpu_X(data) + _check_gpu_X(data, allow_dask=True) dim_df = getattr(adata, axis_name) categorical, new_label_df = _combine_categories(dim_df, by) # Actual computation @@ -439,6 +524,8 @@ def aggregate( if isinstance(data, cp.ndarray): result = groupby.count_mean_var_dense(dof) + elif isinstance(data, DaskArray): + result = groupby.count_mean_var_sparse_dask(dof) else: if return_sparse: result = groupby.count_mean_var_sparse_sparse(funcs, dof) From 546c6cead13d5aee8dbbbf211c27dfcf379bd6e4 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Mon, 30 Jun 2025 07:39:14 -0700 Subject: [PATCH 04/19] update aggr --- src/rapids_singlecell/get/_aggregated.py | 38 +++++++------------ .../get/_kernels/_aggr_kernels.py | 10 ++--- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index c1f3f4d6..0f65461f 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -87,9 +87,8 @@ def count_mean_var_sparse_dask(self, dof: int = 1): kernel.compile() def __aggregate_dask(X_part, mask_part, groupby_part): - means = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) - var = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) + sq_sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) counts = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.int32) block = (512,) grid = (self.data.shape[0],) @@ -102,18 +101,16 @@ def __aggregate_dask(X_part, mask_part, groupby_part): X_part.data, counts, sums, - means, - var, + sq_sums, groupby_part, - self.n_cells, mask_part, X_part.shape[0], X_part.shape[1], ), ) counts = counts.astype(cp.float64) - out = cp.stack([means, var, sums, counts], axis=0) - return out + out = cp.vstack([ sums.ravel(), sq_sums.ravel(), counts.ravel()]) + return out[None, ...] mask = self._get_mask() mask_dask = da.from_array( @@ -134,18 +131,12 @@ def __aggregate_dask(X_part, mask_part, groupby_part): groupby_dask, "i", meta=_meta_dense(self.data.dtype), - ) - out = out.sum(axis=0) - means = out[0] - var = out[1] - sums = out[2] - counts = out[3] - means, var, sums, counts = dask.compute(means, var, sums, counts) - print(means.shape, var.shape, sums.shape, counts.shape) - counts = counts.astype(cp.int32) - var = var - cp.power(means, 2) + ).sum(axis=0) + sums, sq_sums, counts = out[0], out[1], out[2] + sums, sq_sums, counts = dask.compute(sums, sq_sums, counts) + means = sums / self.n_cells + var = sq_sums/ self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) - return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} def count_mean_var_sparse(self, dof: int = 1): @@ -160,8 +151,7 @@ def count_mean_var_sparse(self, dof: int = 1): _get_aggr_sparse_kernel_csc, ) - means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) + sq_sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) block = (512,) @@ -181,8 +171,7 @@ def count_mean_var_sparse(self, dof: int = 1): self.data.data, counts, sums, - means, - var, + sq_sums, self.groupby, self.n_cells, mask, @@ -190,12 +179,11 @@ def count_mean_var_sparse(self, dof: int = 1): self.data.shape[1], ), ) - - var = var - cp.power(means, 2) + means = sums / self.n_cells + var = sq_sums/ self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var} - return results def count_mean_var_sparse_sparse(self, funcs, dof: int = 1): diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index 73137e9f..be50cf96 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -4,7 +4,7 @@ sparse_dense_aggr_kernel = r""" (const int *indptr, const int *index,const {0} *data, - int* counts,double* sums, double* means, double* vars, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ + int* counts,double* sums, double* sq_sums, int* cats,bool* mask,int n_cells, int n_genes){ int cell = blockIdx.x; if(cell >= n_cells || !mask[cell]){ return; @@ -18,8 +18,7 @@ double value = (double)data[gene]; atomicAdd(&sums[group*n_genes+gene_pos], value); atomicAdd(&counts[group*n_genes+gene_pos], 1); - atomicAdd(&means[group*n_genes+gene_pos], value/major); - atomicAdd(&vars[group*n_genes+gene_pos], value*value/major); + atomicAdd(&sq_sums[group*n_genes+gene_pos], value); } } """ @@ -27,7 +26,7 @@ sparse_dense_aggr_kernel_csc = r""" (const int *indptr, const int *index,const {0} *data, - int* counts,double* sums, double* means, double* vars, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ + int* counts,double* sums, double* sq_sums, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ int gene = blockIdx.x; if(gene >= n_genes){ return; @@ -45,8 +44,7 @@ double value = (double)data[cell_idx]; atomicAdd(&sums[group*n_genes+gene], value); atomicAdd(&counts[group*n_genes+gene], 1); - atomicAdd(&means[group*n_genes+gene], value/major); - atomicAdd(&vars[group*n_genes+gene], value*value/major); + atomicAdd(&sq_sums[group*n_genes+gene], value*value); } } """ From 516a05e6fe2464ce98f19fb9b1848b2235bf33bd Mon Sep 17 00:00:00 2001 From: Intron7 Date: Mon, 30 Jun 2025 16:45:17 +0200 Subject: [PATCH 05/19] update kernels --- src/rapids_singlecell/get/_kernels/_aggr_kernels.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index be50cf96..b7688b0e 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -12,7 +12,6 @@ int cell_start = indptr[cell]; int cell_end = indptr[cell+1]; int group = cats[cell]; - double major = numcells[group]; for (int gene = cell_start+threadIdx.x; gene= N) return; @@ -119,7 +117,7 @@ dense_aggr_kernel_F = r""" (const {0} *data, int* counts, double* sums, double* means, double* vars, int* cats, double* numcells, bool* mask, size_t n_cells, size_t n_genes){ - + size_t i = blockIdx.x * blockDim.x + threadIdx.x; size_t N = n_cells * n_genes; if (i >= N) return; @@ -143,7 +141,6 @@ """ - def _get_aggr_sparse_kernel(dtype): return cuda_kernel_factory( sparse_dense_aggr_kernel, (dtype,), "sparse_dense_aggr_kernel" @@ -169,5 +166,6 @@ def _get_sparse_var_kernel(dtype): def _get_aggr_dense_kernel_C(dtype): return cuda_kernel_factory(dense_aggr_kernel_C, (dtype,), "dense_aggr_kernel_C") + def _get_aggr_dense_kernel_F(dtype): return cuda_kernel_factory(dense_aggr_kernel_F, (dtype,), "dense_aggr_kernel_F") From ff05540d85b5e02dd9204e9e7937d28ad01de846 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Mon, 30 Jun 2025 17:07:12 +0200 Subject: [PATCH 06/19] fix sums --- src/rapids_singlecell/get/_aggregated.py | 15 +++++++++------ .../get/_kernels/_aggr_kernels.py | 4 ++-- tests/test_aggregated.py | 18 ++++++++---------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index 0f65461f..a3af85df 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -88,7 +88,9 @@ def count_mean_var_sparse_dask(self, dof: int = 1): def __aggregate_dask(X_part, mask_part, groupby_part): sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) - sq_sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) + sq_sums = cp.zeros( + (self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64 + ) counts = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.int32) block = (512,) grid = (self.data.shape[0],) @@ -109,7 +111,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): ), ) counts = counts.astype(cp.float64) - out = cp.vstack([ sums.ravel(), sq_sums.ravel(), counts.ravel()]) + out = cp.vstack([sums.ravel(), sq_sums.ravel(), counts.ravel()]) return out[None, ...] mask = self._get_mask() @@ -135,7 +137,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): sums, sq_sums, counts = out[0], out[1], out[2] sums, sq_sums, counts = dask.compute(sums, sq_sums, counts) means = sums / self.n_cells - var = sq_sums/ self.n_cells - cp.power(means, 2) + var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} @@ -151,7 +153,9 @@ def count_mean_var_sparse(self, dof: int = 1): _get_aggr_sparse_kernel_csc, ) - sq_sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) + sq_sums = cp.zeros( + (self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64 + ) sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) block = (512,) @@ -173,14 +177,13 @@ def count_mean_var_sparse(self, dof: int = 1): sums, sq_sums, self.groupby, - self.n_cells, mask, self.data.shape[0], self.data.shape[1], ), ) means = sums / self.n_cells - var = sq_sums/ self.n_cells - cp.power(means, 2) + var = sq_sums / self.n_cells - means**2 var *= self.n_cells / (self.n_cells - dof) results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var} diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index b7688b0e..d0e443e6 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -17,7 +17,7 @@ double value = (double)data[gene]; atomicAdd(&sums[group*n_genes+gene_pos], value); atomicAdd(&counts[group*n_genes+gene_pos], 1); - atomicAdd(&sq_sums[group*n_genes+gene_pos], value); + atomicAdd(&sq_sums[group*n_genes+gene_pos], value*value); } } """ @@ -25,7 +25,7 @@ sparse_dense_aggr_kernel_csc = r""" (const int *indptr, const int *index,const {0} *data, - int* counts,double* sums, double* sq_sums, int* cats, double* numcells,bool* mask,int n_cells, int n_genes){ + int* counts,double* sums, double* sq_sums, int* cats,bool* mask,int n_cells, int n_genes){ int gene = blockIdx.x; if(gene >= n_genes){ return; diff --git a/tests/test_aggregated.py b/tests/test_aggregated.py index bab880a2..f4d73799 100644 --- a/tests/test_aggregated.py +++ b/tests/test_aggregated.py @@ -343,26 +343,24 @@ def test_factors(): cp.testing.assert_array_equal(res.layers["sum"], adata.X) -def test_sparse_vs_dense(): +@pytest.mark.parametrize("metric", ["sum", "mean", "var", "count_nonzero"]) +def test_sparse_vs_dense(metric): adata = pbmc3k_processed().raw.to_adata() rsc.get.anndata_to_GPU(adata) mask = adata.obs.louvain == "Megakaryocytes" - rsc_get = rsc.get.aggregate( - adata, by="louvain", func=["sum", "mean", "var", "count_nonzero"], mask=mask - ) + rsc_get = rsc.get.aggregate(adata, by="louvain", func=metric, mask=mask) rsc_get_sparse = rsc.get.aggregate( adata, by="louvain", - func=["sum", "mean", "var", "count_nonzero"], + func=metric, mask=mask, return_sparse=True, ) - for i in range(4): - c = ["sum", "mean", "var", "count_nonzero"][i] - a = rsc_get_sparse.layers[c].toarray() - b = rsc_get.layers[c] - cp.testing.assert_allclose(a, b) + a = rsc_get_sparse.layers[metric].toarray() + b = rsc_get.layers[metric] + cp.testing.assert_allclose(a, b) + def test_c_contiguous_vs_fortran_contiguous(): adata = pbmc3k_processed() From a6d83d230fb9d2dcceb42098123cb9cbf4fa4e53 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Mon, 30 Jun 2025 08:29:52 -0700 Subject: [PATCH 07/19] fix dimensions --- src/rapids_singlecell/get/_aggregated.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index a3af85df..6471b9d2 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -136,6 +136,10 @@ def __aggregate_dask(X_part, mask_part, groupby_part): ).sum(axis=0) sums, sq_sums, counts = out[0], out[1], out[2] sums, sq_sums, counts = dask.compute(sums, sq_sums, counts) + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) From d773b0bfb9eec056b6315231d8b3e1964ec9e522 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Wed, 2 Jul 2025 05:24:42 -0700 Subject: [PATCH 08/19] update dtype --- src/rapids_singlecell/get/_aggregated.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index 6471b9d2..020a8bc8 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -93,7 +93,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): ) counts = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.int32) block = (512,) - grid = (self.data.shape[0],) + grid = (X_part.shape[1],) kernel( grid, block, @@ -132,7 +132,8 @@ def __aggregate_dask(X_part, mask_part, groupby_part): "i", groupby_dask, "i", - meta=_meta_dense(self.data.dtype), + meta=_meta_dense(cp.float64), + dtype=cp.float64, ).sum(axis=0) sums, sq_sums, counts = out[0], out[1], out[2] sums, sq_sums, counts = dask.compute(sums, sq_sums, counts) From 5cfe6700908317fac8c8ec1b2a1f942ee91831a2 Mon Sep 17 00:00:00 2001 From: Severin Dicks <37635888+Intron7@users.noreply.github.com> Date: Tue, 8 Jul 2025 14:42:30 +0200 Subject: [PATCH 09/19] Update aggregate2 (#404) * test map_blocks * add dense prototype * update kernels * make nicer * fix 64_bit * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- src/rapids_singlecell/get/_aggregated.py | 153 +++++++++++++----- .../get/_kernels/_aggr_kernels.py | 99 ++++++------ 2 files changed, 165 insertions(+), 87 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index 020a8bc8..bb82009b 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -75,7 +75,6 @@ def count_mean_var_sparse_dask(self, dof: int = 1): This function is used to calculate the sum, mean, and variance of the sparse data matrix. It uses a custom cuda-kernel to perform the aggregation. """ - import dask import dask.array as da assert dof >= 0 @@ -87,13 +86,12 @@ def count_mean_var_sparse_dask(self, dof: int = 1): kernel.compile() def __aggregate_dask(X_part, mask_part, groupby_part): - sums = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64) - sq_sums = cp.zeros( - (self.n_cells.shape[0], X_part.shape[1]), dtype=cp.float64 + out = cp.zeros( + (1, 3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 ) - counts = cp.zeros((self.n_cells.shape[0], X_part.shape[1]), dtype=cp.int32) block = (512,) - grid = (X_part.shape[1],) + grid = (X_part.shape[0],) + kernel( grid, block, @@ -101,18 +99,15 @@ def __aggregate_dask(X_part, mask_part, groupby_part): X_part.indptr, X_part.indices, X_part.data, - counts, - sums, - sq_sums, + out, groupby_part, mask_part, X_part.shape[0], X_part.shape[1], + self.n_cells.shape[0], ), ) - counts = counts.astype(cp.float64) - out = cp.vstack([sums.ravel(), sq_sums.ravel(), counts.ravel()]) - return out[None, ...] + return out mask = self._get_mask() mask_dask = da.from_array( @@ -123,6 +118,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): chunks=(self.data.chunks[0],), meta=_meta_dense(self.groupby.dtype), ) + # Use map_blocks instead of blockwise out = da.blockwise( __aggregate_dask, "ij", @@ -135,11 +131,84 @@ def __aggregate_dask(X_part, mask_part, groupby_part): meta=_meta_dense(cp.float64), dtype=cp.float64, ).sum(axis=0) - sums, sq_sums, counts = out[0], out[1], out[2] - sums, sq_sums, counts = dask.compute(sums, sq_sums, counts) + out = out.compute() + sums, counts, sq_sums = out[0], out[1], out[2] sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) + means = sums / self.n_cells + var = sq_sums / self.n_cells - cp.power(means, 2) + var *= self.n_cells / (self.n_cells - dof) + return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} + + def count_mean_var_dense_dask(self, dof: int = 1): + """ + This function is used to calculate the sum, mean, and variance of the dense data matrix. + It uses a custom cuda-kernel to perform the aggregation. + """ + import dask.array as da + + assert dof >= 0 + from ._kernels._aggr_kernels import ( + _get_aggr_dense_kernel_C, + ) + + kernel_dense = _get_aggr_dense_kernel_C(self.data.dtype) + kernel_dense.compile() + + def __aggregate_dask_dense(X_part, mask_part, groupby_part): + out = cp.zeros( + (1, 3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 + ) + N = X_part.shape[0] * X_part.shape[1] + threads_per_block = 512 + blocks = min( + (N + threads_per_block - 1) // threads_per_block, + cp.cuda.Device().attributes["MultiProcessorCount"] * 8, + ) + kernel_dense( + (blocks,), + (threads_per_block,), + ( + X_part, + out, + groupby_part, + mask_part, + X_part.shape[0], + X_part.shape[1], + self.n_cells.shape[0], + ), + ) + return out + + mask = self._get_mask() + mask_dask = da.from_array( + mask, chunks=(self.data.chunks[0],), meta=_meta_dense(mask.dtype) + ) + groupby_dask = da.from_array( + self.groupby, + chunks=(self.data.chunks[0],), + meta=_meta_dense(self.groupby.dtype), + ) + # Use map_blocks instead of blockwise + out = da.blockwise( + __aggregate_dask_dense, + "ij", + self.data, + "ij", + mask_dask, + "i", + groupby_dask, + "i", + meta=_meta_dense(cp.float64), + dtype=cp.float64, + ).sum(axis=0) + out = out.compute() + sums, counts, sq_sums = out[0], out[1], out[2] + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) @@ -158,11 +227,10 @@ def count_mean_var_sparse(self, dof: int = 1): _get_aggr_sparse_kernel_csc, ) - sq_sums = cp.zeros( - (self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64 + out = cp.zeros( + (3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 ) - sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) + block = (512,) if self.data.format == "csc": grid = (self.data.shape[1],) @@ -178,15 +246,19 @@ def count_mean_var_sparse(self, dof: int = 1): self.data.indptr, self.data.indices, self.data.data, - counts, - sums, - sq_sums, + out, self.groupby, mask, self.data.shape[0], self.data.shape[1], + self.n_cells.shape[0], ), ) + sums, counts, sq_sums = out[0, :], out[1, :], out[2, :] + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - means**2 var *= self.n_cells / (self.n_cells - dof) @@ -363,14 +435,14 @@ def count_mean_var_dense(self, dof: int = 1): _get_aggr_dense_kernel_F, ) - means = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - var = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - sums = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) - counts = cp.zeros((self.n_cells.shape[0], self.data.shape[1]), dtype=cp.int32) - N = self.data.shape[0] * self.data.shape[1] - threads_per_block = 256 - blocks = (N + threads_per_block - 1) // threads_per_block + out = cp.zeros((3, self.n_cells.shape[0], self.data.shape[1]), dtype=cp.float64) + N = self.data.shape[0] * self.data.shape[1] + threads_per_block = 512 + blocks = min( + (N + threads_per_block - 1) // threads_per_block, + cp.cuda.Device().attributes["MultiProcessorCount"] * 8, + ) if self.data.flags.c_contiguous: aggr_kernel = _get_aggr_dense_kernel_C(self.data.dtype) else: @@ -380,20 +452,22 @@ def count_mean_var_dense(self, dof: int = 1): (blocks,), (threads_per_block,), ( - self.data.data, - counts, - sums, - means, - var, + self.data, + out, self.groupby, - self.n_cells, mask, self.data.shape[0], self.data.shape[1], + self.n_cells.shape[0], ), ) - - var = var - cp.power(means, 2) + sums, counts, sq_sums = out[0], out[1], out[2] + sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) + sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + counts = counts.astype(cp.int32) + means = sums / self.n_cells + var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) results = {"sum": sums, "count_nonzero": counts, "mean": means, "var": var} @@ -510,8 +584,6 @@ def aggregate( _check_gpu_X(data, allow_dask=True) dim_df = getattr(adata, axis_name) categorical, new_label_df = _combine_categories(dim_df, by) - # Actual computation - groupby = Aggregate(groupby=categorical, data=data, mask=mask) funcs = set([func] if isinstance(func, str) else func) @@ -521,7 +593,10 @@ def aggregate( if isinstance(data, cp.ndarray): result = groupby.count_mean_var_dense(dof) elif isinstance(data, DaskArray): - result = groupby.count_mean_var_sparse_dask(dof) + if isinstance(data._meta, cp.ndarray): + result = groupby.count_mean_var_dense_dask(dof) + else: + result = groupby.count_mean_var_sparse_dask(dof) else: if return_sparse: result = groupby.count_mean_var_sparse_sparse(funcs, dof) diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index d0e443e6..0da06c5e 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -4,20 +4,20 @@ sparse_dense_aggr_kernel = r""" (const int *indptr, const int *index,const {0} *data, - int* counts,double* sums, double* sq_sums, int* cats,bool* mask,int n_cells, int n_genes){ - int cell = blockIdx.x; + double* out, int* cats,bool* mask,size_t n_cells, size_t n_genes, size_t n_groups){ + size_t cell = blockIdx.x; if(cell >= n_cells || !mask[cell]){ return; } int cell_start = indptr[cell]; int cell_end = indptr[cell+1]; - int group = cats[cell]; + size_t group = (size_t)cats[cell]; for (int gene = cell_start+threadIdx.x; gene= n_genes){ return; } @@ -34,15 +34,15 @@ int gene_end = indptr[gene+1]; for (int cell_idx = gene_start+threadIdx.x; cell_idx= N) return; - size_t cell = i / n_genes; - size_t gene = i % n_genes; - if(!mask[cell]){ - return; - } + while (i < N){ + if (i >= N) return; + size_t cell = i / n_genes; + size_t gene = i % n_genes; + if(!mask[cell]){ + return; + } - size_t group = (size_t)cats[cell]; - double major = numcells[group]; - - double value = (double)data[cell * n_genes + gene]; - if (value != 0){ - atomicAdd(&sums[group * n_genes + gene], value); - atomicAdd(&counts[group * n_genes + gene], 1); - atomicAdd(&means[group * n_genes + gene], value / major); - atomicAdd(&vars[group * n_genes + gene], value * value / major); + size_t group = (size_t) cats[cell]; + + double value = (double)data[cell * n_genes + gene]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } + i += stride; } } """ dense_aggr_kernel_F = r""" - (const {0} *data, int* counts, double* sums, double* means, double* vars, - int* cats, double* numcells, bool* mask, size_t n_cells, size_t n_genes){ - + (const {0} *data, double* out, + int* cats, bool* mask, size_t n_cells, size_t n_genes, size_t n_groups){ size_t i = blockIdx.x * blockDim.x + threadIdx.x; + size_t stride = gridDim.x * blockDim.x; size_t N = n_cells * n_genes; - if (i >= N) return; - size_t cell = i % n_cells; - size_t gene = i / n_cells; - if(!mask[cell]){ - return; - } + while (i < N){ + if (i >= N) return; + size_t cell = i % n_cells; + size_t gene = i / n_cells; + if(!mask[cell]){ + return; + } - size_t group = (size_t) cats[cell]; - double major = numcells[group]; + size_t group = (size_t) cats[cell]; - double value = (double)data[gene * n_cells + cell]; - if (value != 0){ - atomicAdd(&sums[group * n_genes + gene], value); - atomicAdd(&counts[group * n_genes + gene], 1); - atomicAdd(&means[group * n_genes + gene], value / major); - atomicAdd(&vars[group * n_genes + gene], value * value / major); + double value = (double)data[gene * n_cells + cell]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } + i += stride; } } """ From 69fabd51e523409ada12e5f3ccaa68a89e952027 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Tue, 8 Jul 2025 06:39:04 -0700 Subject: [PATCH 10/19] update blockwise --- src/rapids_singlecell/get/_aggregated.py | 31 ++++++++++++------------ 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index bb82009b..da4a26af 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -84,10 +84,10 @@ def count_mean_var_sparse_dask(self, dof: int = 1): kernel = _get_aggr_sparse_kernel(self.data.dtype) kernel.compile() - + n_groups = self.n_cells.shape[0] def __aggregate_dask(X_part, mask_part, groupby_part): out = cp.zeros( - (1, 3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 + (1,3, n_groups, self.data.shape[1]), dtype=cp.float64 ) block = (512,) grid = (X_part.shape[0],) @@ -104,7 +104,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): mask_part, X_part.shape[0], X_part.shape[1], - self.n_cells.shape[0], + n_groups, ), ) return out @@ -121,21 +121,21 @@ def __aggregate_dask(X_part, mask_part, groupby_part): # Use map_blocks instead of blockwise out = da.blockwise( __aggregate_dask, - "ij", + "ikgj", self.data, "ij", mask_dask, "i", groupby_dask, "i", - meta=_meta_dense(cp.float64), + meta=cp.empty((3, n_groups, 0), dtype=cp.float64), dtype=cp.float64, + new_axes={'k': (3,),'g':(n_groups,)}, + align_arrays=True, ).sum(axis=0) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] - sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) - counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) - sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + print(sums.shape, counts.shape, sq_sums.shape) counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) @@ -156,10 +156,11 @@ def count_mean_var_dense_dask(self, dof: int = 1): kernel_dense = _get_aggr_dense_kernel_C(self.data.dtype) kernel_dense.compile() + n_groups = self.n_cells.shape[0] def __aggregate_dask_dense(X_part, mask_part, groupby_part): out = cp.zeros( - (1, 3, self.n_cells.shape[0] * self.data.shape[1]), dtype=cp.float64 + (1, 3, n_groups, self.data.shape[1]), dtype=cp.float64 ) N = X_part.shape[0] * X_part.shape[1] threads_per_block = 512 @@ -177,7 +178,7 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): mask_part, X_part.shape[0], X_part.shape[1], - self.n_cells.shape[0], + n_groups, ), ) return out @@ -194,21 +195,21 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): # Use map_blocks instead of blockwise out = da.blockwise( __aggregate_dask_dense, - "ij", + "ikgj", self.data, "ij", mask_dask, "i", groupby_dask, "i", - meta=_meta_dense(cp.float64), + meta=cp.empty((3, n_groups, 0), dtype=cp.float64), dtype=cp.float64, + new_axes={'k': (3,),'g':(n_groups,)}, + align_arrays=True, ).sum(axis=0) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] - sums = sums.reshape(self.n_cells.shape[0], self.data.shape[1]) - counts = counts.reshape(self.n_cells.shape[0], self.data.shape[1]) - sq_sums = sq_sums.reshape(self.n_cells.shape[0], self.data.shape[1]) + print(sums.shape, counts.shape, sq_sums.shape) counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) From ac3e78dff3232c998b00d348d10bbf2596aeac55 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 8 Jul 2025 13:39:29 +0000 Subject: [PATCH 11/19] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rapids_singlecell/get/_aggregated.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index da4a26af..97902565 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -85,10 +85,9 @@ def count_mean_var_sparse_dask(self, dof: int = 1): kernel = _get_aggr_sparse_kernel(self.data.dtype) kernel.compile() n_groups = self.n_cells.shape[0] + def __aggregate_dask(X_part, mask_part, groupby_part): - out = cp.zeros( - (1,3, n_groups, self.data.shape[1]), dtype=cp.float64 - ) + out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) block = (512,) grid = (X_part.shape[0],) @@ -130,7 +129,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): "i", meta=cp.empty((3, n_groups, 0), dtype=cp.float64), dtype=cp.float64, - new_axes={'k': (3,),'g':(n_groups,)}, + new_axes={"k": (3,), "g": (n_groups,)}, align_arrays=True, ).sum(axis=0) out = out.compute() @@ -159,9 +158,7 @@ def count_mean_var_dense_dask(self, dof: int = 1): n_groups = self.n_cells.shape[0] def __aggregate_dask_dense(X_part, mask_part, groupby_part): - out = cp.zeros( - (1, 3, n_groups, self.data.shape[1]), dtype=cp.float64 - ) + out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) N = X_part.shape[0] * X_part.shape[1] threads_per_block = 512 blocks = min( @@ -204,7 +201,7 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): "i", meta=cp.empty((3, n_groups, 0), dtype=cp.float64), dtype=cp.float64, - new_axes={'k': (3,),'g':(n_groups,)}, + new_axes={"k": (3,), "g": (n_groups,)}, align_arrays=True, ).sum(axis=0) out = out.compute() From 932bfc190d3533b0b8e79ad0523c7ac2abc61636 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Thu, 10 Jul 2025 14:48:46 +0200 Subject: [PATCH 12/19] switch to map_blocks --- src/rapids_singlecell/get/_aggregated.py | 60 ++++++++++++++---------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index 97902565..d2c16469 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -110,28 +110,35 @@ def __aggregate_dask(X_part, mask_part, groupby_part): mask = self._get_mask() mask_dask = da.from_array( - mask, chunks=(self.data.chunks[0],), meta=_meta_dense(mask.dtype) + mask, chunks=(self.data.chunks[0]), meta=_meta_dense(mask.dtype) ) groupby_dask = da.from_array( self.groupby, - chunks=(self.data.chunks[0],), + chunks=(self.data.chunks[0]), meta=_meta_dense(self.groupby.dtype), ) # Use map_blocks instead of blockwise - out = da.blockwise( + out = da.map_blocks( __aggregate_dask, - "ikgj", self.data, - "ij", - mask_dask, - "i", - groupby_dask, - "i", - meta=cp.empty((3, n_groups, 0), dtype=cp.float64), + mask_dask[..., None], + groupby_dask[..., None], + meta=cp.empty([], dtype=cp.float64), dtype=cp.float64, - new_axes={"k": (3,), "g": (n_groups,)}, - align_arrays=True, - ).sum(axis=0) + new_axis=( + 1, + 2, + ), + chunks=( + (1,) * self.data.blocks.size, + (3,), + (n_groups,), + (self.data.shape[1],), + ), + ) + print(out.shape) + out = out.sum(axis=0) + print(out.shape) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] print(sums.shape, counts.shape, sq_sums.shape) @@ -190,20 +197,25 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): meta=_meta_dense(self.groupby.dtype), ) # Use map_blocks instead of blockwise - out = da.blockwise( + out = da.map_blocks( __aggregate_dask_dense, - "ikgj", self.data, - "ij", - mask_dask, - "i", - groupby_dask, - "i", - meta=cp.empty((3, n_groups, 0), dtype=cp.float64), + mask_dask[..., None], + groupby_dask[..., None], + meta=cp.empty([], dtype=cp.float64), dtype=cp.float64, - new_axes={"k": (3,), "g": (n_groups,)}, - align_arrays=True, - ).sum(axis=0) + new_axis=( + 1, + 2, + ), + chunks=( + (1,) * self.data.blocks.size, + (3,), + (n_groups,), + (self.data.shape[1],), + ), + ) + out = out.sum(axis=0) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] print(sums.shape, counts.shape, sq_sums.shape) From b805156b0cb1b94b6dcc82d7201e0461608521b6 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Thu, 10 Jul 2025 07:19:45 -0700 Subject: [PATCH 13/19] add test and split --- src/rapids_singlecell/get/_aggregated.py | 22 +++++++----- tests/dask/test_dask_aggr.py | 46 ++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 tests/dask/test_dask_aggr.py diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index d2c16469..ded9ab36 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -70,7 +70,7 @@ def _get_mask(self): else: return cp.ones(self.data.shape[0], dtype=bool) - def count_mean_var_sparse_dask(self, dof: int = 1): + def count_mean_var_sparse_dask(self, dof: int = 1, split_every: int = 2): """ This function is used to calculate the sum, mean, and variance of the sparse data matrix. It uses a custom cuda-kernel to perform the aggregation. @@ -136,19 +136,16 @@ def __aggregate_dask(X_part, mask_part, groupby_part): (self.data.shape[1],), ), ) - print(out.shape) - out = out.sum(axis=0) - print(out.shape) + out = out.sum(axis=0, split_every=split_every) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] - print(sums.shape, counts.shape, sq_sums.shape) counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} - def count_mean_var_dense_dask(self, dof: int = 1): + def count_mean_var_dense_dask(self, dof: int = 1, split_every: int = 2): """ This function is used to calculate the sum, mean, and variance of the dense data matrix. It uses a custom cuda-kernel to perform the aggregation. @@ -215,7 +212,7 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): (self.data.shape[1],), ), ) - out = out.sum(axis=0) + out = out.sum(axis=0, split_every=split_every) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] print(sums.shape, counts.shape, sq_sums.shape) @@ -497,6 +494,7 @@ def aggregate( obsm: str | None = None, varm: str | None = None, return_sparse: bool = False, + **kwargs, ) -> AnnData: """\ Aggregate data matrix based on some categorical grouping. @@ -603,10 +601,16 @@ def aggregate( if isinstance(data, cp.ndarray): result = groupby.count_mean_var_dense(dof) elif isinstance(data, DaskArray): + if "split_every" in kwargs: + assert isinstance(kwargs["split_every"], int) + assert kwargs["split_every"] > 0 + split_every = kwargs["split_every"] + else: + split_every = 2 if isinstance(data._meta, cp.ndarray): - result = groupby.count_mean_var_dense_dask(dof) + result = groupby.count_mean_var_dense_dask(dof, split_every=split_every) else: - result = groupby.count_mean_var_sparse_dask(dof) + result = groupby.count_mean_var_sparse_dask(dof, split_every=split_every) else: if return_sparse: result = groupby.count_mean_var_sparse_sparse(funcs, dof) diff --git a/tests/dask/test_dask_aggr.py b/tests/dask/test_dask_aggr.py new file mode 100644 index 00000000..c9fb6976 --- /dev/null +++ b/tests/dask/test_dask_aggr.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import cupy as cp +import numpy as np +import pytest +from cupyx.scipy import sparse as cusparse +from scanpy.datasets import pbmc3k_processed + +import rapids_singlecell as rsc +from rapids_singlecell._testing import ( + as_dense_cupy_dask_array, + as_sparse_cupy_dask_array, +) + + +@pytest.mark.parametrize("data_kind", ["sparse", "dense"]) +def test_dask_aggr(client, data_kind): + adata_1 = pbmc3k_processed() + adata_2 = pbmc3k_processed() + + if data_kind == "sparse": + adata_1 = adata_1.raw.to_adata() + adata_2 = adata_2.raw.to_adata() + adata_1.X = cusparse.csr_matrix(adata_1.X.astype(np.float64)) + adata_2.X = as_sparse_cupy_dask_array(adata_2.X.astype(np.float64)) + elif data_kind == "dense": + adata_1.X = cp.array(adata_1.X.astype(np.float64)) + adata_2.X = as_dense_cupy_dask_array(adata_2.X.astype(np.float64)) + else: + raise ValueError(f"Unknown data_kind {data_kind}") + + out_in_memory = rsc.get.aggregate( + adata_2, + by="louvain", + func=["sum", "mean", "var", "count_nonzero"], + ) + out_dask = rsc.get.aggregate( + adata_1, + by="louvain", + func=["sum", "mean", "var", "count_nonzero"], + ) + for i in range(4): + c = ["sum", "mean", "var", "count_nonzero"][i] + a = out_in_memory.layers[c] + b = out_dask.layers[c] + cp.testing.assert_allclose(a, b) From f06ebf253f2cd2594e0f9dc47383666907dcab27 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Thu, 10 Jul 2025 16:35:51 +0200 Subject: [PATCH 14/19] fix tests and mask --- .../get/_kernels/_aggr_kernels.py | 40 +++++++++---------- tests/dask/test_dask_aggr.py | 9 ++++- tests/test_aggregated.py | 5 ++- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index 0da06c5e..c8bcca58 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -99,17 +99,15 @@ if (i >= N) return; size_t cell = i / n_genes; size_t gene = i % n_genes; - if(!mask[cell]){ - return; - } - - size_t group = (size_t) cats[cell]; - - double value = (double)data[cell * n_genes + gene]; - if (value != 0){ - atomicAdd(&out[group*n_genes+gene], value); - atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); - atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + if(mask[cell]){ + size_t group = (size_t) cats[cell]; + + double value = (double)data[cell * n_genes + gene]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } } i += stride; } @@ -126,17 +124,15 @@ if (i >= N) return; size_t cell = i % n_cells; size_t gene = i / n_cells; - if(!mask[cell]){ - return; - } - - size_t group = (size_t) cats[cell]; - - double value = (double)data[gene * n_cells + cell]; - if (value != 0){ - atomicAdd(&out[group*n_genes+gene], value); - atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); - atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + if(mask[cell]){ + size_t group = (size_t) cats[cell]; + + double value = (double)data[gene * n_cells + cell]; + if (value != 0){ + atomicAdd(&out[group*n_genes+gene], value); + atomicAdd(&out[group*n_genes+gene+n_genes*n_groups], 1); + atomicAdd(&out[group*n_genes+gene+2*n_genes*n_groups], value*value); + } } i += stride; } diff --git a/tests/dask/test_dask_aggr.py b/tests/dask/test_dask_aggr.py index c9fb6976..19acb9b8 100644 --- a/tests/dask/test_dask_aggr.py +++ b/tests/dask/test_dask_aggr.py @@ -14,7 +14,8 @@ @pytest.mark.parametrize("data_kind", ["sparse", "dense"]) -def test_dask_aggr(client, data_kind): +@pytest.mark.parametrize("use_mask", [True, False]) +def test_dask_aggr(client, data_kind, use_mask): adata_1 = pbmc3k_processed() adata_2 = pbmc3k_processed() @@ -28,16 +29,22 @@ def test_dask_aggr(client, data_kind): adata_2.X = as_dense_cupy_dask_array(adata_2.X.astype(np.float64)) else: raise ValueError(f"Unknown data_kind {data_kind}") + if use_mask: + mask = adata_1.obs.louvain == "Megakaryocytes" + else: + mask = None out_in_memory = rsc.get.aggregate( adata_2, by="louvain", func=["sum", "mean", "var", "count_nonzero"], + mask=mask, ) out_dask = rsc.get.aggregate( adata_1, by="louvain", func=["sum", "mean", "var", "count_nonzero"], + mask=mask, ) for i in range(4): c = ["sum", "mean", "var", "count_nonzero"][i] diff --git a/tests/test_aggregated.py b/tests/test_aggregated.py index f4d73799..0ab430ee 100644 --- a/tests/test_aggregated.py +++ b/tests/test_aggregated.py @@ -363,7 +363,9 @@ def test_sparse_vs_dense(metric): def test_c_contiguous_vs_fortran_contiguous(): - adata = pbmc3k_processed() + adata = pbmc3k_processed().raw.to_adata() + adata = adata[:, :1000].copy() + adata.X = adata.X.toarray() rsc.get.anndata_to_GPU(adata) adata.X = cp.asfortranarray(adata.X) mask = adata.obs.louvain == "Megakaryocytes" @@ -376,7 +378,6 @@ def test_c_contiguous_vs_fortran_contiguous(): by="louvain", func=["sum", "mean", "var", "count_nonzero"], mask=mask, - return_sparse=True, ) for i in range(4): From 2d77c16faef0119e96f8b3626fd3f4c1fa5eb209 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Thu, 10 Jul 2025 16:40:55 +0200 Subject: [PATCH 15/19] adds release note --- docs/release-notes/0.13.0.md | 15 +++++++++++++++ docs/release-notes/index.md | 4 ++++ 2 files changed, 19 insertions(+) create mode 100644 docs/release-notes/0.13.0.md diff --git a/docs/release-notes/0.13.0.md b/docs/release-notes/0.13.0.md new file mode 100644 index 00000000..abcbc59a --- /dev/null +++ b/docs/release-notes/0.13.0.md @@ -0,0 +1,15 @@ +### 0.13.0 {small}`the-future` + +```{rubric} Features +``` +* Add support for aggregate operations on CSC matrices, Fortran-ordered arrays, and Dask with sparse CSR and dense matrices {pr}`395` {smaller}`S Dicks` + +```{rubric} Performance +``` + +```{rubric} Bug fixes +``` + + +```{rubric} Misc +``` diff --git a/docs/release-notes/index.md b/docs/release-notes/index.md index ab603767..059730b1 100644 --- a/docs/release-notes/index.md +++ b/docs/release-notes/index.md @@ -2,6 +2,10 @@ # Release notes +## Version 0.13.0 +```{include} /release-notes/0.13.0.md +``` + ## Version 0.12.0 ```{include} /release-notes/0.12.7.md ``` From dfc1e1fc287fa54e6bad947692daf30b5dd31595 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Fri, 11 Jul 2025 12:28:42 +0200 Subject: [PATCH 16/19] update dtypes --- src/rapids_singlecell/get/_kernels/_aggr_kernels.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py index c8bcca58..cda671d1 100644 --- a/src/rapids_singlecell/get/_kernels/_aggr_kernels.py +++ b/src/rapids_singlecell/get/_kernels/_aggr_kernels.py @@ -4,7 +4,8 @@ sparse_dense_aggr_kernel = r""" (const int *indptr, const int *index,const {0} *data, - double* out, int* cats,bool* mask,size_t n_cells, size_t n_genes, size_t n_groups){ + double* out, int* cats,bool* mask, + size_t n_cells, size_t n_genes, size_t n_groups){ size_t cell = blockIdx.x; if(cell >= n_cells || !mask[cell]){ return; @@ -25,7 +26,8 @@ sparse_dense_aggr_kernel_csc = r""" (const int *indptr, const int *index,const {0} *data, - double* out, int* cats,bool* mask,int n_cells, int n_genes, size_t n_groups){ + double* out, int* cats,bool* mask, + size_t n_cells, size_t n_genes, size_t n_groups){ size_t gene = blockIdx.x; if(gene >= n_genes){ return; From 6c6127dfbb18c5d8b4f123c9b68acbc127bac3c7 Mon Sep 17 00:00:00 2001 From: Intron7 Date: Fri, 11 Jul 2025 14:51:39 +0200 Subject: [PATCH 17/19] merge dask --- src/rapids_singlecell/get/_aggregated.py | 133 +++++++---------------- 1 file changed, 40 insertions(+), 93 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index ded9ab36..b2924201 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -70,31 +70,38 @@ def _get_mask(self): else: return cp.ones(self.data.shape[0], dtype=bool) - def count_mean_var_sparse_dask(self, dof: int = 1, split_every: int = 2): + def count_mean_var_dask(self, dof: int = 1, split_every: int = 2): """ - This function is used to calculate the sum, mean, and variance of the sparse data matrix. - It uses a custom cuda-kernel to perform the aggregation. + This function is used to calculate the sum, mean, and variance of the data matrix. + It automatically detects sparse vs dense matrices and uses the appropriate + CUDA kernel for aggregation. """ import dask.array as da assert dof >= 0 from ._kernels._aggr_kernels import ( + _get_aggr_dense_kernel_C, _get_aggr_sparse_kernel, ) - kernel = _get_aggr_sparse_kernel(self.data.dtype) + if isinstance(self.data._meta, cp.ndarray): + kernel = _get_aggr_dense_kernel_C(self.data.dtype) + is_sparse = False + else: + kernel = _get_aggr_sparse_kernel(self.data.dtype) + is_sparse = True + kernel.compile() n_groups = self.n_cells.shape[0] def __aggregate_dask(X_part, mask_part, groupby_part): out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) - block = (512,) - grid = (X_part.shape[0],) - kernel( - grid, - block, - ( + if is_sparse: + # Sparse matrix kernel parameters + block = (512,) + grid = (X_part.shape[0],) + kernel_args = ( X_part.indptr, X_part.indices, X_part.data, @@ -104,75 +111,18 @@ def __aggregate_dask(X_part, mask_part, groupby_part): X_part.shape[0], X_part.shape[1], n_groups, - ), - ) - return out - - mask = self._get_mask() - mask_dask = da.from_array( - mask, chunks=(self.data.chunks[0]), meta=_meta_dense(mask.dtype) - ) - groupby_dask = da.from_array( - self.groupby, - chunks=(self.data.chunks[0]), - meta=_meta_dense(self.groupby.dtype), - ) - # Use map_blocks instead of blockwise - out = da.map_blocks( - __aggregate_dask, - self.data, - mask_dask[..., None], - groupby_dask[..., None], - meta=cp.empty([], dtype=cp.float64), - dtype=cp.float64, - new_axis=( - 1, - 2, - ), - chunks=( - (1,) * self.data.blocks.size, - (3,), - (n_groups,), - (self.data.shape[1],), - ), - ) - out = out.sum(axis=0, split_every=split_every) - out = out.compute() - sums, counts, sq_sums = out[0], out[1], out[2] - counts = counts.astype(cp.int32) - means = sums / self.n_cells - var = sq_sums / self.n_cells - cp.power(means, 2) - var *= self.n_cells / (self.n_cells - dof) - return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} - - def count_mean_var_dense_dask(self, dof: int = 1, split_every: int = 2): - """ - This function is used to calculate the sum, mean, and variance of the dense data matrix. - It uses a custom cuda-kernel to perform the aggregation. - """ - import dask.array as da - - assert dof >= 0 - from ._kernels._aggr_kernels import ( - _get_aggr_dense_kernel_C, - ) - - kernel_dense = _get_aggr_dense_kernel_C(self.data.dtype) - kernel_dense.compile() - n_groups = self.n_cells.shape[0] - - def __aggregate_dask_dense(X_part, mask_part, groupby_part): - out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) - N = X_part.shape[0] * X_part.shape[1] - threads_per_block = 512 - blocks = min( - (N + threads_per_block - 1) // threads_per_block, - cp.cuda.Device().attributes["MultiProcessorCount"] * 8, - ) - kernel_dense( - (blocks,), - (threads_per_block,), - ( + ) + else: + # Dense matrix kernel parameters + N = X_part.shape[0] * X_part.shape[1] + threads_per_block = 512 + blocks = min( + (N + threads_per_block - 1) // threads_per_block, + cp.cuda.Device().attributes["MultiProcessorCount"] * 8, + ) + block = (threads_per_block,) + grid = (blocks,) + kernel_args = ( X_part, out, groupby_part, @@ -180,31 +130,30 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): X_part.shape[0], X_part.shape[1], n_groups, - ), - ) + ) + + kernel(grid, block, kernel_args) return out mask = self._get_mask() mask_dask = da.from_array( - mask, chunks=(self.data.chunks[0],), meta=_meta_dense(mask.dtype) + mask, chunks=(self.data.chunks[0]), meta=_meta_dense(mask.dtype) ) groupby_dask = da.from_array( self.groupby, - chunks=(self.data.chunks[0],), + chunks=(self.data.chunks[0]), meta=_meta_dense(self.groupby.dtype), ) + # Use map_blocks instead of blockwise out = da.map_blocks( - __aggregate_dask_dense, + __aggregate_dask, self.data, mask_dask[..., None], groupby_dask[..., None], meta=cp.empty([], dtype=cp.float64), dtype=cp.float64, - new_axis=( - 1, - 2, - ), + new_axis=(1, 2), chunks=( (1,) * self.data.blocks.size, (3,), @@ -212,10 +161,10 @@ def __aggregate_dask_dense(X_part, mask_part, groupby_part): (self.data.shape[1],), ), ) + out = out.sum(axis=0, split_every=split_every) out = out.compute() sums, counts, sq_sums = out[0], out[1], out[2] - print(sums.shape, counts.shape, sq_sums.shape) counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) @@ -607,10 +556,8 @@ def aggregate( split_every = kwargs["split_every"] else: split_every = 2 - if isinstance(data._meta, cp.ndarray): - result = groupby.count_mean_var_dense_dask(dof, split_every=split_every) - else: - result = groupby.count_mean_var_sparse_dask(dof, split_every=split_every) + result = groupby.count_mean_var_dask(dof, split_every=split_every) + else: if return_sparse: result = groupby.count_mean_var_sparse_sparse(funcs, dof) From 61f784826bcb3b72a7d5d4706dc684fe0350547f Mon Sep 17 00:00:00 2001 From: Intron7 Date: Fri, 11 Jul 2025 15:13:47 +0200 Subject: [PATCH 18/19] slim down --- src/rapids_singlecell/get/_aggregated.py | 35 ++++++++++++------------ 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/rapids_singlecell/get/_aggregated.py b/src/rapids_singlecell/get/_aggregated.py index b2924201..cad33ba4 100644 --- a/src/rapids_singlecell/get/_aggregated.py +++ b/src/rapids_singlecell/get/_aggregated.py @@ -96,45 +96,43 @@ def count_mean_var_dask(self, dof: int = 1, split_every: int = 2): def __aggregate_dask(X_part, mask_part, groupby_part): out = cp.zeros((1, 3, n_groups, self.data.shape[1]), dtype=cp.float64) + threads_per_block = 512 if is_sparse: # Sparse matrix kernel parameters - block = (512,) grid = (X_part.shape[0],) kernel_args = ( X_part.indptr, X_part.indices, X_part.data, - out, - groupby_part, - mask_part, - X_part.shape[0], - X_part.shape[1], - n_groups, ) else: # Dense matrix kernel parameters N = X_part.shape[0] * X_part.shape[1] - threads_per_block = 512 + blocks = min( (N + threads_per_block - 1) // threads_per_block, cp.cuda.Device().attributes["MultiProcessorCount"] * 8, ) - block = (threads_per_block,) grid = (blocks,) - kernel_args = ( - X_part, + kernel_args = (X_part,) + + kernel( + grid, + (threads_per_block,), + ( + *kernel_args, out, groupby_part, mask_part, X_part.shape[0], X_part.shape[1], n_groups, - ) - - kernel(grid, block, kernel_args) + ), + ) return out + # Prepare Dask arrays mask = self._get_mask() mask_dask = da.from_array( mask, chunks=(self.data.chunks[0]), meta=_meta_dense(mask.dtype) @@ -145,7 +143,7 @@ def __aggregate_dask(X_part, mask_part, groupby_part): meta=_meta_dense(self.groupby.dtype), ) - # Use map_blocks instead of blockwise + # Apply aggregation across all blocks out = da.map_blocks( __aggregate_dask, self.data, @@ -162,13 +160,16 @@ def __aggregate_dask(X_part, mask_part, groupby_part): ), ) - out = out.sum(axis=0, split_every=split_every) - out = out.compute() + # Compute final aggregated results + out = out.sum(axis=0, split_every=split_every).compute() sums, counts, sq_sums = out[0], out[1], out[2] + + # Calculate statistics counts = counts.astype(cp.int32) means = sums / self.n_cells var = sq_sums / self.n_cells - cp.power(means, 2) var *= self.n_cells / (self.n_cells - dof) + return {"mean": means, "var": var, "sum": sums, "count_nonzero": counts} def count_mean_var_sparse(self, dof: int = 1): From e1aad059d467571d00f03696d944f3e50073772e Mon Sep 17 00:00:00 2001 From: Intron7 Date: Fri, 11 Jul 2025 15:32:58 +0200 Subject: [PATCH 19/19] fix hvg --- src/rapids_singlecell/preprocessing/_pca.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rapids_singlecell/preprocessing/_pca.py b/src/rapids_singlecell/preprocessing/_pca.py index 063fa99a..e2e8491b 100644 --- a/src/rapids_singlecell/preprocessing/_pca.py +++ b/src/rapids_singlecell/preprocessing/_pca.py @@ -126,7 +126,9 @@ def pca( X = _get_obs_rep(adata, layer=layer) - mask_var_param, mask_var = _handle_mask_var(adata, mask_var, use_highly_variable) + mask_var_param, mask_var = _handle_mask_var( + adata, mask_var, use_highly_variable=use_highly_variable + ) del use_highly_variable X = X[:, mask_var] if mask_var is not None else X