Skip to content

Commit e282a65

Browse files
add memory measuring functionality at runtime
1 parent 215f032 commit e282a65

10 files changed

Lines changed: 209 additions & 138 deletions

File tree

src/main/python/systemds/scuro/drsearch/node_executor.py

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,35 @@
4545
from systemds.scuro.representations.unimodal import UnimodalRepresentation
4646
from systemds.scuro.utils.checkpointing import CheckpointManager
4747
import sys
48+
import threading
49+
import time
50+
import psutil
51+
import os
52+
53+
54+
def measure_peak_rss_during(fn, *args, sample_s=0.01, **kwargs):
55+
proc = psutil.Process(os.getpid())
56+
baseline = proc.memory_info().rss
57+
peak = baseline
58+
stop = threading.Event()
59+
60+
def sampler():
61+
nonlocal peak
62+
while not stop.is_set():
63+
rss = proc.memory_info().rss
64+
if rss > peak:
65+
peak = rss
66+
time.sleep(sample_s)
67+
68+
t = threading.Thread(target=sampler, daemon=True)
69+
t.start()
70+
try:
71+
out = fn(*args, **kwargs)
72+
finally:
73+
stop.set()
74+
t.join()
75+
76+
return out, (peak - baseline), peak
4877

4978

5079
class RefCountResultCache:
@@ -86,60 +115,52 @@ def get_memory_total_memory_usage(self):
86115
return sum(self.memory_usage_per_node.values())
87116

88117

89-
def _execute_node_worker(
90-
node: RepresentationNode,
91-
input_mods: List[Any],
92-
task: Any,
93-
rep_cache: Optional[Dict[str, Any]],
94-
gpu_id: Optional[int],
95-
):
96-
proc = psutil.Process(os.getpid())
97-
before = proc.memory_info().rss # bytes
98-
118+
def _execute_node_worker(node, input_mods, task, rep_cache, gpu_id):
99119
if gpu_id is not None:
100120
device = torch.device(f"cuda:{gpu_id}")
101121
torch.cuda.set_device(device)
102122
torch.cuda.reset_peak_memory_stats(device)
103123

104-
result = None
105124
node_operation = node.operation(params=node.parameters)
106125
operation_name = node_operation.name
107-
# print(
108-
# f"Executing node {node.node_id} inputs: {input_mods[0].modality_id}, gpu: {gpu_id}, operation: {operation_name}"
109-
# )
126+
110127
if gpu_id is not None and hasattr(node_operation, "gpu_id"):
111128
node_operation.gpu_id = gpu_id
112129

