Skip to content

Commit 438621a

Browse files
skylerfoster67mambelli
authored andcommitted
Added two metrics on the Source - de_source_status and
de_source_acquire_seconds
1 parent f54b92a commit 438621a

1 file changed

Lines changed: 23 additions & 2 deletions

File tree

src/decisionengine/framework/engine/SourceWorkers.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,20 @@
2323
from decisionengine.framework.taskmanager.module_graph import _create_module_instance
2424
from decisionengine.framework.taskmanager.ProcessingState import ProcessingState, State
2525
from decisionengine.framework.util.countdown import Countdown
26-
from decisionengine.framework.util.metrics import Gauge
26+
from decisionengine.framework.util.metrics import Gauge, Histogram
2727

2828
_DEFAULT_SCHEDULE = 300 # 5 minutes
2929

3030
MB = 1000000
3131

32+
SOURCE_STATUS = Gauge(
33+
"de_source_status",
34+
"Status of Source Data",
35+
[
36+
"source_name",
37+
],
38+
)
39+
3240
SOURCE_ACQUIRE_GAUGE = Gauge(
3341
"de_source_last_acquire_timestamp_seconds",
3442
"Last time a source successfully ran its acquire function",
@@ -37,6 +45,14 @@
3745
],
3846
)
3947

48+
SOURCE_ACQUIRE_HISTOGRAM = Histogram(
49+
"de_source_acquire_seconds",
50+
"How long it took to acquire the source data",
51+
[
52+
"source_name",
53+
],
54+
)
55+
4056

4157
class SourceWorker(multiprocessing.Process):
4258
"""
@@ -144,12 +160,14 @@ def take_offline(self):
144160
if self.state.has_value(State.ERROR):
145161
return
146162
self.state.set(State.OFFLINE)
163+
SOURCE_STATUS.labels(self.key).set(State.OFFLINE.value)
147164

148165
def run(self):
149166
"""
150167
Get the data from source
151168
"""
152169
self.state.set(State.ACTIVE)
170+
SOURCE_STATUS.labels(self.key).set(State.ACTIVE.value)
153171
self.setup_logger()
154172
self.logger.info(f"Starting source loop for {self.key}")
155173
SOURCE_ACQUIRE_GAUGE.labels(self.key)
@@ -158,7 +176,8 @@ def run(self):
158176
while not self.state.should_stop():
159177
try:
160178
self.logger.info(f"Source {self.key} calling acquire")
161-
data = self.module_instance.acquire()
179+
with SOURCE_ACQUIRE_HISTOGRAM.labels(self.key).time():
180+
data = self.module_instance.acquire()
162181
Module.verify_products(self.module_instance, data)
163182
self.logger.info(f"Source {self.key} acquire returned")
164183
SOURCE_ACQUIRE_GAUGE.labels(self.key).set_to_current_time()
@@ -180,10 +199,12 @@ def run(self):
180199
self.logger.info(f"Source {self.key} finished cycle")
181200
if not self.state.should_stop() and not self.state.has_value(State.STEADY):
182201
self.state.set(State.STEADY)
202+
SOURCE_STATUS.labels(self.key).set(State.STEADY.value)
183203
except Exception:
184204
self.logger.exception(f"Exception running source {self.key} ")
185205
self.logger.debug(f"Sending shutdown flag to queue {self.queue.name}")
186206
self.state.set(State.ERROR)
207+
SOURCE_STATUS.labels(self.key).set(State.ERROR.value)
187208
producer.publish(
188209
dict(source_name=self.key, source_module=self.module, data=State.SHUTDOWN),
189210
routing_key=self.key,

0 commit comments

Comments
 (0)