-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprometheus.py
More file actions
123 lines (106 loc) · 4.18 KB
/
Copy pathprometheus.py
File metadata and controls
123 lines (106 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import annotations
from logging import getLogger
from typing import Optional
from fastapi import APIRouter, Depends
from prometheus_client import Counter, Gauge
from sqlmodel import select
import murfey.server.prometheus as prom
from murfey.server.api.auth import validate_instrument_token
from murfey.server.murfey_db import murfey_db
from murfey.util import sanitise
from murfey.util.db import RsyncInstance
from murfey.util.models import RsyncerInfo
# Logger for this module, registered under the name
# "murfey.server.api.prometheus".
logger = getLogger("murfey.server.api.prometheus")
# Router that the endpoints below are attached to. Its routes share the
# "/prometheus" URL prefix and the "Prometheus" tag, and every route declares
# validate_instrument_token as a dependency.
router = APIRouter(
prefix="/prometheus",
dependencies=[Depends(validate_instrument_token)],
tags=["Prometheus"],
)
@router.post("/visits/{visit_name}/increment_rsync_file_count")
def increment_rsync_file_count(
    visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
):
    """
    Record newly counted files for an rsync instance.

    The RsyncInstance row matching the source, destination and session ID in
    ``rsyncer_info`` has ``increment_count`` added to its ``files_counted``
    value. The ``seen_files`` and ``seen_data_files`` Prometheus metrics
    labelled with the rsync source and visit are then incremented by
    ``increment_count`` and ``increment_data_count`` respectively.

    If the row lookup raises for any reason, the error is logged and the
    function returns None without touching the database or the metrics.
    """
    try:
        instance_query = select(RsyncInstance).where(
            RsyncInstance.source == rsyncer_info.source,
            RsyncInstance.destination == rsyncer_info.destination,
            RsyncInstance.session_id == rsyncer_info.session_id,
        )
        instance = db.exec(instance_query).one()
    except Exception:
        # Best-effort endpoint: report the failed lookup and bail out early
        logger.error(
            f"Failed to find rsync instance for visit {sanitise(visit_name)} "
            "with the following properties: \n"
            f"{rsyncer_info.model_dump()}",
            exc_info=True,
        )
        return None

    # Persist the updated tally
    instance.files_counted += rsyncer_info.increment_count
    db.add(instance)
    db.commit()
    db.close()

    # Mirror the increments in the Prometheus metrics; both use the same labels
    metric_labels = {"rsync_source": rsyncer_info.source, "visit": visit_name}
    prom.seen_files.labels(**metric_labels).inc(rsyncer_info.increment_count)
    prom.seen_data_files.labels(**metric_labels).inc(
        rsyncer_info.increment_data_count
    )
@router.post("/visits/{visit_name}/increment_rsync_transferred_files")
def increment_rsync_transferred_files(
    visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
):
    """
    Add the reported file count to an rsync instance's transferred-files tally.

    The RsyncInstance row is identified by the source, destination and session
    ID carried in ``rsyncer_info``; ``visit_name`` comes from the route path
    and is not used in the lookup. The lookup uses ``.one()`` and is not
    wrapped in any error handling, so an exception raised by it propagates
    out of this function.
    """
    instance_query = select(RsyncInstance).where(
        RsyncInstance.source == rsyncer_info.source,
        RsyncInstance.destination == rsyncer_info.destination,
        RsyncInstance.session_id == rsyncer_info.session_id,
    )
    instance = db.exec(instance_query).one()

    # Persist the updated tally
    instance.files_transferred += rsyncer_info.increment_count
    db.add(instance)
    db.commit()
    db.close()
@router.post("/visits/{visit_name}/increment_rsync_transferred_files_prometheus")
def increment_rsync_transferred_files_prometheus(
    visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
):
    """
    Increment the Prometheus transfer metrics for an rsync source and visit.

    Four metrics are updated, all labelled with the rsync source from
    ``rsyncer_info`` and the visit name from the route path:

    - ``transferred_files`` by ``increment_count``
    - ``transferred_files_bytes`` by ``bytes``
    - ``transferred_data_files`` by ``increment_data_count``
    - ``transferred_data_files_bytes`` by ``data_bytes``

    The ``db`` parameter is accepted but not used by this function.
    """
    # Every metric below is addressed with the same pair of labels
    metric_labels = {"rsync_source": rsyncer_info.source, "visit": visit_name}

    prom.transferred_files.labels(**metric_labels).inc(rsyncer_info.increment_count)
    prom.transferred_files_bytes.labels(**metric_labels).inc(rsyncer_info.bytes)
    prom.transferred_data_files.labels(**metric_labels).inc(
        rsyncer_info.increment_data_count
    )
    prom.transferred_data_files_bytes.labels(**metric_labels).inc(
        rsyncer_info.data_bytes
    )
@router.post("/visits/{visit_name}/monitoring/{on}")
def change_monitoring_status(visit_name: str, on: int):
    """
    Set the monitoring switch metric for a visit.

    The integer ``on`` taken from the route path is written to the
    ``monitoring_switch`` Prometheus metric labelled with ``visit_name``.
    """
    # labels() returns the child metric for the given label values, creating
    # it on first use, so one lookup is sufficient before setting the value.
    prom.monitoring_switch.labels(visit=visit_name).set(on)
@router.get("/metrics/{metric_name}")
def inspect_prometheus_metrics(
    metric_name: str,
):
    """
    Debugging endpoint that reports the current value(s) of a Prometheus
    Counter or Gauge defined in the Prometheus module.

    ``metric_name`` is looked up as an attribute of that module. For a metric
    that holds labelled children, the result is a dictionary keyed by a running
    index, where each entry maps the label names to their values and adds the
    child's current reading under "value". Otherwise the result is a
    dictionary with the single key "value".

    Raises LookupError if the module has no attribute of that name, or if the
    attribute is not a Counter or Gauge.
    """
    # None (attribute missing) is never a Counter or Gauge, so a single
    # isinstance check covers both failure cases.
    metric: Optional[Counter | Gauge] = getattr(prom, metric_name, None)
    if not isinstance(metric, (Counter, Gauge)):
        raise LookupError("No matching metric was found")

    # NOTE: relies on prometheus_client private attributes (_metrics,
    # _labelnames, _value) to read the stored values.
    if not hasattr(metric, "_metrics"):
        return {"value": metric._value.get()}

    # One entry per labelled child: label names mapped to values, plus reading
    return {
        index: {
            **dict(zip(metric._labelnames, label_values)),
            "value": child._value.get(),
        }
        for index, (label_values, child) in enumerate(metric._metrics.items())
    }