Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cubed/diagnostics/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def on_compute_start(self, event):
name=name,
op_name=node["op_name"],
projected_mem=primitive_op.projected_mem,
allowed_mem=primitive_op.allowed_mem,
reserved_mem=primitive_op.reserved_mem,
num_tasks=primitive_op.num_tasks,
)
Expand Down
98 changes: 98 additions & 0 deletions cubed/diagnostics/mem_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from dataclasses import asdict
from pathlib import Path
from typing import Optional

import matplotlib
import matplotlib.pyplot as plt
import pandas as pd

from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import Callback

matplotlib.use("Agg")


class MemoryVisualizationCallback(Callback):
    """Callback that renders a per-task memory usage chart for a computation.

    On compute start it snapshots the plan's memory figures for each
    operation; on compute end it plots actual peak usage against the
    projected/allowed/reserved values and saves the figure to
    ``history/<compute_id>/memory.<format>`` (path kept on ``self.dst``).
    """

    def __init__(self, format: str = "svg") -> None:
        # ``format`` is any matplotlib savefig format (svg, png, ...).
        # Annotated as plain ``str``: ``None`` was never a meaningful value,
        # so the previous ``Optional[str]`` annotation was misleading.
        self.format = format

    def on_compute_start(self, event):
        # Snapshot the memory bounds for every operation in the plan so they
        # can be plotted alongside the measured values after the run.
        plan = []
        for name, node in visit_nodes(event.dag):
            primitive_op = node["primitive_op"]
            plan.append(
                dict(
                    name=name,
                    op_name=node["op_name"],
                    projected_mem=primitive_op.projected_mem,
                    allowed_mem=primitive_op.allowed_mem,
                    reserved_mem=primitive_op.reserved_mem,
                    num_tasks=primitive_op.num_tasks,
                )
            )

        self.plan = plan
        self.events = []

    def on_task_end(self, event):
        # Events are dataclasses; store plain dicts for DataFrame construction.
        self.events.append(asdict(event))

    def on_compute_end(self, event):
        events_df = pd.DataFrame(self.events)
        plan_df = pd.DataFrame(self.plan)
        fig = generate_mem_usage(events_df, plan_df)

        # Save under history/<compute_id>/ so repeated runs don't clobber
        # one another. Build the final path in one step rather than
        # reassigning self.dst twice (the old code briefly exposed the
        # directory as self.dst).
        dst_dir = Path(f"history/{event.compute_id}")
        dst_dir.mkdir(parents=True, exist_ok=True)
        self.dst = dst_dir / f"memory.{self.format}"

        fig.savefig(self.dst)


def generate_mem_usage(events_df, plan_df):
    """Plot actual vs projected/allowed/reserved task memory (MB); return the figure."""
    # colours match those in https://cubed-dev.github.io/cubed/user-guide/memory.html

    MB = 1_000_000  # memory values arrive in bytes; plot in megabytes

    events_df = events_df.sort_values(by=["task_create_tstamp", "name"], ascending=True)
    name_to_projected = plan_df.set_index("name")["projected_mem"].to_dict()

    result_times = events_df["task_result_tstamp"].astype("timedelta64[s]")
    events_df["time"] = (result_times - result_times.min()).astype(int)
    events_df["actual usage"] = events_df["peak_measured_mem_end"] / MB
    events_df["projected_mem"] = events_df.name.map(name_to_projected) / MB

    fig, ax = plt.subplots(figsize=(8, 6))

    # Filled area: memory each task actually used (drawn first, lowest z-order).
    events_df.plot(
        kind="area", y="actual usage", ax=ax, use_index=True, color="#9fc5e8"
    )

    # Horizontal reference lines for the configured memory bounds.
    allowed_mem = plan_df["allowed_mem"].max() / MB
    ax.axhline(allowed_mem, label="allowed", color="#e06666", linestyle="--")

    reserved_mem = plan_df["reserved_mem"].max() / MB
    ax.axhline(reserved_mem, label="reserved", color="#f6b26b", linestyle="--")

    peak_actual = events_df["peak_measured_mem_end"].max() / MB
    ax.axhline(peak_actual, label="max actual usage", color="#6fa8dc")

    # Dashed line: per-task projected memory from the plan.
    events_df.plot(
        kind="line",
        y="projected_mem",
        ax=ax,
        use_index=True,
        label="projected",
        color="#93c47d",
        linestyle="--",
    )

    ax.set_xlabel("Task number")
    ax.set_ylim(top=allowed_mem + 100)  # leave headroom above the allowed line
    ax.set_ylabel("Task memory (MB)")
    ax.legend()

    return fig
95 changes: 35 additions & 60 deletions cubed/diagnostics/timeline.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,63 @@
import time
from dataclasses import asdict
from pathlib import Path
from typing import Optional

import matplotlib.patches as mpatches
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pylab
import seaborn as sns

from cubed.runtime.types import Callback

sns.set_style("whitegrid")
pylab.switch_backend("Agg")
matplotlib.use("Agg")


