Skip to content

Commit 4f31256

Browse files
lcnielmikromyra
authored and committed
add more averaging
1 parent 380003b commit 4f31256

3 files changed

Lines changed: 118 additions & 20 deletions

File tree

sams/sampler/SlurmCGroup.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import logging
4040
import os
4141
import re
42+
import time
4243

4344
import sams.base
4445

@@ -51,6 +52,13 @@ def __init__(self, id, outQueue, config):
5152
self.processes = {}
5253
self.cgroup = None
5354
self.cgroup_base = self.config.get([self.id, "cgroup_base"], "/cgroup")
55+
self.create_time = time.time()
56+
self.last_sample_time = self.create_time
57+
self.metrics_to_average = self.config.get(
58+
[self.id, "metrics_to_average"],
59+
["memory_usage"])
60+
self._average_values = {k: 0 for k in self.metrics_to_average}
61+
self._last_averaged_values = {k: 0 for k in self.metrics_to_average}
5462

5563
def do_sample(self):
5664
return self._get_cgroup()
@@ -73,6 +81,7 @@ def sample(self):
7381
"memory_max_usage": memory_max_usage,
7482
"memory_swap": str(int(memory_usage_and_swap) - int(memory_usage)),
7583
}
84+
self.compute_sample_averages(entry)
7685
self._most_recent_sample = [self._storage_wrapping(entry)]
7786
self.store(entry)
7887

@@ -81,6 +90,30 @@ def _get_cgroup_regex():
8190
"""Version-specific regular expression to find correct cgroup path."""
8291
return r"^/(slurm/uid_\d+/job_\d+)/"
8392

93+
def compute_sample_averages(self, data):
    """Add running time-averages of the selected metrics to ``data``.

    Each metric named in ``self.metrics_to_average`` that is present in
    ``data`` is integrated over time with the trapezoidal rule and divided
    by the time elapsed since ``self.create_time``.  The moment this method
    is called is taken as the sampling time; that is an approximation which
    keeps the implementation simple.

    Args:
        data: Mapping of metric name to value for the current sample.  The
            values of averaged metrics must be convertible with ``float()``.
            The mapping is modified in place: a ``<metric>_average`` key is
            added for every entry of ``self._average_values``.
    """
    sample_time = time.time()
    elapsed_time = sample_time - self.last_sample_time
    total_elapsed_time = sample_time - self.create_time

    for key, item in data.items():
        if key not in self.metrics_to_average:
            continue
        current = float(item)
        previous = float(self._last_averaged_values[key])
        self._last_averaged_values[key] = item
        if total_elapsed_time <= 0:
            # No usable time window (first call in the same clock tick, or
            # the wall clock stepped backwards): report the instantaneous
            # value instead of dividing by zero.
            self._average_values[key] = current
            continue
        # Trapezoidal quadrature over the interval since the last sample.
        interval_integral = 0.5 * (current + previous) * elapsed_time
        # Rebuild the integral up to the previous sample from the stored mean.
        previous_integral = (
            self._average_values[key] * (total_elapsed_time - elapsed_time))
        self._average_values[key] = (
            (previous_integral + interval_integral) / total_elapsed_time)

    for key, item in self._average_values.items():
        data[key + '_average'] = item
    # Reuse the timestamp taken above: a second clock read would leave a gap
    # that is counted in the total elapsed time but in no sample interval.
    self.last_sample_time = sample_time
116+
84117
def _get_cgroup(self):
85118
"""Get the cgroup base path for the slurm job"""
86119
if self.cgroup:

sams/sampler/Software.py

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ def __init__(self, id, outQueue, config):
158158
self.last_sample_time = None
159159
self.last_total = None
160160
self.software_mapper = None
161+
self.metrics_to_average = self.config.get(
162+
[self.id, "metrics_to_average"],
163+
["system", "user"])
164+
self._average_values = {k: 0 for k in self.metrics_to_average}
165+
self._last_averaged_values = {k: 0 for k in self.metrics_to_average}
161166

