Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/execute-nbs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
activate-environment: tutorials
channel-priority: flexible
environment-file: environment.yml
miniforge-variant: Mambaforge
miniforge-variant: Miniforge3
miniforge-version: latest
use-mamba: true
# some important packages are not available as .tar.bz2 anymore
Expand Down
7 changes: 6 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"sphinx.ext.intersphinx",
"sphinx.ext.autosummary",
"sphinx.ext.napoleon",
"sphinx_issues",
"sphinxcontrib.bibtex",
"sphinx_autodoc_typehints",
"sphinx.ext.mathjax",
Expand Down Expand Up @@ -88,9 +89,13 @@

intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"anndata": ("https://anndata.readthedocs.io/en/stable/", None),
"anndata": ("https://anndata.readthedocs.io/en/latest/", None), # TODO: change back to stable after 0.12 release
"numpy": ("https://numpy.org/doc/stable/", None),
"scanpy": ("https://scanpy.readthedocs.io/en/stable/", None),
"fast-array-utils": ("https://icb-fast-array-utils.readthedocs-hosted.com/en/stable", None),
"dask": ("https://docs.dask.org/en/stable", None),
"scipy": ("https://docs.scipy.org/doc/scipy", None),
"rapids-singlecell": ("https://rapids-singlecell.readthedocs.io/en/stable/", None),
}

# List of patterns, relative to source directory, that match files and
Expand Down
102 changes: 102 additions & 0 deletions docs/how-to-dask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Dask Q&A

Here we will go through some common questions and answers about `dask`, with a special focus on its integration with `scanpy` and `anndata`. For more comprehensive tutorials or other topics like {doc}`launching a cluster <dask:deploying>`, head over their documentation.

## Quickstart

### How do I monitor the {doc}`dask dashboard <dask:dashboard>`?

If you are in a jupyter notebook, when you render the `repr` of your `client`, you will see a link, usually something like `http://localhost:8787/status`.
If you are working locally, this link alone should suffice.

If you are working on some sort of remote notebook from a web browser, you will need to replace `http://localhost` by the root url of the notebook.

If you are in vscode, there is a [`dask` extension] which will allow you to monitor there.

### How do I know how to allocate resources?

In `dask`, every worker will receive an equal share of the memory available.
So if you request e.g., a slurm job with 256GB of RAM, and then start 8 workers, each will have 32 GB of memory.

`dask` distributes jobs to each worker generally based on the chunking of the array.
So if you have dense chunks of `(30_000, 30_000)` with 32 bit integers, you will need to be have 3.6 GB for each worker, at the minimum to even load the data.
Then if you do something like matrix multiplication, you will need double or even more, as an example.

### How do I read my data into a `dask` array?

{func}`anndata.experimental.read_elem_lazy` or {func}`anndata.experimental.read_lazy` can help you if you already have data on-disk that was written to the `anndata` file format.
If you use {func}`dask.array.to_zarr`, the data _cannot_ be read in using `anndata`'s functionality as `anndata` will look for its {doc}`specified file format metadata <anndata:fileformat-prose>`.

If you need to implement custom io, generally we found that using {func}`dask.array.map_blocks` provides a nice way.
See [our custom h5 io code] for an example.

## Advanced use and how-to-contribute

### How do `scanpy` and `anndata` handle sparse matrices?

While there is some {class}`scipy.sparse.csr_matrix` and {class}`scipy.sparse.csc_matrix` support for `dask`, it is not comprehensive and missing key functions like summation, mean etc.
We have implemented custom functionality, much of which lives in {mod}`fast_array_utils`, although we have also had to implement custom algorithms like `pca` for sparse-in-dask.
In the future, an [`array-api`] compatible sparse matrix like [`finch`] would help us considerably as `dask` supports the [`array-api`].

Therefore, if you run into a puzzling error after trying to run a function like {func}`numpy.sum` (or similar) on a sparse-in-dask array, consider checking {mod}`fast_array_utils`.
If you need to implement the function yourself, see the next point.

### Custom block-wise array operations

Sometimes you may want to do an operation on a an array that is implemented nowhere.
Generally, we have found {func}`dask.array.map_blocks` to be versatile enough that most operations can be expressed on it. Click on the link to see `dask`'s own tutorial about the function.

Take this (simplified) example of calculating a gram matrix from {func}`scanpy.pp.pca` for sparse-in-dask:

