Skip to content

Commit e1c28e7

Browse files
vitodbmambelli
authored andcommitted
commit da19f6f
Author: Kyle Knoepfel <knoepfel@fnal.gov> Date: Thu Sep 30 21:45:50 2021 +0000 Support retries for contacting Graphite. (cherry picked from commit a9c1f3c)
1 parent e56c4d2 commit e1c28e7

3 files changed

Lines changed: 51 additions & 37 deletions

File tree

src/decisionengine_modules/AWS/publishers/AWS_generic_publisher.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ class AWSGenericPublisher(Publisher.Publisher, metaclass=abc.ABCMeta):
1818
def __init__(self, config):
1919
super().__init__(config)
2020
self.logger = self.logger.bind(class_module=__name__.split(".")[-1], )
21-
self.graphite_host = config.get(
22-
'graphite_host', DEFAULT_GRAPHITE_HOST)
23-
self.graphite_port = config.get(
24-
'graphite_port', DEFAULT_GRAPHITE_PORT)
25-
self.graphite_context_header = config.get(
26-
'graphite_context', DEFAULT_GRAPHITE_CONTEXT)
27-
self.publish_to_graphite = config.get('publish_to_graphite')
28-
self.output_file = config.get('output_file')
21+
self.graphite_host = config.get("graphite_host", DEFAULT_GRAPHITE_HOST)
22+
self.graphite_port = config.get("graphite_port", DEFAULT_GRAPHITE_PORT)
23+
self.graphite_context_header = config.get("graphite_context", DEFAULT_GRAPHITE_CONTEXT)
24+
self.max_retries = config.get("max_retries", 2)
25+
self.retry_interval = config.get("retry_interval", 60)
26+
self.publish_to_graphite = config.get("publish_to_graphite")
27+
self.output_file = config.get("output_file")
2928

3029
@classmethod
3130
def consumes_dataframe(cls, product_name):
@@ -52,12 +51,15 @@ def publish(self, data_block):
5251
return
5352
data = data_block[list(self._consumes.keys())[0]]
5453
if self.graphite_host and self.publish_to_graphite:
55-
end_point = graphite.Graphite(host=self.graphite_host,
56-
pickle_port=self.graphite_port)
57-
end_point.send_dict(self.graphite_context(data)[0],
58-
self.graphite_context(data)[1],
59-
debug_print=False,
60-
send_data=True)
54+
end_point = graphite.Graphite(host=self.graphite_host, pickle_port=self.graphite_port)
55+
end_point.send_dict(
56+
self.graphite_context(data)[0],
57+
self.graphite_context(data)[1],
58+
debug_print=False,
59+
send_data=True,
60+
max_retries=self.max_retries,
61+
retry_interval=self.retry_interval,
62+
)
6163
csv_data = data.to_csv(self.output_file, index=False)
6264
if not self.output_file:
6365
print(csv_data)

src/decisionengine_modules/graphite/publishers/generic_publisher.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,26 @@
1414
DEFAULT_GRAPHITE_CONTEXT = ""
1515

1616

17-
@Publisher.supports_config(Parameter('graphite_host', default=DEFAULT_GRAPHITE_HOST),
18-
Parameter('graphite_port', default=DEFAULT_GRAPHITE_PORT),
19-
Parameter('graphite_context', default=DEFAULT_GRAPHITE_CONTEXT),
20-
Parameter('publish_to_graphite', type=bool),
21-
Parameter('output_file', type=str))
17+
@Publisher.supports_config(
18+
Parameter("graphite_host", default=DEFAULT_GRAPHITE_HOST),
19+
Parameter("graphite_port", default=DEFAULT_GRAPHITE_PORT),
20+
Parameter("max_retries", default=2, comment="Number of retries allowed to send data to Graphite."),
21+
Parameter("retry_interval", default=60, comment="Number of seconds to wait between retries."),
22+
Parameter("graphite_context", default=DEFAULT_GRAPHITE_CONTEXT),
23+
Parameter("publish_to_graphite", type=bool),
24+
Parameter("output_file", type=str),
25+
)
2226
class GenericPublisher(Publisher.Publisher, metaclass=abc.ABCMeta):
2327