162167
software_mapper = self.config.get([id, "software_mapper"], None)
163168
if software_mapper is not None:
@@ -195,35 +200,62 @@ def sample(self):
195200

196201
for pid in self.pids:
197202
logger.debug("evaluate pid: %d", pid)
198-
if not pid in self.processes.keys():
203+
if pid not in self.processes.keys():
199204
logger.debug("Create new instance of Process for pid: %d", pid)
200205
self.processes[pid] = Process(pid, self.jobid)
201206
self.processes[pid].update(uptime)
202207

203208
# Send information about current usage
204209
aggr, total = self._aggregate()
205-
if self.last_sample_time:
206-
time_diff = time.time() - self.last_sample_time
207-
if time_diff > self.sampler_interval / 2:
208-
entry = {
209-
"current": {
210-
"software": self.map_software(aggr),
211-
"total_user": total["user"],
212-
"total_system": total["system"],
213-
"user": (total["user"] - self.last_total["user"])
214-
/ time_diff,
215-
"system": (total["system"] - self.last_total["system"])
216-
/ time_diff,
217-
}
210+
211+
if self.last_sample_time is None:
212+
self.last_total = total
213+
self.last_sample_time = time.time()
214+
return
215+
216+
time_diff = time.time() - self.last_sample_time
217+
if time_diff > self.sampler_interval / 2:
218+
entry = {
219+
"current": {
220+
"software": self.map_software(aggr),
221+
"total_user": total["user"],
222+
"total_system": total["system"],
223+
"user": (total["user"] - self.last_total["user"])
224+
/ time_diff,
225+
"system": (total["system"] - self.last_total["system"])
226+
/ time_diff,
218227
}
219-
self._most_recent_sample = [self._storage_wrapping(entry)]
220-
self.store(entry)
221-
self.last_total = total
222-
self.last_sample_time = time.time()
223-
else:
228+
}
229+
self.compute_sample_averages(entry["current"])
230+
self._most_recent_sample = [self._storage_wrapping(entry)]
231+
self.store(entry)
224232
self.last_total = total
225233
self.last_sample_time = time.time()
226234

235+
def compute_sample_averages(self, data):
    """Add running time-averages of the selected metrics to ``data``.

    Each metric named in ``self.metrics_to_average`` that is present in
    ``data`` is integrated over time with the trapezoidal rule and divided
    by the time elapsed since ``self.create_time``.  The moment this method
    is called is taken as the sampling time; that is an approximation which
    keeps the implementation simple.  ``self.last_sample_time`` is read but
    not updated here.

    Args:
        data: Mapping of metric name to value for the current sample.  The
            values of averaged metrics must be convertible with ``float()``.
            The mapping is modified in place: a ``<metric>_average`` key is
            added for every entry of ``self._average_values`` and
            ``elapsed_time`` is set to the time since ``self.create_time``.
    """
    sample_time = time.time()
    elapsed_time = sample_time - self.last_sample_time
    total_elapsed_time = sample_time - self.create_time

    for key, item in data.items():
        if key not in self.metrics_to_average:
            continue
        current = float(item)
        previous = float(self._last_averaged_values[key])
        self._last_averaged_values[key] = item
        if total_elapsed_time <= 0:
            # No usable time window (same clock tick as creation, or the
            # wall clock stepped backwards): report the instantaneous value
            # instead of dividing by zero.
            self._average_values[key] = current
            continue
        # Trapezoidal quadrature over the interval since the last sample.
        interval_integral = 0.5 * (current + previous) * elapsed_time
        # Rebuild the integral up to the previous sample from the stored mean.
        previous_integral = (
            self._average_values[key] * (total_elapsed_time - elapsed_time))
        self._average_values[key] = (
            (previous_integral + interval_integral) / total_elapsed_time)

    for key, item in self._average_values.items():
        data[key + '_average'] = item
    data['elapsed_time'] = total_elapsed_time
258+
227259
def last_updated(self):
228260
procs = list(filter(lambda p: not p.ignore, self.processes.values()))
229261
if not procs:
@@ -250,7 +282,7 @@ def _aggregate(self):
250282
a["system"],
251283
)
252284
exe = a["exe"]
253-
if not exe in aggr:
285+
if exe not in aggr:
254286
aggr[exe] = {"user": 0.0, "system": 0.0}
255287
aggr[exe]["user"] += a["user"]
256288
aggr[exe]["system"] += a["system"]

