Skip to content

Commit 215f032

Browse files
add modality memory usage
1 parent c920842 commit 215f032

4 files changed

Lines changed: 112 additions & 24 deletions

File tree

src/main/python/systemds/scuro/drsearch/node_executor.py

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,17 @@ class RefCountResultCache:
5151
def __init__(self):
5252
self.cache = {}
5353
self.ref_count = {}
54+
self.memory_usage_per_node = {}
5455

5556
def get(self, node_id: str) -> Any:
5657
return self.cache[node_id]
5758

5859
def add_result(self, node_id: str, result: Any):
5960
self.cache[node_id] = result
61+
self.memory_usage_per_node[node_id] = result.calculate_memory_usage()
62+
print(
63+
f"Node {node_id} has a CPU memory usage of {self.memory_usage_per_node[node_id]/1024**3:.5f} GB"
64+
)
6065

6166
def inc_ref(self, node_id: str):
6267
if node_id not in self.ref_count:
@@ -68,6 +73,7 @@ def dec_ref(self, node_id: str):
6873
if self.ref_count[node_id] == 0:
6974
del self.cache[node_id]
7075
del self.ref_count[node_id]
76+
del self.memory_usage_per_node[node_id]
7177

7278
def clear(self, node_id: str):
7379
del self.cache[node_id]
@@ -76,6 +82,9 @@ def clear(self, node_id: str):
7682
def __len__(self):
7783
return len(self.cache)
7884

85+
def get_memory_total_memory_usage(self):
86+
return sum(self.memory_usage_per_node.values())
87+
7988