class TimelineVisualizationCallback(Callback):
    # NOTE(review): this span is a merged GitHub diff view — it contains BOTH the
    # pre-refactor (seaborn/pylab) and post-refactor (generate_timeline) lines
    # concatenated, with no +/- markers. It is kept byte-identical here; the real
    # post-refactor file keeps only `self.events`, the `generate_timeline` call,
    # and the save logic. Confirm against the repository before reusing.
    def __init__(self, format: Optional[str] = "svg") -> None:
        # Output format for matplotlib's savefig (svg, png, ...).
        self.format = format

    def on_compute_start(self, event):
        self.start_tstamp = time.time()
        # NOTE(review): `stats` is the pre-refactor accumulator, `events` the
        # post-refactor one — both appear because the diff is merged.
        self.stats = []
        self.events = []

    def on_task_end(self, event):
        # Store each task-end event as a plain dict (events are dataclasses).
        self.stats.append(asdict(event))
        self.events.append(asdict(event))

    def on_compute_end(self, event):
        self.end_tstamp = time.time()

        # NOTE(review): everything from here down to `fig.tight_layout()` is the
        # OLD hand-rolled seaborn/pylab plotting body that the PR removes; the
        # replacement is the `generate_timeline` call below. In this merged view
        # the old `fig` is built and then immediately discarded by reassignment.
        stats_df = pd.DataFrame(self.stats)
        stats_df = stats_df.sort_values(
            by=["task_create_tstamp", "name"], ascending=True
        )
        total_calls = len(stats_df)
        palette = sns.color_palette("deep", 6)

        fig = pylab.figure(figsize=(10, 6))
        ax = fig.add_subplot(1, 1, 1)

        y = np.arange(total_calls)
        point_size = 10

        # One timestamp series per task-lifecycle stage, relative to compute start.
        fields = [
            ("task create", stats_df.task_create_tstamp - self.start_tstamp),
            ("function start", stats_df.function_start_tstamp - self.start_tstamp),
            ("function end", stats_df.function_end_tstamp - self.start_tstamp),
            ("task result", stats_df.task_result_tstamp - self.start_tstamp),
        ]

        patches = []
        for f_i, (field_name, val) in enumerate(fields):
            ax.scatter(
                val, y, c=[palette[f_i]], edgecolor="none", s=point_size, alpha=0.8
            )
            patches.append(mpatches.Patch(color=palette[f_i], label=field_name))

        ax.set_xlabel("Execution Time (sec)")
        ax.set_ylabel("Function Call")

        legend = pylab.legend(handles=patches, loc="upper right", frameon=True)
        legend.get_frame().set_facecolor("#FFFFFF")

        # Manual y-axis gridlines: roughly 20 ticks across the task range.
        yplot_step = int(np.max([1, total_calls / 20]))
        y_ticks = np.arange(total_calls // yplot_step + 2) * yplot_step
        ax.set_yticks(y_ticks)
        ax.set_ylim(-0.02 * total_calls, total_calls * 1.02)
        for y in y_ticks:
            ax.axhline(y, c="k", alpha=0.1, linewidth=1)

        # Manual x-axis gridlines: ~8 ticks with 25% headroom past the last event.
        max_seconds = np.max(self.end_tstamp - self.start_tstamp) * 1.25
        xplot_step = max(int(max_seconds / 8), 1)
        x_ticks = np.arange(max_seconds // xplot_step + 2) * xplot_step
        ax.set_xlim(0, max_seconds)

        ax.set_xticks(x_ticks)
        for x in x_ticks:
            ax.axvline(x, c="k", alpha=0.2, linewidth=0.8)

        ax.grid(False)
        fig.tight_layout()
        # NEW body: delegate plotting to the standalone generate_timeline helper.
        events_df = pd.DataFrame(self.events)
        fig = generate_timeline(events_df)

        # Save under history/<compute_id>/timeline.<format>.
        self.dst = Path(f"history/{event.compute_id}")
        self.dst.mkdir(parents=True, exist_ok=True)
        self.dst = self.dst / f"timeline.{self.format}"

        fig.savefig(self.dst)


def generate_timeline(events_df):
    """Scatter-plot the lifecycle timestamps of every task; return the figure."""
    ordered = events_df.sort_values(by=["task_create_tstamp", "name"], ascending=True)
    t0 = ordered["task_create_tstamp"].min()
    n_tasks = len(ordered)

    fig, ax = plt.subplots(figsize=(10, 8))

    ys = np.arange(n_tasks)
    marker_size = 7

    # One scatter series per task-lifecycle stage; times are offsets from the
    # earliest task-create timestamp so the x-axis starts at zero.
    stages = [
        ("task create", ordered.task_create_tstamp),
        ("function start", ordered.function_start_tstamp),
        ("function end", ordered.function_end_tstamp),
        ("task result", ordered.task_result_tstamp),
    ]

    for label, tstamps in stages:
        ax.scatter(
            tstamps - t0, ys, label=label, edgecolor="none", s=marker_size, alpha=0.8
        )

    ax.set_xlabel("Execution time (sec)")
    ax.set_ylabel("Task number")

    ax.legend()

    return fig
5 changes: 4 additions & 1 deletion cubed/tests/test_executor_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import cubed.random
from cubed.diagnostics import ProgressBar
from cubed.diagnostics.history import HistoryCallback
from cubed.diagnostics.mem_usage import MemoryVisualizationCallback
from cubed.diagnostics.mem_warn import MemoryWarningCallback
from cubed.diagnostics.rich import RichProgressBar
from cubed.diagnostics.timeline import TimelineVisualizationCallback
Expand Down Expand Up @@ -101,13 +102,15 @@ def test_callbacks(spec, executor):
progress = TqdmProgressBar()
hist = HistoryCallback()
timeline_viz = TimelineVisualizationCallback()
memory_viz = MemoryVisualizationCallback()

a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
assert_array_equal(
c.compute(
executor=executor, callbacks=[task_counter, progress, hist, timeline_viz]
executor=executor,
callbacks=[task_counter, progress, hist, timeline_viz, memory_viz],
),
np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
)
Expand Down
2 changes: 0 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-psutil.*]
ignore_missing_imports = True
[mypy-pylab.*]
ignore_missing_imports = True
[mypy-pytest.*]
ignore_missing_imports = True
[mypy-ray.*]
Expand Down
Loading