sams/sampler/ZFSStats.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import logging
4444
import subprocess
45+
import time
4546

4647
import sams.base
4748

@@ -79,12 +80,19 @@ def __init__(self, id, outQueue, config):
7980
self.volumes = self.config.get([self.id, "volumes"])
8081
self.zfs_command = self.config.get([self.id, "zfs_command"], "/sbin/zfs")
8182
self.jobid = self.config.get(["options", "jobid"], 0)
83+
self.create_time = time.time()
84+
self.last_sample_time = self.create_time
85+
self.metrics_to_average = self.config.get(
86+
[self.id, "metrics_to_average"],
87+
["used"])
8288

8389
if not self.volumes:
8490
raise sams.base.SamplerException("volumes not configured")
8591

8692
volumes = [volume % dict(jobid=self.jobid) for volume in self.volumes]
8793

94+
self._average_values = {v: {k: 0 for k in self.metrics_to_average} for v in volumes}
95+
self._last_averaged_values = {v: {k: 0 for k in self.metrics_to_average} for v in volumes}
8896
self.zfsstat = None
8997
if volumes:
9098
self.zfsstat = ZFSStats(volumes=volumes, zfs_command=self.zfs_command)
@@ -98,9 +106,34 @@ def sample(self):
98106
logger.debug("sample()")
99107
if self.zfsstat:
100108
entry = self.zfsstat.sample()
109+
self.compute_sample_averages(entry)
101110
self._most_recent_sample = [self._storage_wrapping(entry)]
102111
self.store(entry)
103112

113+
def compute_sample_averages(self, volume_data):
    """Add per-volume running time-averages of the selected metrics.

    For every volume in ``volume_data``, each metric named in
    ``self.metrics_to_average`` is integrated over time with the
    trapezoidal rule and divided by the time elapsed since
    ``self.create_time``.  The moment this method is called is taken as the
    sampling time; that is an approximation which keeps the implementation
    simple.

    Args:
        volume_data: Mapping of volume name to a mapping of metric name to
            value.  The values of averaged metrics must be convertible with
            ``float()``.  Each inner mapping is modified in place: a
            ``<metric>_average`` key is added for every averaged metric.
    """
    sample_time = time.time()
    elapsed_time = sample_time - self.last_sample_time
    total_elapsed_time = sample_time - self.create_time

    for v, data in volume_data.items():
        # A volume without pre-created accumulators gets zeroed ones instead
        # of raising KeyError.
        averages = self._average_values.setdefault(
            v, {k: 0 for k in self.metrics_to_average})
        last_values = self._last_averaged_values.setdefault(
            v, {k: 0 for k in self.metrics_to_average})

        for key, item in data.items():
            if key not in self.metrics_to_average:
                continue
            current = float(item)
            previous = float(last_values[key])
            last_values[key] = item
            if total_elapsed_time <= 0:
                # No usable time window (same clock tick as creation, or the
                # wall clock stepped backwards): report the instantaneous
                # value instead of dividing by zero.
                averages[key] = current
                continue
            # Trapezoidal quadrature over the interval since the last sample.
            interval_integral = 0.5 * (current + previous) * elapsed_time
            # Rebuild the integral up to the previous sample from the mean.
            previous_integral = (
                averages[key] * (total_elapsed_time - elapsed_time))
            averages[key] = (
                (previous_integral + interval_integral) / total_elapsed_time)

        for key, item in averages.items():
            data[key + '_average'] = item
    self.last_sample_time = sample_time
136+
104137
@classmethod
105138
def final_data(cls):
106139
return {}

0 commit comments

Comments
 (0)