8089
def _execute_node_worker(
8190
node: RepresentationNode,
@@ -84,6 +93,9 @@ def _execute_node_worker(
8493
rep_cache: Optional[Dict[str, Any]],
8594
gpu_id: Optional[int],
8695
):
96+
proc = psutil.Process(os.getpid())
97+
before = proc.memory_info().rss # bytes
98+
8799
if gpu_id is not None:
88100
device = torch.device(f"cuda:{gpu_id}")
89101
torch.cuda.set_device(device)
@@ -92,9 +104,9 @@ def _execute_node_worker(
92104
result = None
93105
node_operation = node.operation(params=node.parameters)
94106
operation_name = node_operation.name
95-
print(
96-
f"Executing node {node.node_id} inputs: {input_mods[0].modality_id}, gpu: {gpu_id}, operation: {operation_name}"
97-
)
107+
# print(
108+
# f"Executing node {node.node_id} inputs: {input_mods[0].modality_id}, gpu: {gpu_id}, operation: {operation_name}"
109+
# )
98110
if gpu_id is not None and hasattr(node_operation, "gpu_id"):
99111
node_operation.gpu_id = gpu_id
100112

@@ -120,13 +132,14 @@ def _execute_node_worker(
120132
)
121133
else:
122134
result = input_mods[0].combine(input_mods[1:], fusion_op)
123-
peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
135+
delta_bytes = proc.memory_info().rss - before
124136
gpu_peak_bytes = (
125137
torch.cuda.max_memory_allocated(device) if gpu_id is not None else 0
126138
)
139+
# print(f"Node {node.node_id}: {operation_name} has a CPU peak memory usage of {delta_bytes/1024**3:.2f} GB, and a GPU peak memory usage of {gpu_peak_bytes/1024**3:.2f} GB")
127140
return {
128141
"result": result,
129-
"peak_bytes": peak_kb * 1024,
142+
"peak_bytes": delta_bytes,
130143
"gpu_peak_bytes": gpu_peak_bytes,
131144
"operation_name": operation_name,
132145
}
@@ -135,7 +148,10 @@ def _execute_node_worker(
135148
def _execute_task_worker(
136149
task_node_id: str, task: Any, data: Any, gpu_id: Optional[int]
137150
) -> Dict[str, Any]:
138-
print(f"Executing task {task_node_id} on GPU {gpu_id}")
151+
proc = psutil.Process(os.getpid())
152+
before = proc.memory_info().rss # bytes
153+
154+
# print(f"Executing task {task_node_id} on GPU {gpu_id}")
139155
if gpu_id is not None:
140156
device = torch.device(f"cuda:{gpu_id}")
141157
torch.cuda.set_device(device)
@@ -146,14 +162,15 @@ def _execute_task_worker(
146162
start = time.perf_counter()
147163
scores = task.run(data)
148164
end = time.perf_counter()
149-
peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
165+
delta_bytes = proc.memory_info().rss - before
150166
gpu_peak_bytes = (
151167
torch.cuda.max_memory_allocated(device) if gpu_id is not None else 0
152168
)
169+
# print(f"Task {task_node_id} has a CPU peak memory usage of {delta_bytes/1024**3:.2f} GB, and a GPU peak memory usage of {gpu_peak_bytes/1024**3:.2f} GB")
153170
return {
154171
"scores": scores,
155172
"task_time": end - start,
156-
"peak_bytes": peak_kb * 1024,
173+
"peak_bytes": delta_bytes,
157174
"gpu_peak_bytes": gpu_peak_bytes,
158175
}
159176

@@ -290,6 +307,8 @@ def submit_new_ready_nodes():
290307
"task",
291308
memory_usage_data,
292309
)
310+
self.scheduler.complete_node(node_id)
311+
293312
else:
294313
transformed_modality = result["result"]
295314
self._checkpoint_memory_usage(
@@ -299,9 +318,9 @@ def submit_new_ready_nodes():
299318
result["operation_name"],
300319
memory_usage_data,
301320
)
302-
before_bytes = self._result_cache_size_bytes()
321+
before_bytes = self.result_cache.get_memory_total_memory_usage()
303322
self._manage_result_cache(node_id, transformed_modality)
304-
after_bytes = self._result_cache_size_bytes()
323+
after_bytes = self.result_cache.get_memory_total_memory_usage()
305324
self.scheduler.update_cpu_memory_in_use(
306325
after_bytes - before_bytes
307326
)
@@ -327,6 +346,9 @@ def _checkpoint_memory_usage(
327346
"estimated_cpu_bytes": self.scheduler.node_resources[node_id][0],
328347
"estimated_gpu_bytes": self.scheduler.node_resources[node_id][1],
329348
}
349+
print(
350+
f"Node {node_id}: {operation_name} has a CPU peak memory usage of {peak_bytes/1024**3:.2f}/{self.scheduler.node_resources[node_id][0]/1024**3:.2f} GB estimated, and a GPU peak memory usage of {gpu_peak_bytes/1024**3:.2f}/{self.scheduler.node_resources[node_id][1]/1024**3:.2f} GB estimated "
351+
)
330352
self.memory_usage_checkpoint.checkpoint_if_due(data)
331353

332354
def _result_cache_size_bytes(self) -> int:
@@ -357,10 +379,10 @@ def _manage_result_cache(self, node_id: str, result: Any):
357379
self.result_cache.add_result(node_id, result)
358380

359381
if (
360-
node_id in self.result_cache.ref_count
361-
and self.result_cache.ref_count[node_id] == 0
382+
parent_node_id in self.result_cache.ref_count
383+
and self.result_cache.ref_count[parent_node_id] == 0
362384
):
363-
self.result_cache.clear(node_id)
385+
self.result_cache.clear(parent_node_id)
364386

365387
def _get_nodes_by_ids(self, nodes_ids: List[str]) -> List[RepresentationNode]:
366388
return [self.scheduler.mapping[node_id] for node_id in nodes_ids]

src/main/python/systemds/scuro/drsearch/task.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from systemds.scuro.models.model import Model
2525
import numpy as np
2626
from sklearn.model_selection import train_test_split
27-
27+
import sys
2828
from systemds.scuro.representations.representation import RepresentationStats
2929

3030

@@ -104,15 +104,53 @@ def get_output_stats(self, input_stats):
104104
return RepresentationStats(0, (0,))
105105

106106
def estimate_peak_memory_bytes(self, input_stats):
107+
label_bytes = self.labels.nbytes * 3 # should be a np array
108+
train_indices_bytes = sum([sys.getsizeof(i) for i in self.train_indices]) * 2
109+
test_indices_bytes = sum([sys.getsizeof(i) for i in self.test_indices]) * 2
110+
cv_train_indices_bytes = (
111+
sum(
112+
[
113+
sum([sys.getsizeof(i) for i in fold])
114+
for fold in self.cv_train_indices
115+
]
116+
)
117+
* 2
118+
)
119+
cv_val_indices_bytes = (
120+
sum([sum([sys.getsizeof(i) for i in fold]) for fold in self.cv_val_indices])
121+
* 2
122+
)
123+
fusion_train_indices_bytes = (
124+
sum([sys.getsizeof(i) for i in self.fusion_train_indices]) * 2
125+
)
126+
input_data = input_stats.num_instances * input_stats.output_shape[0] * 4 * 3
127+
128+
total_bytes = (
129+
input_data
130+
+ label_bytes
131+
+ train_indices_bytes
132+
+ test_indices_bytes
133+
+ cv_train_indices_bytes
134+
+ cv_val_indices_bytes
135+
+ fusion_train_indices_bytes
136+
)
137+
input_stats_bytes = input_stats.num_instances * input_stats.output_shape[0] * 4
107138
if hasattr(self.model, "estimate_peak_memory_bytes"):
108-
# TODO: Investigate the influence of cv on the memory footprint of the task
109-
return self.model.estimate_peak_memory_bytes(
110-
input_stats.output_shape[0], len(self.train_indices)
139+
model_peak_memory_cpu, model_peak_memory_gpu = (
140+
self.model.estimate_peak_memory_bytes(
141+
input_stats.output_shape[0], len(self.train_indices)
142+
)
111143
)
144+
return {
145+
"cpu_peak_bytes": model_peak_memory_cpu
146+
+ total_bytes
147+
+ input_stats_bytes,
148+
"gpu_peak_bytes": model_peak_memory_gpu,
149+
}
112150
else:
113151
# TODO: Implement a default estimate of the peak memory bytes for the task
114152
return {
115-
"cpu_peak_bytes": 0,
153+
"cpu_peak_bytes": total_bytes + input_stats_bytes,
116154
"gpu_peak_bytes": 0,
117155
}
118156

src/main/python/systemds/scuro/modality/modality.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ def __init__(
4646
self.metadata = metadata
4747
self.data = []
4848
self.data_type = data_type
49-
self.cost = None
50-
self.shape = None
5149
self.modality_id = modality_id
5250
self.transform_time = transform_time if transform_time else 0
5351
self.stats = RepresentationStats(0, ())

src/main/python/systemds/scuro/modality/transformed.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from systemds.scuro.modality.modality import Modality
2626
from systemds.scuro.representations.window_aggregation import WindowAggregation
2727
import time
28-
import copy
28+
import sys
2929

3030

3131
class TransformedModality(Modality):
@@ -56,7 +56,7 @@ def __init__(
5656
)
5757
if set_data:
5858
self.data = modality.data
59-
self.transformation = None
59+
6060
self.self_contained = (
6161
self_contained and transformation.self_contained
6262
if isinstance(transformation, TransformedModality)
@@ -84,8 +84,28 @@ def __init__(
8484
# )
8585
# self.transformation.append(transformation)
8686

87-
def copy_from_instance(self):
88-
return type(self)(self, self.transformation)
87+
def calculate_memory_usage(self):
88+
data_bytes = 0
89+
for instance in self.data:
90+
data_bytes += self._estimate_data_bytes(instance)
91+
92+
md_bytes = 0
93+
for key, value in self.metadata.items():
94+
md_bytes += self._estimate_data_bytes(key)
95+
md_bytes += self._estimate_data_bytes(value)
96+
97+
total_bytes = (
98+
data_bytes
99+
+ md_bytes
100+
+ sys.getsizeof(self.data_type)
101+
+ sys.getsizeof(self.modality_id)
102+
+ sys.getsizeof(self.schema)
103+
+ sys.getsizeof(self.stats)
104+
+ sys.getsizeof(self.self_contained)
105+
+ sys.getsizeof(self.transform_time)
106+
+ sys.getsizeof(self.modality_type)
107+
)
108+
return total_bytes
89109

90110
def join(self, right, join_condition):
91111
chunked_execution = False
@@ -225,3 +245,13 @@ def _padded_dimensionality_reduction(self, dimensionality_reduction_operator):
225245
all_outputs.append(out)
226246
start = end
227247
return np.concatenate(all_outputs, axis=0)
248+
249+
def _estimate_data_bytes(self, instance):
250+
if isinstance(instance, np.ndarray):
251+
return instance.nbytes
252+
elif isinstance(instance, list):
253+
return sum(self._estimate_data_bytes(item) for item in instance)
254+
elif isinstance(instance, dict):
255+
return sum(self._estimate_data_bytes(item) for item in instance.values())
256+
else:
257+
return sys.getsizeof(instance)

0 commit comments

Comments
 (0)