Skip to content

Commit 82e7a92

Browse files
committed
added Listener functionality
1 parent 125d388 commit 82e7a92

5 files changed

Lines changed: 250 additions & 0 deletions

File tree

sams-collector.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class Main:
4343
def __init__(self):
4444
self.samplers = []
4545
self.outputs = []
46+
self.listeners = []
4647
self.pidQueue = None
4748
self.outQueue = None
4849

@@ -196,6 +197,10 @@ def cleanup(self):
196197
for o in filter(lambda t: t.is_alive(), self.outputs):
197198
o.join()
198199

200+
for lis in self.listeners:
201+
lis.exit()
202+
lis.thread.join()
203+
199204
# exit queues
200205
self.pidQueue.exit()
201206
self.outQueue.exit()
@@ -248,6 +253,19 @@ def start(self):
248253
self.cleanup()
249254
sys.exit(1)
250255

256+
for s in self.config.get([id, "listeners"], []):
257+
logger.info("Load: %s", s)
258+
try:
259+
Listener = sams.core.ClassLoader.load(s, "Listener")
260+
listener = Listener(s, self.config, self.samplers)
261+
self.listeners.append(listener)
262+
listener.start()
263+
except Exception as e:
264+
logger.error("Failed to initialize listener: %s", s)
265+
logger.exception(e)
266+
self.cleanup()
267+
sys.exit(1)
268+
251269
# load PIDFinder class
252270
PidFinder = sams.core.ClassLoader.load(
253271
self.config.get([id, "pid_finder"]), "PIDFinder"

sams/listeners/__init__.py

Whitespace-only changes.

sams/listeners/base_listener.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
""" Base class for socket listening-based demand-driven output. """
2+
from abc import ABC, abstractmethod
3+
from sams.core import Config
4+
from typing import Iterable
5+
import logging
6+
import os
7+
import select
8+
import socket
9+
import threading
10+
11+
logger = logging.getLogger(__name__)


class Listener(ABC):
    """Base class for listening on a socket and sending encoded data on demand.

    On construction a server socket is created and bound to
    ``<socketdir>/<class_path>_<jobid>.socket``.  :meth:`start` launches a
    thread that accepts connections and sends :attr:`encoded_data` to every
    client that connects; :meth:`exit` stops that thread and removes the
    socket file.

    Configuration keys read under ``class_path``: ``socket_family``,
    ``socket_type``, ``socket``, ``file_number`` and ``socketdir``.  The job
    id is read from ``['options', 'jobid']``.
    """

    def __init__(self,
                 class_path: str,
                 config: "Config",
                 samplers: Iterable):
        """Create and bind the server socket; the thread is not started here.

        Args:
            class_path: Configuration section of this listener; also used
                as prefix of the socket file name.
            config: Configuration object providing ``get(keys, default)``.
            samplers: Samplers whose data subclasses may encode.

        Raises:
            OSError: If the socket directory cannot be created or the
                socket cannot be created or bound.
            ValueError: If the configured job id is not an integer (the
                socket file name formats it with ``:d``).
        """
        self.class_path = class_path
        self.config = config
        self.samplers = samplers
        socket_family = self.config.get([self.class_path, 'socket_family'],
                                        socket.AF_UNIX)
        socket_type = self.config.get([self.class_path, 'socket_type'],
                                      socket.SOCK_STREAM)
        # NOTE(review): the protocol number is read from the key 'socket';
        # confirm that this is the intended key name.
        protocol_number = self.config.get([self.class_path, 'socket'],
                                          0)
        file_number = self.config.get([self.class_path, 'file_number'],
                                      None)
        socket_directory = self.config.get([self.class_path, 'socketdir'],
                                           '/tmp/softwareaccounting')
        self.job_id = self.config.get(['options', 'jobid'],
                                      0)
        self.is_finished = False

        # Do all fallible path handling before the socket exists, so that a
        # failure here cannot leak an open socket.  makedirs also creates
        # missing parents and tolerates a concurrently created directory.
        os.makedirs(socket_directory, exist_ok=True)
        self.socket_path = f'{socket_directory}/{self.class_path}_{self.job_id:d}.socket'
        logger.debug(f'Socket path: {self.socket_path}')

        self.server_socket = socket.socket(socket_family,
                                           socket_type,
                                           protocol_number,
                                           file_number)
        try:
            # Remove a stale socket file left behind under the same name.
            if os.path.exists(self.socket_path):
                os.unlink(self.socket_path)
            self.server_socket.bind(self.socket_path)
        except Exception:
            self.server_socket.close()
            raise
        self.thread = threading.Thread(target=self._listen)

    def start(self):
        """ Starts listening on thread. """
        self.thread.start()
        logger.debug('Launching listening thread...')

    def exit(self):
        """Stop the listening thread and clean up the socket.

        Sets the finished flag, waits up to one second for the thread if it
        is running, closes the server socket and removes the socket file.
        Calling it again, or calling it on a listener that was never
        started, is a no-op / safe.
        """
        if self.is_finished:
            return
        self.is_finished = True
        # is_alive must be *called*; the bare bound method is always truthy
        # and joining a thread that was never started raises RuntimeError.
        if self.thread.is_alive():
            self.thread.join(1)
        self.server_socket.close()
        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            # Already removed; nothing left to clean up.
            pass

    def _listen(self):
        """Accept connections until finished and send each the encoded data.

        Polls the server socket with a one second timeout so that the
        finished flag is re-checked regularly.
        """
        self.server_socket.listen(5)
        while not self.is_finished:
            logger.debug('Waiting for connections...')
            readable, _, _ = select.select([self.server_socket], [], [], 1)
            if not readable:
                continue
            try:
                connection, address = self.server_socket.accept()
            except Exception as e:
                # Without a connection there is nothing to send to; go back
                # to waiting instead of using an unbound/stale connection.
                logger.debug(f'Accepting connection failed due to {e}!')
                continue
            logger.debug(f'Connected to {address}')
            with connection:
                try:
                    connection.sendall(self.encoded_data)
                except Exception as e:
                    logger.debug(f'Sending data to {address} failed due to {e}!')

    @property
    @abstractmethod
    def encoded_data(self) -> bytes:
        """ Encoded data (bytes, as passed to ``sendall``) to be sent to client. """
        raise NotImplementedError

sams/listeners/prometheus.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import logging
2+
import re
3+
from typing import Iterable, Dict, List
4+
from sams.core import Config
5+
from .base_listener import Listener as BaseListener
6+
7+
logger = logging.getLogger(__name__)


class Listener(BaseListener):
    """Listener that serves the most recent sampler data as Prometheus text.

    Configuration keys read under ``class_path``:

    * ``static_map``: constant label values merged into every metric.
    * ``map``: label name -> ``/``-separated path into the sample data.
    * ``metrics``: regular expression (matched against the flattened
      sample path) -> ``%``-style destination template.
    """

    def __init__(self,
                 class_path: str,
                 config: Config,
                 samplers: List):
        super().__init__(class_path,
                         config,
                         samplers)
        self.static_map = self.config.get([self.class_path, "static_map"], {})
        self.map = self.config.get([self.class_path, "map"], {})
        self.metrics = self.config.get([self.class_path, "metrics"], {})

    @staticmethod
    def _nested_getitem(dct: Dict, keys: Iterable):
        """Fetch an item from nested dictionaries.

        Returns ``None`` if any key in `keys` is missing, or if the path
        runs into a value that is not a dictionary before it is exhausted.
        """
        current = dct
        for key in keys:
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        return current

    def _flatten_dict(self, dct, base='') -> List:
        """Recursively flatten a dictionary into a list of dictionaries.

        Each entry has a ``'match'`` key holding the ``/``-joined path
        (prefixed by `base`, so top-level paths start with ``/``) and a
        ``'value'`` key holding the leaf value.
        """
        out = []
        for key, value in dct.items():
            # str() so that non-string keys cannot break the join.
            nb = "/".join([base, str(key)])
            if isinstance(value, dict):
                out.extend(self._flatten_dict(value, base=nb))
            else:
                out.append({"match": nb, "value": value})
        return out

    @property
    def encoded_data(self) -> bytes:
        """ The most recent data from attached samplers,
        formatted, compiled and encoded to UTF-8 format. """
        data = self._get_all_samples()
        # Then match config with entries and create flattened, compiled data
        compiled_data = {}
        for match_set in self._get_matching_entries(data):
            self._compile_data(data,
                               compiled_data,
                               *match_set)
        # Parse & encode compiled data into bytestring.
        return self._get_bytestring(compiled_data)

    def _get_all_samples(self) -> Dict:
        """Return the most recent samples of all samplers.

        The result maps ``sample['id']`` to a merged copy of the
        ``sample['data']`` dictionaries carrying that id.  Samplers whose
        ``most_recent_sample`` is ``None`` are skipped.
        """
        data = {}
        for sampler in self.samplers:
            # Read the attribute once so the None check and the iteration
            # are guaranteed to see the same object.
            samples = sampler.most_recent_sample
            if samples is None:
                continue
            for s in samples:
                # Merge into a dictionary owned by this call; updating the
                # sampler's own dictionary would leak other samples into it.
                data.setdefault(s['id'], {}).update(s['data'])
        return data

    def _get_matching_entries(self,
                              data: Dict) -> Iterable:
        """Yield one tuple per (flattened entry, matching metric) pair.

        Each tuple is ``(value as str, destination template, named groups
        of the regular expression match)``.
        """
        flat_dictionary = self._flatten_dict(data)
        logger.debug(f'Flat dictionary: {flat_dictionary}')
        if not flat_dictionary:
            return
        # Compile every pattern once instead of once per flattened entry.
        compiled_metrics = [(re.compile(metric), destination)
                            for metric, destination in self.metrics.items()]
        for d in flat_dictionary:
            for reg, destination in compiled_metrics:
                m = reg.match(d['match'])
                logger.debug('Metric matches: %s', m)
                if m is not None:
                    yield (str(d['value']),
                           destination,
                           m.groupdict())

    @staticmethod
    def _get_bytestring(compiled_data: Dict) -> bytes:
        """Turn the compiled data into a Prometheus-formatted bytestring.

        Keys of `compiled_data` are full sample names including the label
        block (``name{...}``); keys without a label block are skipped.  A
        ``# HELP`` and ``# TYPE`` line is emitted once per metric name.
        The payload ends with a line feed, as the text exposition format
        requires; empty input yields ``b''``.
        """
        # The metric name ends at the first '{'; a greedy \S+ would run on
        # to a later '{' inside a label value.
        metric_re = re.compile(r'^([^\s{]+)\{')
        helped = set()
        formatted_data = []
        for m in sorted(compiled_data):
            logger.debug(f'Data key: {m}')
            match = metric_re.match(m)
            if match is None:
                continue
            name = match.group(1)
            if name not in helped:
                helped.add(name)
                formatted_data.append(f'# HELP {name:s} Job Usage Metrics')
                formatted_data.append(f'# TYPE {name:s} gauge')
            formatted_data.append(f'{m} {str(compiled_data[m])}')
        if not formatted_data:
            return b''
        return ('\n'.join(formatted_data) + '\n').encode('utf-8')

    def _compile_data(self,
                      in_data: Dict,
                      out_data: Dict,
                      value,
                      destination: str,
                      extra_mappings: Dict) -> None:
        """Store `value` in `out_data` under the formatted `destination`.

        The destination template is filled from ``static_map``,
        `extra_mappings` and the ``map`` entries looked up in `in_data`.
        Nothing is stored if a mapped entry is missing, if the template
        cannot be formatted, or if `value` is empty.
        """
        # static_map is per-node, constant metadata
        d = self.static_map.copy()
        d.update(extra_mappings)
        # Get per-job metadata
        for k, v in self.map.items():
            m = self._nested_getitem(in_data, v.split('/'))
            if m is None:
                logger.warning(f'map: {k}: {v} is missing')
                return
            d[k] = m
        try:
            # Parse metadata into output string
            dest = destination % d
        except Exception as e:
            logger.error('Failed to format destination %r: %r', destination, e)
            return
        if len(value) == 0:
            # A sample line without a value cannot be parsed, so skip it
            # rather than emit it.
            logger.warning(f'{dest} got no metric')
            return
        # Store output string as key and data as value
        out_data[dest] = value
        logger.debug(f'Storing {dest} = {str(value)}')

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def finalize_options(self):
8383
"sams.backend",
8484
"sams.software",
8585
"sams.xmlwriter",
86+
"sams.listeners",
8687
],
8788
scripts=[
8889
"sams-aggregator.py",

0 commit comments

Comments
 (0)