```python
def gram_block(x_part):
gram_matrix = x_part.T @ x_part
return gram_matrix[None, ...]

gram_matrix_dask = da.map_blocks(
gram_block,
x,
new_axis=(1,),
chunks=((1,) * x.blocks.size, (x.shape[1],), (x.shape[1],)),
meta=np.array([], dtype=x.dtype),
dtype=x.dtype,
).sum(axis=0)
```

This algorithm goes through every `chunk_size` number of rows and calculates the gram matrix for those rows producing a collection of `(n_vars,n_vars)` size matrix.
These are the summed together to produce a single `(n_vars,n_vars)` matrix, which is the gram matrix.

Because `dask` does not implement matrix multiplication for sparse-in-dask, we do it ourselves.
We use `map_blocks` over a CSR sparse-in-dask array where the chunking looks something like `(chunk_size, n_vars)`.
When we compute the individual block's gram matrix, we add an axis via `[None, ...]` so that we can sum over that axis i.e., the `da.map_blocks` call produces a `(n_obs // chunk_size, n_vars, n_vars)` sized-matrix which is summed over the first dimension.
However, to make this work, we need to be very specific about how `da.map_blocks` expects its result to look like, done via `new_axis` and `chunks`.
`new_axis` indicates that we are adding a single new axis at the front.
The `chunks` argument specifies that the output of `da.map_blocks` should have `x.blocks.size` number of `(1, n_vars, n_vars)` matrixes.
This `chunks` argument thus allows the inferral of the shape of the output.

While this example is a bit complicated it shows how you can go from a matrix of one shape and chunking to another by operating in a clean way over blocks.

## FAQ

### What is `persist` used for in RSC notebooks?

In the {doc}`multi-gpu showcase notebook for rapids-singlecell <rapids-singlecell:notebooks/06-multi_gpu_show>`, {meth}`dask.array.Array.persist` appears across the notebook.
This loads the entire dataset into memory while keeping the representation as a dask array.
Thus, lazy computation still works but only necessitates a single read into memory.
The catch is that you need to have enough memory to use `persist`, but if you do it greatly speeds up the computation.

### I'm out of memory, what now?

You can always reduce the number of workers you use, which will cause more memory to be allocated per worker.
Some algorithms may have limitations with loading all data onto a single node; see {issue}`dask/dask-ml#985` for an example.

### How do I choose chunk sizes?

Have a look at the {doc}`dask docs for chunking <dask:array-chunks>`, however the general rule of thumb there is to use larger chunks in memory than on disk.
In this sense, it is probably a good idea to use the largest chunk size in memory allowable by your memory limits (and the algorithms you use) in order to maximize any thread-level parallelization in algorithms to its fullest.
For sparse data, where the chunks in-memory do not map to those on disk, maxing out the memory available by choosing a large chunk size becomes more imperative.

[`dask` extension]: https://marketplace.visualstudio.com/items?itemName=joyceerhl.vscode-das
[our custom h5 io code]: https://github.com/scverse/anndata/blob/089ed929393a02200b389395f278b7c920e5bc4a/src/anndata/_io/specs/lazy_methods.py#L179-L205
[`array-api`]: https://data-apis.org/array-api/latest/index.html
[`finch`]: https://github.com/finch-tensor/finch-tensor-python
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ notebooks/tutorial_axes_anndata_mudata
notebooks/scverse_data_backed
notebooks/scverse_data_interoperability
notebooks/tutorial_concatenation_anndata_mudata

how-to-dask.md
references.md
```
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ registry = [
docs = [
"sphinx>=7",
"sphinx-book-theme>=1.1.0",
"sphinx-issues>=5.0.1",
"myst-nb>=1.1.0",
"sphinxcontrib-bibtex>=1.0.0",
"sphinx-autodoc-typehints",
Expand All @@ -45,6 +46,7 @@ docs = [

[tool.hatch.envs.default]
installer = "uv"
features = ["dev"]

[tool.hatch.envs.registry]
features = ["registry"]
Expand All @@ -60,6 +62,8 @@ extra-dependencies = [
]
[tool.hatch.envs.docs.scripts]
build = "sphinx-build -M html docs docs/_build {args}"
open = "python3 -m webbrowser -t docs/_build/html/index.html"
clean = "git clean -fdX -- {args:docs}"

[tool.hatch.build.targets.wheel]
bypass-selection = true # This is not a package
Expand Down