2428
def __init__(self, config):
2529
super().__init__(config)
26-
self.graphite_host = config.get('graphite_host', DEFAULT_GRAPHITE_HOST)
27-
self.graphite_port = config.get('graphite_port', DEFAULT_GRAPHITE_PORT)
28-
self.graphite_context_header = config.get('graphite_context', DEFAULT_GRAPHITE_CONTEXT)
29-
self.publish_to_graphite = config.get('publish_to_graphite')
30-
self.output_file = config.get('output_file')
30+
self.graphite_host = config.get("graphite_host", DEFAULT_GRAPHITE_HOST)
31+
self.graphite_port = config.get("graphite_port", DEFAULT_GRAPHITE_PORT)
32+
self.graphite_context_header = config.get("graphite_context", DEFAULT_GRAPHITE_CONTEXT)
33+
self.max_retries = config.get("max_retries", 2)
34+
self.retry_interval = config.get("retry_interval", 60)
35+
self.publish_to_graphite = config.get("publish_to_graphite")
36+
self.output_file = config.get("output_file")
3137
self.logger = self.logger.bind(class_module=__name__.split(".")[-1], )
3238

3339
@classmethod
@@ -60,11 +66,14 @@ def publish(self, data_block):
6066
self.logger.exception(f"Failed to extract {product} from data block.")
6167
return
6268
if self.graphite_host and self.publish_to_graphite:
63-
end_point = graphite.Graphite(host=self.graphite_host,
64-
pickle_port=self.graphite_port)
65-
end_point.send_dict(self.graphite_context(data)[0],
66-
self.graphite_context(data)[1],
67-
debug_print=False,
68-
send_data=True)
69+
end_point = graphite.Graphite(host=self.graphite_host, pickle_port=self.graphite_port)
70+
end_point.send_dict(
71+
self.graphite_context(data)[0],
72+
self.graphite_context(data)[1],
73+
debug_print=False,
74+
send_data=True,
75+
max_retries=self.max_retries,
76+
retry_interval=self.retry_interval,
77+
)
6978
if not self.output_file:
7079
self.logger.debug(data.to_csv(self.output_file, index=False))

src/decisionengine_modules/graphite_client.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import structlog
66
from decisionengine.framework.modules.logging_configDict import CHANNELLOGGERNAME
77

8+
from functools import partial
9+
10+
from decisionengine_modules.util.retry_function import retry_wrapper
11+
812

913
def sanitize_key(key):
1014
if key is None:
@@ -25,12 +29,13 @@ def __init__(self, host="fifemondata.fnal.gov", pickle_port=2004):
2529
self.logger = structlog.getLogger(CHANNELLOGGERNAME)
2630
self.logger = self.logger.bind(module_class=__name__.split(".")[-1], channel="")
2731

28-
def send_dict(self, namespace, data, debug_print=True, send_data=True):
32+
def send_dict(self, namespace, data, debug_print=True, send_data=True, max_retries=2, retry_interval=60):
2933
"""send data contained in dictionary as {k: v} to graphite dataset
3034
$namespace.k with current timestamp"""
3135
if data is None:
3236
self.logger.warning("Warning: send_dict called with no data")
3337
return
38+
3439
now = int(time.time())
3540
post_data = []
3641
# turning data dict into [('$path.$key',($timestamp,$value)),...]]
@@ -48,14 +53,12 @@ def send_dict(self, namespace, data, debug_print=True, send_data=True):
4853
return
4954
# throw data at graphite
5055

51-
s = socket.socket()
52-
try:
56+
retry_wrapper(partial(self._send_to_graphite, message), max_retries, retry_interval)
57+
58+
def _send_to_graphite(self, message):
59+
with socket.socket() as s:
5360
s.connect((self.graphite_host, self.graphite_pickle_port))
5461
s.sendall(message)
55-
except socket.error:
56-
self.logger.exception(f"Error sending data to graphite at {self.graphite_host}:{self.graphite_pickle_port}")
57-
finally:
58-
s.close()
5962

6063

6164
if __name__ == "__main__":

0 commit comments

Comments
 (0)