18 changes: 17 additions & 1 deletion cubed/__init__.py
@@ -20,10 +20,24 @@
from .array.pad import pad
from .core.array import compute, measure_reserved_mem, plan, visualize
from .core.gufunc import apply_gufunc
-from .core.ops import from_array, from_zarr, map_blocks, rechunk, store, to_zarr
+from .core.ops import (
+    from_array,
+    from_manifest,
+    from_zarr,
+    map_blocks,
+    rechunk,
+    store,
+    to_zarr,
+)
from .runtime.types import Callback, TaskEndEvent
from .spec import Spec

try:
from .virtualizarr import from_virtual_array
except ImportError:
# VirtualiZarr integration is optional
from_virtual_array = None
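
# A minimal caller-side guard for the optional dependency (a sketch of
# hypothetical downstream code, not part of this module):
#
#     import cubed
#     if cubed.from_virtual_array is None:
#         raise ImportError("cubed was installed without VirtualiZarr support")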

__all__ = [
"__version__",
"Callback",
@@ -33,6 +47,8 @@
"compute",
"config",
"from_array",
"from_manifest",
"from_virtual_array",
"from_zarr",
"map_blocks",
"map_overlap",
191 changes: 183 additions & 8 deletions cubed/core/ops.py
@@ -125,6 +125,177 @@ def from_zarr(store, path=None, spec=None) -> "Array":
return Array(name, target, spec, plan)


def from_manifest(
load_chunk: Callable[[Tuple[int, ...]], np.ndarray],
shape: T_Shape,
dtype: np.dtype,
chunks: T_RegularChunks,
spec=None,
fill_value=None,
) -> "Array":
"""Create a Cubed array from a chunk-loading callback.

The callback is invoked by workers during execution to load each chunk on
demand. This enables workflows where data is fetched directly from source
storage (S3, HTTP, etc.) without first being materialized on the client.

Parameters
----------
load_chunk : callable
Function with signature: ``load_chunk(chunk_key: tuple[int, ...]) -> np.ndarray``
Called by workers to load each chunk. Must be serializable (picklable).
The chunk_key is a tuple of integers (block_id) representing the chunk
coordinates, e.g., (0, 1) for the chunk at row 0, column 1. This maps
directly to Zarr chunk keys like "0.1".
shape : tuple of int
Array shape
dtype : np.dtype
Array data type
chunks : tuple of int
Chunk size for each dimension (regular chunking)
spec : cubed.Spec, optional
The spec to use for the computation.
fill_value : scalar, optional
    Value to use for missing chunks, i.e. when ``load_chunk`` raises
    ``KeyError``, ``FileNotFoundError``, or ``IndexError``. If None, the
    exception propagates, so ``load_chunk`` must handle missing chunks itself.

Returns
-------
cubed.Array
Cubed array that lazily loads chunks during computation

Examples
--------
>>> import cubed
>>> import numpy as np
>>>
>>> def load_chunk(chunk_key):
...     # Fetch chunk from storage using chunk_key
...     # For example, from S3, HTTP, or local files
...     import fsspec
...     key = ".".join(map(str, chunk_key))  # e.g. (0, 1) -> "0.1"
...     path = f"s3://bucket/chunk_{key}.npy"
...     with fsspec.open(path, "rb") as f:
...         return np.load(f)
>>>
>>> spec = cubed.Spec(work_dir="tmp", allowed_mem="2GB")
>>> arr = cubed.from_manifest(
... load_chunk,
... shape=(1000, 1000),
... dtype=np.float32,
... chunks=(100, 100),
... spec=spec
... )
>>>
>>> result = arr.mean().compute() # Workers call load_chunk() for each chunk
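>>>
>>> # A dependency-free variant for local testing: a minimal sketch that
>>> # synthesizes each chunk from its block coordinates (regular chunking
>>> # assumed, so every block here is exactly (100, 100)):
>>> def load_synthetic(chunk_key):
...     return np.full((100, 100), float(sum(chunk_key)), dtype=np.float32)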
>>>
>>> # Example: Integration with VirtualiZarr ManifestArray
>>> # import virtualizarr as vz
>>> # vds = vz.open_virtual_dataset("s3://bucket/data.zarr")
>>> # marr = vds['temperature'].data # ManifestArray
>>> #
>>> # def load_from_manifest(chunk_key):
>>> # key_str = '.'.join(map(str, chunk_key))
>>> # entry = marr.manifest.dict().get(key_str)
>>> # if entry is None or entry['path'] == '':
>>> # return np.full(marr.chunks, marr.metadata.fill_value, dtype=marr.dtype)
>>> # import fsspec
>>> # with fsspec.open(entry['path'], 'rb') as f:
>>> # f.seek(entry['offset'])
>>> # data = f.read(entry['length'])
>>> # # Decode chunk (simplified - actual implementation needs codec chain)
>>> # return np.frombuffer(data, dtype=marr.dtype).reshape(marr.chunks)
>>> #
>>> # arr = cubed.from_manifest(
>>> # load_from_manifest,
>>> # shape=marr.shape,
>>> # dtype=marr.dtype,
>>> # chunks=marr.chunks,
>>> # spec=spec,
>>> # fill_value=marr.metadata.fill_value
>>> # )

Notes
-----
The load_chunk callback must be serializable (picklable) since it will be
sent to workers. Avoid closures over large objects or non-serializable state.

**Block ID Mapping**: The block_id passed to load_chunk is a tuple of integers
representing chunk coordinates in N-dimensional space. For a 2D array with
chunks=(100, 100), block_id=(2, 3) refers to the chunk at array slice
[200:300, 300:400]. In Zarr terminology, this corresponds to chunk key "2.3".
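
The slice of the array covered by a ``block_id`` can be recovered directly
(a minimal sketch, assuming regular, non-edge chunks):

>>> block_id, chunks = (2, 3), (100, 100)
>>> tuple(slice(b * c, b * c + c) for b, c in zip(block_id, chunks))
(slice(200, 300, None), slice(300, 400, None))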

**Serialization**: When creating the callback, ensure all referenced objects
(file paths, manifests, credentials) are picklable. Consider using function
parameters rather than closure variables for better serialization.
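
For example, binding configuration with ``functools.partial`` keeps the
callback picklable, provided the wrapped function is defined at module level
(a minimal sketch; ``bucket`` is a hypothetical parameter, not part of this
API)::

    import functools

    def _load(chunk_key, bucket):
        ...  # fetch and decode the chunk from the given bucket

    load_chunk = functools.partial(_load, bucket="my-bucket")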
"""
# Validate inputs
if not callable(load_chunk):
raise TypeError("load_chunk must be callable")

# Validate and normalize dtype
if not isinstance(dtype, np.dtype):
dtype = np.dtype(dtype)

# Normalize chunks to regular chunking
outchunks = normalize_chunks(chunks, shape, dtype=dtype)

# Create a wrapper function that map_blocks will call
# It ignores the input block and uses block_id to call load_chunk
def _load_manifest_chunk(
block,
block_id=None,
load_chunk_func=None,
expected_dtype=None,
fill_val=None,
):
"""Load a chunk from the manifest using the chunk coordinates."""
if block_id is None:
raise ValueError("block_id is required for manifest loading")

try:
# Call the user's load_chunk callback with the chunk coordinates
chunk_data = load_chunk_func(block_id)
except (KeyError, FileNotFoundError, IndexError):
# Handle missing chunks with fill_value if provided
if fill_val is not None:
chunk_data = nxp.full(block.shape, fill_val, dtype=expected_dtype)
else:
raise

        # Validate dtype matches
        if chunk_data.dtype != expected_dtype:
            raise TypeError(
                f"load_chunk returned dtype {chunk_data.dtype} but expected {expected_dtype} "
                f"for chunk at {block_id}"
            )

        # Validate shape against the template block (edge blocks may be
        # smaller than the regular chunk shape, hence block.shape)
        if chunk_data.shape != block.shape:
            raise ValueError(
                f"load_chunk returned shape {chunk_data.shape} but expected {block.shape} "
                f"for chunk at {block_id}"
            )

# Convert to backend array
return numpy_array_to_backend_array(chunk_data)

# Use map_blocks with a virtual empty array to create the structure
# The virtual array serves as a template for the chunk structure
from cubed.array_api.creation_functions import empty_virtual_array

template = empty_virtual_array(
shape, dtype=dtype, chunks=outchunks, spec=spec, hidden=True
)

# Map over the template, calling load_chunk for each block
return map_blocks(
_load_manifest_chunk,
template,
dtype=dtype,
chunks=outchunks,
spec=spec,
load_chunk_func=load_chunk,
expected_dtype=dtype,
fill_val=fill_value,
)


def store(
sources: Union["Array", Sequence["Array"]],
targets,
@@ -1344,9 +1515,11 @@ def partial_reduce(
combine_sizes = combine_sizes or {}
combine_sizes = {k: combine_sizes.get(k, 1) for k in axis}
chunks = tuple(
-        (combine_sizes[i],) * math.ceil(len(c) / split_every[i])
-        if i in split_every
-        else c
+        (
+            (combine_sizes[i],) * math.ceil(len(c) / split_every[i])
+            if i in split_every
+            else c
+        )
for (i, c) in enumerate(x.chunks)
)
shape = tuple(map(sum, chunks))
@@ -1531,11 +1704,13 @@ def unify_chunks(*args: "Array", **kwargs):
arrays.append(a)
else:
chunks = tuple(
-                chunkss[j]
-                if a.shape[n] > 1
-                else (a.shape[n],)
-                if not np.isnan(sum(chunkss[j]))
-                else None
+                (
+                    chunkss[j]
+                    if a.shape[n] > 1
+                    else (a.shape[n],)
+                    if not np.isnan(sum(chunkss[j]))
+                    else None
+                )
for n, j in enumerate(i)
)
if chunks != a.chunks and all(a.chunks):