diff --git a/cubed/diagnostics/history.py b/cubed/diagnostics/history.py index d7da50903..64475ce6e 100644 --- a/cubed/diagnostics/history.py +++ b/cubed/diagnostics/history.py @@ -17,6 +17,7 @@ def on_compute_start(self, event): name=name, op_name=node["op_name"], projected_mem=primitive_op.projected_mem, + allowed_mem=primitive_op.allowed_mem, reserved_mem=primitive_op.reserved_mem, num_tasks=primitive_op.num_tasks, ) diff --git a/cubed/diagnostics/mem_usage.py b/cubed/diagnostics/mem_usage.py new file mode 100644 index 000000000..a9ebf32d8 --- /dev/null +++ b/cubed/diagnostics/mem_usage.py @@ -0,0 +1,98 @@ +from dataclasses import asdict +from pathlib import Path +from typing import Optional + +import matplotlib +import matplotlib.pyplot as plt +import pandas as pd + +from cubed.runtime.pipeline import visit_nodes +from cubed.runtime.types import Callback + +matplotlib.use("Agg") + + +class MemoryVisualizationCallback(Callback): + def __init__(self, format: Optional[str] = "svg") -> None: + self.format = format + + def on_compute_start(self, event): + plan = [] + for name, node in visit_nodes(event.dag): + primitive_op = node["primitive_op"] + plan.append( + dict( + name=name, + op_name=node["op_name"], + projected_mem=primitive_op.projected_mem, + allowed_mem=primitive_op.allowed_mem, + reserved_mem=primitive_op.reserved_mem, + num_tasks=primitive_op.num_tasks, + ) + ) + + self.plan = plan + self.events = [] + + def on_task_end(self, event): + self.events.append(asdict(event)) + + def on_compute_end(self, event): + events_df = pd.DataFrame(self.events) + plan_df = pd.DataFrame(self.plan) + fig = generate_mem_usage(events_df, plan_df) + + self.dst = Path(f"history/{event.compute_id}") + self.dst.mkdir(parents=True, exist_ok=True) + self.dst = self.dst / f"memory.{self.format}" + + fig.savefig(self.dst) + + +def generate_mem_usage(events_df, plan_df): + # colours match those in https://cubed-dev.github.io/cubed/user-guide/memory.html + + events_df = events_df.sort_values(by=["task_create_tstamp", "name"], ascending=True) + projected_mem_map = plan_df.set_index("name")["projected_mem"].to_dict() + + tstamp = events_df["task_result_tstamp"].astype("timedelta64[s]") + events_df["time"] = (tstamp - tstamp.min()).astype(int) + events_df["actual usage"] = events_df["peak_measured_mem_end"] / 1_000_000 + events_df["projected_mem"] = events_df.name.map(projected_mem_map) / 1_000_000 + + fig, ax = plt.subplots(figsize=(8, 6)) + + events_df.plot( + kind="area", y="actual usage", ax=ax, use_index=True, color="#9fc5e8" + ) + + allowed_mem = plan_df["allowed_mem"].max() / 1_000_000 + ax.axhline(allowed_mem, label="allowed", color="#e06666", linestyle="--") + + reserved_mem = plan_df["reserved_mem"].max() / 1_000_000 + ax.axhline( + reserved_mem, + label="reserved", + color="#f6b26b", + linestyle="--", + ) + + peak_measured_mem = events_df["peak_measured_mem_end"].max() / 1_000_000 + ax.axhline(peak_measured_mem, label="max actual usage", color="#6fa8dc") + + events_df.plot( + kind="line", + y="projected_mem", + ax=ax, + use_index=True, + label="projected", + color="#93c47d", + linestyle="--", + ) + + ax.set_xlabel("Task number") + ax.set_ylim(top=allowed_mem + 100) + ax.set_ylabel("Task memory (MB)") + ax.legend() + + return fig diff --git a/cubed/diagnostics/timeline.py b/cubed/diagnostics/timeline.py index 64de75518..d4af43e67 100644 --- a/cubed/diagnostics/timeline.py +++ b/cubed/diagnostics/timeline.py @@ -1,18 +1,17 @@ -import time from dataclasses import asdict from pathlib import Path from typing import Optional -import matplotlib.patches as mpatches +import matplotlib +import matplotlib.pyplot as plt import numpy as np import pandas as pd -import pylab import seaborn as sns from cubed.runtime.types import Callback sns.set_style("whitegrid") -pylab.switch_backend("Agg") +matplotlib.use("Agg") class TimelineVisualizationCallback(Callback): @@ -20,69 +19,45 @@ def __init__(self, format: Optional[str] = "svg") -> None: self.format = format def on_compute_start(self, event): - self.start_tstamp = time.time() - self.stats = [] + self.events = [] def on_task_end(self, event): - self.stats.append(asdict(event)) + self.events.append(asdict(event)) def on_compute_end(self, event): - self.end_tstamp = time.time() - - stats_df = pd.DataFrame(self.stats) - stats_df = stats_df.sort_values( - by=["task_create_tstamp", "name"], ascending=True - ) - total_calls = len(stats_df) - palette = sns.color_palette("deep", 6) - - fig = pylab.figure(figsize=(10, 6)) - ax = fig.add_subplot(1, 1, 1) - - y = np.arange(total_calls) - point_size = 10 - - fields = [ - ("task create", stats_df.task_create_tstamp - self.start_tstamp), - ("function start", stats_df.function_start_tstamp - self.start_tstamp), - ("function end", stats_df.function_end_tstamp - self.start_tstamp), - ("task result", stats_df.task_result_tstamp - self.start_tstamp), - ] - - patches = [] - for f_i, (field_name, val) in enumerate(fields): - ax.scatter( - val, y, c=[palette[f_i]], edgecolor="none", s=point_size, alpha=0.8 - ) - patches.append(mpatches.Patch(color=palette[f_i], label=field_name)) - - ax.set_xlabel("Execution Time (sec)") - ax.set_ylabel("Function Call") - - legend = pylab.legend(handles=patches, loc="upper right", frameon=True) - legend.get_frame().set_facecolor("#FFFFFF") - - yplot_step = int(np.max([1, total_calls / 20])) - y_ticks = np.arange(total_calls // yplot_step + 2) * yplot_step - ax.set_yticks(y_ticks) - ax.set_ylim(-0.02 * total_calls, total_calls * 1.02) - for y in y_ticks: - ax.axhline(y, c="k", alpha=0.1, linewidth=1) - - max_seconds = np.max(self.end_tstamp - self.start_tstamp) * 1.25 - xplot_step = max(int(max_seconds / 8), 1) - x_ticks = np.arange(max_seconds // xplot_step + 2) * xplot_step - ax.set_xlim(0, max_seconds) - - ax.set_xticks(x_ticks) - for x in x_ticks: - ax.axvline(x, c="k", alpha=0.2, linewidth=0.8) - - ax.grid(False) - fig.tight_layout() + events_df = pd.DataFrame(self.events) + fig = generate_timeline(events_df) self.dst = Path(f"history/{event.compute_id}") self.dst.mkdir(parents=True, exist_ok=True) self.dst = self.dst / f"timeline.{self.format}" fig.savefig(self.dst) + + +def generate_timeline(events_df): + events_df = events_df.sort_values(by=["task_create_tstamp", "name"], ascending=True) + start_tstamp = events_df["task_create_tstamp"].min() + total_calls = len(events_df) + + fig, ax = plt.subplots(figsize=(10, 8)) + + y = np.arange(total_calls) + point_size = 7 + + fields = [ + ("task create", events_df.task_create_tstamp - start_tstamp), + ("function start", events_df.function_start_tstamp - start_tstamp), + ("function end", events_df.function_end_tstamp - start_tstamp), + ("task result", events_df.task_result_tstamp - start_tstamp), + ] + + for f_i, (field_name, val) in enumerate(fields): + ax.scatter(val, y, label=field_name, edgecolor="none", s=point_size, alpha=0.8) + + ax.set_xlabel("Execution time (sec)") + ax.set_ylabel("Task number") + + ax.legend() + + return fig diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py index 25e695f3c..0dddb54ff 100644 --- a/cubed/tests/test_executor_features.py +++ b/cubed/tests/test_executor_features.py @@ -14,6 +14,7 @@ import cubed.random from cubed.diagnostics import ProgressBar from cubed.diagnostics.history import HistoryCallback +from cubed.diagnostics.mem_usage import MemoryVisualizationCallback from cubed.diagnostics.mem_warn import MemoryWarningCallback from cubed.diagnostics.rich import RichProgressBar from cubed.diagnostics.timeline import TimelineVisualizationCallback @@ -101,13 +102,15 @@ def test_callbacks(spec, executor): progress = TqdmProgressBar() hist = HistoryCallback() timeline_viz = TimelineVisualizationCallback() + memory_viz = MemoryVisualizationCallback() a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec) b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec) c = xp.add(a, b) assert_array_equal( c.compute( - executor=executor, callbacks=[task_counter, progress, hist, timeline_viz] + executor=executor, + callbacks=[task_counter, progress, hist, timeline_viz, memory_viz], ), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]), ) diff --git a/setup.cfg b/setup.cfg index 7ca3c85b6..288a3a057 100644 --- a/setup.cfg +++ b/setup.cfg @@ -64,8 +64,6 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-psutil.*] ignore_missing_imports = True -[mypy-pylab.*] -ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-ray.*]