From 4fb71b5fba04ac1465d4e622ac976067ae838a12 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:10:04 -0600 Subject: [PATCH 01/18] Refactor memory check method to find operations exceeding memory limits --- cubed/core/plan.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 10de200f6..eef1ca128 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -271,9 +271,15 @@ def _compile_blockwise(self, dag, compile_function: Decorator) -> nx.MultiDiGrap return dag - def _check_projected_mem(self, dag) -> None: - op_name = None - max_projected_mem_op = None + def _find_ops_exceeding_memory( + self, dag + ) -> List[Tuple[str, "PrimitiveOperation"]]: + """Find all operations where projected memory exceeds allowed memory. + + Returns a list of (op_name, primitive_op) tuples for operations that + exceed memory limits, sorted by projected memory (highest first). + """ + ops_exceeding = [] for n, d in dag.nodes(data=True): if "primitive_op" in d: op = d["primitive_op"] From 8beb9eb28a1259fe007bf9b392a73bc6bd8d0885 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:10:13 -0600 Subject: [PATCH 02/18] Refactor memory check to identify operations exceeding allowed memory limits --- cubed/core/plan.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index eef1ca128..b928a8fe3 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -283,19 +283,11 @@ def _find_ops_exceeding_memory( for n, d in dag.nodes(data=True): if "primitive_op" in d: op = d["primitive_op"] - if ( - max_projected_mem_op is None - or op.projected_mem > max_projected_mem_op.projected_mem - ): - op_name = n - max_projected_mem_op = op - if max_projected_mem_op is not None: - op = max_projected_mem_op - if op.projected_mem > op.allowed_mem: - raise ValueError( - f"Projected blockwise memory ({memory_repr(op.projected_mem)}) exceeds allowed_mem ({memory_repr(op.allowed_mem)}), " - f"including reserved_mem ({memory_repr(op.reserved_mem)}) for {op_name}" - ) + if op.projected_mem > op.allowed_mem: + ops_exceeding.append((n, op)) + # Sort by projected_mem descending so worst offenders are first + ops_exceeding.sort(key=lambda x: x[1].projected_mem, reverse=True) + return ops_exceeding @lru_cache # noqa: B019 def _finalize( From f28a23bac9d40ecfe6deee48ae6e216aa061bb67 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:10:22 -0600 Subject: [PATCH 03/18] Add operation memory check to finalize plan creation --- cubed/core/plan.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index b928a8fe3..4a27115fa 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -302,8 +302,10 @@ def _finalize( if callable(compile_function): dag = self._compile_blockwise(dag, compile_function) dag = self._create_lazy_zarr_arrays(dag) - self._check_projected_mem(dag) - return FinalizedPlan(nx.freeze(dag), self.array_names, optimize_graph) + ops_exceeding_memory = self._find_ops_exceeding_memory(dag) + return FinalizedPlan( + nx.freeze(dag), self.array_names, optimize_graph, ops_exceeding_memory + ) class ArrayRole(Enum): From 40ea158a8c1068b84268f779ad3755875de97c1b Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:10:32 -0600 Subject: [PATCH 04/18] Add optional parameter for operations exceeding memory in FinalizedPlan constructor --- cubed/core/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 4a27115fa..3e81080ff 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -324,7 +324,7 @@ class FinalizedPlan: 4. freezing the final DAG so it can't be changed """ - def __init__(self, dag, array_names, optimized): + def __init__(self, dag, array_names, optimized, ops_exceeding_memory=None): self.dag = dag self.array_names = array_names self.optimized = optimized From ade9dbbe73d501ea8378a5dc245e2e7ca740ea30 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:10:47 -0600 Subject: [PATCH 05/18] Add handling for operations exceeding memory in FinalizedPlan --- cubed/core/plan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 3e81080ff..0f5961955 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -328,6 +328,7 @@ def __init__(self, dag, array_names, optimized, ops_exceeding_memory=None): self.dag = dag self.array_names = array_names self.optimized = optimized + self._ops_exceeding_memory = ops_exceeding_memory or [] self._calculate_stats() self.input_array_names = [] From d18ab1422a5b62a699771768e1fd983ffec4426b Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:11:03 -0600 Subject: [PATCH 06/18] Add memory validation and reporting in FinalizedPlan --- cubed/core/plan.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 0f5961955..a42c5859a 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -541,6 +541,34 @@ def total_nchunks(self) -> int: """The total number of chunks for all materialized arrays in this plan.""" return self._total_nchunks + @property + def exceeds_memory(self) -> bool: + """True if any operation in this plan exceeds the allowed memory.""" + return len(self._ops_exceeding_memory) > 0 + + @property + def ops_exceeding_memory(self) -> List[Tuple[str, "PrimitiveOperation"]]: + """List of (op_name, primitive_op) tuples for operations exceeding memory. + + Sorted by projected memory (highest first). + """ + return self._ops_exceeding_memory + + def validate(self) -> None: + """Validate that this plan can be executed. + + Raises + ------ + ValueError + If any operation's projected memory exceeds the allowed memory. + """ + if self._ops_exceeding_memory: + op_name, op = self._ops_exceeding_memory[0] # Report worst offender + raise ValueError( + f"Projected blockwise memory ({memory_repr(op.projected_mem)}) exceeds allowed_mem ({memory_repr(op.allowed_mem)}), " + f"including reserved_mem ({memory_repr(op.reserved_mem)}) for {op_name}" + ) + def execute( self, executor=None, From 916b1c14217386aa680310a562fa20cce58b0627 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:11:10 -0600 Subject: [PATCH 07/18] Add validation call in FinalizedPlan constructor --- cubed/core/plan.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index a42c5859a..f051fc36e 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -577,6 +577,8 @@ def execute( spec=None, **kwargs, ): + self.validate() + dag = self.dag if resume: From 900c8d9334308410116d440aeb512c598f7f8e2e Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:11:22 -0600 Subject: [PATCH 08/18] Add warning for operations exceeding memory in FinalizedPlan visualization --- cubed/core/plan.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index f051fc36e..c1d42d081 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -611,6 +611,15 @@ def visualize( rankdir="TB", show_hidden=False, ): + if self._ops_exceeding_memory: + op_names = [name for name, _ in self._ops_exceeding_memory] + warnings.warn( + f"Plan has {len(self._ops_exceeding_memory)} operation(s) that exceed allowed memory: {op_names}. " + "These are shown in red in the visualization.", + stacklevel=2, + ) + ops_exceeding_names = {name for name, _ in self._ops_exceeding_memory} + dag = self.dag.copy() # make a copy since we mutate the DAG below # remove edges from create-arrays output node to avoid cluttering the diagram From f6dc5ab28921ce1428f3bd8a011e172977d60d58 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:11:32 -0600 Subject: [PATCH 09/18] Add warnings import for enhanced memory management in FinalizedPlan --- cubed/core/plan.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index c1d42d081..46e8acbca 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -4,9 +4,10 @@ import shutil import tempfile import uuid +import warnings from datetime import datetime from enum import Enum -from functools import lru_cache +from typing import Any, Callable, Dict, List, Optional, Tuple from typing import Any, Callable, Dict, Optional import networkx as nx From ac2c403eaa3d0fae585c9020054ac5ab48773256 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:11:44 -0600 Subject: [PATCH 10/18] Refactor imports in plan.py for improved organization --- cubed/core/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 46e8acbca..86fdb5d40 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -7,8 +7,8 @@ import warnings from datetime import datetime from enum import Enum +from functools import lru_cache from typing import Any, Callable, Dict, List, Optional, Tuple -from typing import Any, Callable, Dict, Optional import networkx as nx From f15bea8177203c8a9541120959e2cab9900edc4d Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:12:11 -0600 Subject: [PATCH 11/18] Add HTML warning for memory exceeded in FinalizedPlan visualization --- cubed/core/plan.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 86fdb5d40..139ba8a44 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -631,6 +631,29 @@ def visualize( list(n for n, d in dag.nodes(data=True) if d.get("hidden", False)) ) + # Build the graph label - use HTML-like label for mixed colors if memory exceeded + stats_text = ( + f"num tasks: {self.num_tasks}
" + f"max projected memory: {memory_repr(self.max_projected_mem)}
" + f"total nbytes written: {memory_repr(self.total_nbytes_written)}
" + f"optimized: {self.optimized}
" + ) + + if self._ops_exceeding_memory: + # Build warning text in red + warning_lines = ["
⚠ MEMORY EXCEEDED ⚠
"] + for op_name, op in self._ops_exceeding_memory: + warning_lines.append( + f"{op_name}: requires {memory_repr(op.projected_mem)}, " + f"allowed {memory_repr(op.allowed_mem)}
" + ) + warning_text = "".join(warning_lines) + # HTML-like label with mixed colors + label = f"<{stats_text}{warning_text}>" + else: + # Simple HTML label (no warning) + label = f"<{stats_text}>" + dag.graph["graph"] = { "rankdir": rankdir, "label": ( From eadb8dced064d5ac0dbb966e5abdfa5151268f78 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:12:24 -0600 Subject: [PATCH 12/18] Refactor FinalizedPlan graph label to use a predefined variable --- cubed/core/plan.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 139ba8a44..795630472 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -656,13 +656,7 @@ def visualize( dag.graph["graph"] = { "rankdir": rankdir, - "label": ( - # note that \l is used to left-justify each line (see https://www.graphviz.org/docs/attrs/nojustify/) - rf"num tasks: {self.num_tasks}\l" - rf"max projected memory: {memory_repr(self.max_projected_mem)}\l" - rf"total nbytes written: {memory_repr(self.total_nbytes_written)}\l" - rf"optimized: {self.optimized}\l" - ), + "label": label, "labelloc": "bottom", "labeljust": "left", "fontsize": "10", From bd9ca4f908378253a5ad964b19565c900fd71817 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:12:32 -0600 Subject: [PATCH 13/18] Add missing line break for improved readability in FinalizedPlan class --- cubed/core/plan.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 795630472..e6d0eb175 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -661,6 +661,7 @@ def visualize( "labeljust": "left", "fontsize": "10", } + dag.graph["node"] = {"fontname": "helvetica", "shape": "box", "fontsize": "10"} # do an initial pass to extract array variable names from stack summaries From 60318558361b9988e0ee4af256aab92770202bbf Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:12:42 -0600 Subject: [PATCH 14/18] Highlight operations exceeding memory in red within FinalizedPlan visualization --- cubed/core/plan.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index e6d0eb175..138ebada7 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -686,7 +686,11 @@ def visualize( func_name = d["func_name"] label = f"{n}\n{func_name}".strip() op_name = d["op_name"] - if op_name == "blockwise": + if n in ops_exceeding_names: + # operation exceeds memory - show in red + d["style"] = '"rounded,filled"' + d["fillcolor"] = "#ff6b6b" + elif op_name == "blockwise": d["style"] = '"rounded,filled"' d["fillcolor"] = "#dcbeff" elif op_name == "rechunk": From 99c754c1041a1921d94b1f60488572af1e99d829 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:13:08 -0600 Subject: [PATCH 15/18] Add test for plan exceeding memory in default spec --- cubed/tests/test_core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index 29baa60d9..a248f0f1b 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -493,6 +493,11 @@ def test_default_spec_allowed_mem_exceeded(): # default spec fails for large computations a = xp.ones((20000, 10000), chunks=(10000, 10000)) b = xp.negative(a) + # plan() succeeds but marks plan as exceeding memory + plan = b.plan() + assert plan.exceeds_memory + assert len(plan.ops_exceeding_memory) == 1 + # compute() raises the error with pytest.raises( ValueError, match=r"Projected blockwise memory \(.+\) exceeds allowed_mem \(.+\), including reserved_mem \(.+\) for op-\d+", From a57491c91eee9641518438bf7d43e3bc4ec3fab7 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 11:13:17 -0600 Subject: [PATCH 16/18] Add visualization test for memory exceeded warning in default spec --- cubed/tests/test_core.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index a248f0f1b..e5599010f 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -502,7 +502,20 @@ def test_default_spec_allowed_mem_exceeded(): ValueError, match=r"Projected blockwise memory \(.+\) exceeds allowed_mem \(.+\), including reserved_mem \(.+\) for op-\d+", ): - b.plan() + b.compute() + + +def test_default_spec_allowed_mem_exceeded_visualize(tmp_path): + # visualize works but warns when memory is exceeded + import warnings + + a = xp.ones((20000, 10000), chunks=(10000, 10000)) + b = xp.negative(a) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + b.visualize(filename=str(tmp_path / "cubed")) + assert len(w) == 1 + assert "exceed allowed memory" in str(w[0].message) def test_default_spec_config_override(): From bc76787d0f9232809537cce8acdecaac1f627087 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 13:30:27 -0600 Subject: [PATCH 17/18] lint --- cubed/core/plan.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 138ebada7..257d03c83 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -272,9 +272,7 @@ def _compile_blockwise(self, dag, compile_function: Decorator) -> nx.MultiDiGrap return dag - def _find_ops_exceeding_memory( - self, dag - ) -> List[Tuple[str, "PrimitiveOperation"]]: + def _find_ops_exceeding_memory(self, dag) -> List[Tuple[str, "PrimitiveOperation"]]: """Find all operations where projected memory exceeds allowed memory. Returns a list of (op_name, primitive_op) tuples for operations that From a778403116393dd0ed7f61b6f43bab7ebff0aec0 Mon Sep 17 00:00:00 2001 From: neilSchroeder Date: Mon, 1 Dec 2025 13:40:05 -0600 Subject: [PATCH 18/18] Update memory exceeded warning text format in FinalizedPlan class --- cubed/core/plan.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 257d03c83..b1eb17526 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -639,7 +639,9 @@ def visualize( if self._ops_exceeding_memory: # Build warning text in red - warning_lines = ["
⚠ MEMORY EXCEEDED ⚠
"] + warning_lines = [ + "
!!! MEMORY EXCEEDED !!!
" + ] for op_name, op in self._ops_exceeding_memory: warning_lines.append( f"{op_name}: requires {memory_repr(op.projected_mem)}, "