113-
if len(input_mods) == 1:
114-
if isinstance(node_operation, Context):
115-
result = input_mods[0].context(node_operation)
116-
elif isinstance(node_operation, DimensionalityReduction):
117-
result = input_mods[0].dimensionality_reduction(node_operation)
118-
elif isinstance(node_operation, AggregatedRepresentation):
119-
result = node_operation.transform(input_mods[0])
120-
elif isinstance(node_operation, UnimodalRepresentation):
121-
if rep_cache is not None and node_operation.name in rep_cache:
122-
result = rep_cache[node_operation.name]
123-
else:
124-
result = input_mods[0].apply_representation(node_operation)
125-
else:
126-
result = input_mods[0].apply_representation(node_operation)
127-
else:
128-
fusion_op = node_operation
129-
if hasattr(fusion_op, "needs_training") and fusion_op.needs_training:
130-
result = input_mods[0].combine_with_training(
131-
input_mods[1:], fusion_op, task
132-
)
130+
def _run_node_op():
131+
if len(input_mods) == 1:
132+
if isinstance(node_operation, Context):
133+
return input_mods[0].context(node_operation)
134+
elif isinstance(node_operation, DimensionalityReduction):
135+
return input_mods[0].dimensionality_reduction(node_operation)
136+
elif isinstance(node_operation, AggregatedRepresentation):
137+
return node_operation.transform(input_mods[0])
138+
elif isinstance(node_operation, UnimodalRepresentation):
139+
if rep_cache is not None and node_operation.name in rep_cache:
140+
return rep_cache[node_operation.name]
141+
return input_mods[0].apply_representation(node_operation)
142+
return input_mods[0].apply_representation(node_operation)
133143
else:
134-
result = input_mods[0].combine(input_mods[1:], fusion_op)
135-
delta_bytes = proc.memory_info().rss - before
144+
fusion_op = node_operation
145+
if hasattr(fusion_op, "needs_training") and fusion_op.needs_training:
146+
return input_mods[0].combine_with_training(
147+
input_mods[1:], fusion_op, task
148+
)
149+
return input_mods[0].combine(input_mods[1:], fusion_op)
150+
151+
result, peak_delta_bytes, peak_abs_rss = measure_peak_rss_during(
152+
_run_node_op,
153+
sample_s=0.01,
154+
)
155+
136156
gpu_peak_bytes = (
137157
torch.cuda.max_memory_allocated(device) if gpu_id is not None else 0
138158
)
139-
# print(f"Node {node.node_id}: {operation_name} has a CPU peak memory usage of {delta_bytes/1024**3:.2f} GB, and a GPU peak memory usage of {gpu_peak_bytes/1024**3:.2f} GB")
159+
140160
return {
141161
"result": result,
142-
"peak_bytes": delta_bytes,
162+
"peak_bytes": peak_delta_bytes, # per-call CPU peak over baseline
163+
"peak_abs_rss_bytes": peak_abs_rss, # optional
143164
"gpu_peak_bytes": gpu_peak_bytes,
144165
"operation_name": operation_name,
145166
}
@@ -148,8 +169,6 @@ def _execute_node_worker(
148169
def _execute_task_worker(
149170
task_node_id: str, task: Any, data: Any, gpu_id: Optional[int]
150171
) -> Dict[str, Any]:
151-
proc = psutil.Process(os.getpid())
152-
before = proc.memory_info().rss # bytes
153172

154173
# print(f"Executing task {task_node_id} on GPU {gpu_id}")
155174
if gpu_id is not None:
@@ -159,18 +178,25 @@ def _execute_task_worker(
159178

160179
if gpu_id is not None and hasattr(task, "model") and hasattr(task.model, "device"):
161180
task.model.device = torch.device(f"cuda:{gpu_id}")
162-
start = time.perf_counter()
163-
scores = task.run(data)
164-
end = time.perf_counter()
165-
delta_bytes = proc.memory_info().rss - before
181+
182+
def _run_task():
183+
start = time.perf_counter()
184+
scores = task.run(data)
185+
end = time.perf_counter()
186+
return scores, end - start
187+
166188
gpu_peak_bytes = (
167189
torch.cuda.max_memory_allocated(device) if gpu_id is not None else 0
168190
)
191+
result, peak_delta_bytes, peak_abs_rss = measure_peak_rss_during(
192+
_run_task,
193+
sample_s=0.01,
194+
)
169195
# print(f"Task {task_node_id} has a CPU peak memory usage of {delta_bytes/1024**3:.2f} GB, and a GPU peak memory usage of {gpu_peak_bytes/1024**3:.2f} GB")
170196
return {
171-
"scores": scores,
172-
"task_time": end - start,
173-
"peak_bytes": delta_bytes,
197+
"scores": result[0],
198+
"task_time": result[1],
199+
"peak_bytes": peak_delta_bytes,
174200
"gpu_peak_bytes": gpu_peak_bytes,
175201
}
176202

src/main/python/systemds/scuro/representations/aggregated_representation.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,35 +49,31 @@ def __init__(self, aggregation="mean", target_dimensions=None, params=None):
4949
self.data_type = np.float32
5050

5151
def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationStats:
52-
if len(input_stats.output_shape) == 1 or len(input_stats.output_shape) == 2:
53-
return RepresentationStats(
54-
input_stats.num_instances, (input_stats.output_shape[0],)
55-
)
56-
elif len(input_stats.output_shape) == 3:
57-
return RepresentationStats(
58-
input_stats.num_instances,
59-
(
60-
input_stats.output_shape[0],
61-
input_stats.output_shape[1],
62-
),
63-
)
52+
if len(input_stats.output_shape) == 0:
53+
out_shape = (1,)
54+
elif len(input_stats.output_shape) == 1:
55+
out_shape = (1,)
6456
else:
65-
raise ValueError(f"Invalid output shape: {input_stats.output_shape}")
57+
out_shape = input_stats.output_shape[:-1]
58+
return RepresentationStats(input_stats.num_instances, out_shape)
6659

6760
def estimate_output_memory_bytes(self, input_stats: RepresentationStats) -> int:
68-
output_memory_bytes = 1
69-
output_shape = self.get_output_stats(input_stats).output_shape
70-
for dim in output_shape:
71-
output_memory_bytes *= dim
72-
return (
73-
input_stats.num_instances
74-
* output_memory_bytes
75-
* np.dtype(self.data_type).itemsize
76-
)
61+
out_shape = self.get_output_stats(input_stats).output_shape
62+
out_numel = int(np.prod(out_shape)) if len(out_shape) > 0 else 1
63+
dtype_size = 8
64+
return int(input_stats.num_instances * out_numel * dtype_size)
7765

7866
def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict:
67+
dtype_size = np.dtype(self.data_type).itemsize
68+
in_shape = tuple(input_stats.output_shape)
69+
in_numel = int(np.prod(in_shape)) if len(in_shape) > 0 else 1
70+
input_bytes = int(input_stats.num_instances * in_numel * dtype_size)
71+
output_bytes = self.estimate_output_memory_bytes(input_stats)
72+
safety = 1.2
73+
cpu_peak = input_bytes * 2 + output_bytes * 2
74+
7975
return {
80-
"cpu_peak_bytes": self.estimate_output_memory_bytes(input_stats),
76+
"cpu_peak_bytes": int(cpu_peak * safety),
8177
"gpu_peak_bytes": 0,
8278
}
8379

src/main/python/systemds/scuro/representations/bert.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,8 @@ def get_output_stats(self, input_stats) -> RepresentationStats:
9898
)
9999

100100
def estimate_output_memory_bytes(self, input_stats):
101-
return (
102-
input_stats.num_instances
103-
* self.max_seq_length
104-
* 768
105-
* self.data_type.itemsize
106-
)
101+
output_stats = self.get_output_stats(input_stats).output_shape
102+
return int(input_stats.num_instances * np.prod(output_stats) * 8)
107103

108104
def estimate_peak_memory_bytes(self, input_stats):
109105
model = AutoModel.from_pretrained(self.model_name)
@@ -112,7 +108,7 @@ def estimate_peak_memory_bytes(self, input_stats):
112108

113109
output_bytes = self.estimate_output_memory_bytes(input_stats)
114110

115-
per_instance_input_bytes = self.max_seq_length * 3 * 8
111+
per_instance_input_bytes = int(np.prod(input_stats.output_shape)) * 8
116112
input_bytes_all_instances = input_stats.num_instances * per_instance_input_bytes
117113

118114
safety_margin_bytes = 64 * 1024 * 1024 # 64 MB

src/main/python/systemds/scuro/representations/bow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ def get_output_stats(self, input_stats: TextStats) -> RepresentationStats:
5252
return RepresentationStats(input_stats.num_instances, (vocab_estimate,))
5353

5454
def estimate_output_memory_bytes(self, input_stats: TextStats) -> int:
55+
output_bytes = 1
56+
output_shape = self.get_output_stats(input_stats).output_shape
57+
for dim in output_shape:
58+
output_bytes *= dim
5559
return (
56-
input_stats.num_instances
57-
* self.get_output_stats(input_stats).output_shape[0]
58-
* np.dtype(self.data_type).itemsize
60+
input_stats.num_instances * output_bytes * np.dtype(self.data_type).itemsize
5961
)
6062

6163
def estimate_peak_memory_bytes(self, input_stats: TextStats) -> dict:

src/main/python/systemds/scuro/representations/clip.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,12 @@ def gpu_id(self, gpu_id):
223223
self.device = get_device(gpu_id)
224224

225225
def estimate_output_memory_bytes(self, input_stats) -> int:
226-
return input_stats.num_instances * 512 * self.data_type.itemsize
226+
output_stats = self.get_output_stats(input_stats)
227+
output_bytes = 1
228+
for dim in output_stats.output_shape:
229+
output_bytes *= dim
230+
231+
return input_stats.num_instances * output_bytes * self.data_type.itemsize
227232

228233
def get_output_stats(self, input_stats) -> RepresentationStats:
229234
if not isinstance(input_stats, RepresentationStats):

src/main/python/systemds/scuro/representations/glove.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,12 @@ def get_output_stats(self, input_stats: TextStats) -> RepresentationStats:
6262
return RepresentationStats(input_stats.num_instances, (self.embedding_dim,))
6363

6464
def estimate_output_memory_bytes(self, input_stats: TextStats) -> int:
65+
output_bytes = 1
66+
output_shape = self.get_output_stats(input_stats).output_shape
67+
for dim in output_shape:
68+
output_bytes *= dim
6569
return (
66-
input_stats.num_instances
67-
* self.embedding_dim
68-
* np.dtype(self.data_type).itemsize
70+
input_stats.num_instances * output_bytes * np.dtype(self.data_type).itemsize
6971
)
7072

7173
def estimate_peak_memory_bytes(self, input_stats: TextStats) -> dict:

src/main/python/systemds/scuro/representations/mlp_averaging.py

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,73 @@ def __init__(self, output_dim=512, batch_size=32, params=None):
5858
self.batch_size = batch_size
5959
self.device = None
6060
self.data_type = np.float32
61+
self.gpu_id = None
62+
63+
@property
64+
def gpu_id(self):
65+
return self._gpu_id
66+
67+
@gpu_id.setter
68+
def gpu_id(self, gpu_id):
69+
self._gpu_id = gpu_id
70+
self.device = get_device(gpu_id)
6171

6272
def get_output_stats(self, input_stats: RepresentationStats) -> RepresentationStats:
6373
return RepresentationStats(input_stats.num_instances, (self.output_dim,))
6474

6575
def estimate_output_memory_bytes(self, input_stats: RepresentationStats) -> int:
76+
output_bytes = 1
77+
for dim in input_stats.output_shape:
78+
output_bytes *= dim
6679
return (
67-
input_stats.num_instances
68-
* self.output_dim
69-
* np.dtype(self.data_type).itemsize
80+
input_stats.num_instances * output_bytes * np.dtype(self.data_type).itemsize
7081
)
7182

7283
def estimate_peak_memory_bytes(self, input_stats: RepresentationStats) -> dict:
73-
return {
74-
"cpu_peak_bytes": self.estimate_output_memory_bytes(input_stats),
75-
"gpu_peak_bytes": 0,
76-
}
84+
n = int(input_stats.num_instances)
85+
input_dim = (
86+
int(np.prod(input_stats.output_shape)) if input_stats.output_shape else 0
87+
)
88+
elem_size = np.dtype(self.data_type).itemsize
89+
90+
if input_dim < self.output_dim or n == 0 or input_dim == 0:
91+
input_bytes = n * input_dim * elem_size
92+
cpu_peak = int(input_bytes * 1.05 + 8 * 1024**2) # small safety margin
93+
return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": 0}
94+
95+
out_dim = int(self.output_dim)
96+
batch = int(max(1, min(self.batch_size, n)))
97+
98+
input_bytes = n * input_dim * elem_size
99+
output_bytes = n * out_dim * elem_size
100+
weight_bytes = out_dim * input_dim * elem_size
101+
102+
batch_input_bytes = batch * input_dim * elem_size
103+
batch_output_bytes = batch * out_dim * elem_size
104+
105+
num_batches = (n + batch - 1) // batch
106+
python_overhead = num_batches * 1024
107+
108+
cpu_working = (
109+
input_bytes
110+
+ 2 * output_bytes
111+
+ weight_bytes
112+
+ batch_input_bytes
113+
+ batch_output_bytes
114+
+ python_overhead
115+
)
116+
cpu_peak = int(
117+
cpu_working * 1.20 + 64 * 1024**2
118+
)
119+
120+
device_type = getattr(self.device, "type", "cpu")
121+
if device_type == "cuda":
122+
gpu_working = weight_bytes + batch_input_bytes + batch_output_bytes
123+
gpu_peak = int(gpu_working * 1.35 + 64 * 1024**2)
124+
else:
125+
gpu_peak = 0
126+
127+
return {"cpu_peak_bytes": cpu_peak, "gpu_peak_bytes": gpu_peak}
77128

78129
def execute(self, data):
79130
set_random_seeds(42)
@@ -86,20 +137,9 @@ def execute(self, data):
86137
return data
87138

88139
dim_reduction_model = AggregationMLP(input_dim, self.output_dim)
89-
self.device = get_device_for_model(dim_reduction_model, memory_factor=1.5)
90140
dim_reduction_model = dim_reduction_model.to(self.device)
91141
dim_reduction_model.eval()
92142

93-
# sample = data[0] if data else ""
94-
# self.batch_size = compute_batch_size(
95-
# model=dim_reduction_model,
96-
# device=self.device,
97-
# sample_data=sample,
98-
# tokenizer=None,
99-
# max_seq_length=None,
100-
# max_batch_size=self.batch_size,
101-
# )
102-
103143
tensor_data = torch.from_numpy(data).float()
104144

105145
dataset = TensorDataset(tensor_data)

0 commit comments

Comments
 (0)