Intermediate Zarr materialization vs keeping intermediates in RAM #870

@amalia-k510

Hello!

I’m trying to understand how cubed’s memory/materialization model works during execution.

From reading documentation/blogs, my understanding is:

  • Cubed builds a lazy DAG plan.
  • On compute(), arrays are executed chunkwise and intermediate arrays are materialized by writing to Zarr, with fusion/optimization reducing some intermediate writes.
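A toy, pure-Python model of the two bullets above may help pin down what "materialized" means here. `Node`, `execute`, and the `fused` set are hypothetical names made up for this sketch, and a dict stands in for the Zarr store. It says nothing about how Cubed actually implements planning or fusion.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Node:
    """One lazy operation in a toy plan (hypothetical, not a Cubed class)."""

    name: str
    func: Callable
    deps: list = field(default_factory=list)


def execute(nodes, fused=frozenset()):
    """Evaluate a topologically sorted plan, 'writing' every result unless fused.

    A node named in `fused` is never stored; its consumer recomputes it inline,
    which is the effect fusion has on intermediate writes.
    """
    store = {}   # stands in for intermediate Zarr arrays in work_dir
    writes = []  # names of the results that were materialized

    def value(node):
        if node.name in store:
            return store[node.name]  # read back from the "Zarr" store
        return node.func(*(value(d) for d in node.deps))

    for node in nodes:
        if node.name in fused:
            continue  # no write; computed inside its consumer
        store[node.name] = value(node)
        writes.append(node.name)
    return store, writes


# Building the plan is lazy: nothing runs until execute() is called.
a = Node("a", lambda: [1, 2, 3])
b = Node("b", lambda x: [v * 2 for v in x], [a])
c = Node("c", lambda x: sum(x), [b])
plan = [a, b, c]

print(execute(plan)[1])               # ['a', 'b', 'c']
print(execute(plan, fused={"b"})[1])  # ['a', 'c']
```

In both runs the final value of `c` is the same (12); fusing `b` only removes one intermediate write.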

What I was wondering is whether there is a way for cubed to keep intermediates in RAM up to a memory budget and only write to Zarr when needed; in other words, not writing intermediate results to disk after every step.

Some of my questions are:

  1. Is there currently any mode/runtime that keeps intermediates in memory (within allowed_mem or similar) and only writes to Zarr when necessary?
  2. If not, is “write intermediates to Zarr (except when fused)” the intended design, even for local runs?
  3. Also, I was curious: is there an ongoing plan or idea for a local executor where tasks are grouped based on their projected memory usage, so that intermediate results can stay in RAM when possible instead of always being written to disk?
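To make question 3 concrete, here is a toy, single-process sketch of the kind of behaviour I have in mind. It is purely hypothetical: `run_with_budget`, the `(name, deps, nbytes, func)` task layout, and the byte sizes are made up, and this does not describe any existing Cubed executor. Intermediates stay in a dict while the running total fits under `budget_bytes`, an intermediate is released once its last consumer has run, and anything that does not fit is "spilled" to a second dict standing in for Zarr.

```python
def run_with_budget(tasks, budget_bytes):
    """Toy executor: keep intermediates in RAM while under budget, else spill.

    `tasks` is a topologically sorted list of (name, deps, nbytes, func).
    Returns (values still held, names spilled to the stand-in store).
    """
    # Count consumers so in-memory results can be freed after their last use.
    consumers = {name: 0 for name, _, _, _ in tasks}
    for _, deps, _, _ in tasks:
        for d in deps:
            consumers[d] += 1

    in_memory, store, sizes = {}, {}, {}
    used = 0  # bytes of intermediates currently held in RAM
    spilled = []
    for name, deps, nbytes, func in tasks:
        args = [in_memory[d] if d in in_memory else store[d] for d in deps]
        result = func(*args)
        sizes[name] = nbytes
        # Release inputs that no remaining task needs.
        for d in deps:
            consumers[d] -= 1
            if consumers[d] == 0 and d in in_memory:
                del in_memory[d]
                used -= sizes[d]
        if used + nbytes <= budget_bytes:
            in_memory[name] = result  # keep in RAM
            used += nbytes
        else:
            store[name] = result  # stands in for a Zarr write
            spilled.append(name)
    return {**store, **in_memory}, spilled


tasks = [
    ("x", [], 40, lambda: 1),
    ("y", ["x"], 40, lambda x: x + 1),
    ("z", ["x", "y"], 40, lambda x, y: x * y),
]

print(run_with_budget(tasks, budget_bytes=1000))  # ({'z': 2}, [])
print(run_with_budget(tasks, budget_bytes=60))    # ({'y': 2, 'z': 2}, ['y'])
```

This toy only budgets for retained intermediates. A real executor would also have to account for the transient inputs, working memory, and output of each running task, and for tasks running in parallel.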

Here is the code I used to figure out how the memory works:

```python
# cubed + sparse (numba)

import os
import tempfile
import tracemalloc

# Set these before importing cubed so the sparse backend is picked up.
os.environ["SPARSE_AUTO_DENSIFY"] = "1"
os.environ["CUBED_BACKEND_ARRAY_API_MODULE"] = "sparse"

import cubed
import numpy as np
import sparse
from cubed import Spec
from cubed.primitive.memory import calculate_projected_mem, get_buffer_copies
from cubed.runtime.executors.local import SingleThreadedExecutor
from cubed.utils import memory_repr, peak_measured_mem

# 1024 x 1024 sparse identity matrix (float64).
array = sparse.COO.from_numpy(np.eye(1024))

tracemalloc.start()

with tempfile.TemporaryDirectory() as tmpdir:
    executor = SingleThreadedExecutor()
    # Intermediate Zarr arrays are written under work_dir.
    spec = Spec(
        work_dir=tmpdir,
        allowed_mem="2GB",
        reserved_mem=0,
    )

    carray1 = cubed.from_array(array, chunks=(128, 128), spec=spec)
    carray2 = cubed.from_array(array, chunks=(128, 128), spec=spec)

    # Building the plan is lazy: nothing is computed here.
    product = cubed.matmul(carray1, carray2)

    build_current, build_peak = tracemalloc.get_traced_memory()

    # Rough estimate only: array.nbytes is the whole array's COO storage
    # size, not the size of a single chunk.
    buffer_copies = get_buffer_copies(spec)
    projected_mem = calculate_projected_mem(
        reserved_mem=0,
        inputs=[array.nbytes, array.nbytes],
        operation=0,
        output=array.nbytes,
        buffer_copies=buffer_copies,
    )

    # Measure the compute phase on its own.
    tracemalloc.reset_peak()
    before_compute_current, _ = tracemalloc.get_traced_memory()
    result = product.compute(executor=executor)
    after_compute_current, compute_peak = tracemalloc.get_traced_memory()

    # Process-wide peak memory as reported by the OS (not just the compute phase).
    measured_mem = peak_measured_mem()

    print("\n=== Tracemalloc report ===")
    print(f"After graph build:    current={memory_repr(build_current)} peak={memory_repr(build_peak)}")
    print(f"Compute-only peak:    peak={memory_repr(compute_peak)}")
    print(f"Compute current diff: delta={memory_repr(after_compute_current - before_compute_current)}")

    print("\n=== Cubed memory model ===")
    print(f"Projected task mem : {memory_repr(projected_mem)}")
    print(f"Cubed measured peak: {memory_repr(measured_mem)}")

    tracemalloc.stop()
```
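For scale, the projected-memory call above passes `array.nbytes`, which for a `sparse.COO` array is the storage size of the whole array, whereas (as I understand it) Cubed's memory model reasons about one task, i.e. roughly one chunk, at a time. The arithmetic below compares those quantities. Two assumptions are baked in: the COO size assumes int64 coordinates (sparse may use a different index dtype), and the dense chunk size only matters if chunks are densified when written to or read from Zarr.

```python
import math

ITEMSIZE = 8  # bytes per float64, the dtype np.eye produces by default
shape, chunks = (1024, 1024), (128, 128)

dense_array_bytes = math.prod(shape) * ITEMSIZE   # whole array, if dense
dense_chunk_bytes = math.prod(chunks) * ITEMSIZE  # one chunk, if dense
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

# COO storage of np.eye(1024): one float64 value plus two coordinates
# per non-zero (assuming int64 coordinates).
nnz = 1024
coo_bytes = nnz * ITEMSIZE + 2 * nnz * 8

print(dense_array_bytes, dense_chunk_bytes, n_chunks, coo_bytes)
# prints: 8388608 131072 64 24576
```

So the whole sparse array (about 24 KiB) is smaller than a single dense 128 x 128 chunk (128 KiB), which is one reason the projected and measured numbers may not line up.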

@ilan-gold
