Skip to content

Commit d7cf668

Browse files
committed
feat: implement dynamic string metrics
1 parent e316f53 commit d7cf668

10 files changed

Lines changed: 123 additions & 44 deletions

File tree

bec_lib/bec_lib/messages.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from enum import Enum, auto
1010
from importlib.metadata import PackageNotFoundError
1111
from importlib.metadata import version as importlib_version
12-
from typing import Annotated, Any, ClassVar, Literal, Self, Union
12+
from typing import Annotated, Any, ClassVar, Literal, Mapping, Self, TypedDict, Union
1313
from uuid import uuid4
1414

1515
import numpy as np
@@ -1192,8 +1192,19 @@ class ServiceMetricMessage(BECMessage):
11921192

11931193
class _StrDynamicMetricValue(BaseModel):
11941194
value: str
1195+
possible_values: list[str]
11951196
type_name: Literal["str"] = "str"
11961197

1198+
@model_validator(mode="after")
1199+
def _validate(self) -> Self:
1200+
if self.value not in self.possible_values:
1201+
raise ValueError(
1202+
f"Invalid string metric: value '{self.value}' not in possible values: '{self.possible_values}'. Provide the possible values for string metrics."
1203+
)
1204+
if len(set(self.possible_values)) != len(self.possible_values):
1205+
raise ValueError(f"Duplicates in possible values: {self.possible_values}")
1206+
return self
1207+
11971208

11981209
class _IntDynamicMetricValue(BaseModel):
11991210
value: int
@@ -1219,6 +1230,14 @@ class _BoolDynamicMetricValue(BaseModel):
12191230
]
12201231

12211232

1233+
class StringMetricDict(TypedDict):
1234+
value: str
1235+
possible_values: list[str]
1236+
1237+
1238+
DynamicMetricDict = Mapping[str, StringMetricDict | int | float | bool]
1239+
1240+
12221241
class DynamicMetricMessage(BECMessage):
12231242
"""Message for propagating metrics to the log ingestor.
12241243
@@ -1230,14 +1249,16 @@ class DynamicMetricMessage(BECMessage):
12301249
timestamp: float = Field(default_factory=time.time)
12311250

12321251
@classmethod
1233-
def from_dict(cls, metrics: dict[str, str | int | float | bool], separator: str = "-"):
1252+
def from_dict(cls, metrics: DynamicMetricDict, separator: str = "-"):
1253+
def _fmt(value):
1254+
if isinstance(value, dict):
1255+
return {"type_name": "str", **value}
1256+
return {"value": value, "type_name": type(value).__name__}
1257+
12341258
return cls.model_validate(
12351259
{
1236-
"metrics": {
1237-
# Metric names are concatenated with the group in the ingestor
1238-
separator + k: {"value": v, "type_name": type(v).__name__}
1239-
for k, v in metrics.items()
1240-
}
1260+
# Metric names are concatenated with the group in the ingestor
1261+
"metrics": {separator + k: _fmt(v) for k, v in metrics.items()}
12411262
}
12421263
)
12431264

bec_lib/bec_lib/redis_connector.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
BECMessage,
5555
BundleMessage,
5656
ClientInfoMessage,
57+
DynamicMetricDict,
5758
DynamicMetricMessage,
5859
ErrorInfo,
5960
)
@@ -1562,13 +1563,7 @@ def blocking_list_pop(
15621563
return None
15631564
return MsgpackSerialization.loads(raw_msg[1]) # type: ignore # list pop returns one item
15641565

1565-
def publish_metrics(
1566-
self, group_name: str, metrics: dict[str, str | int | float | bool], separator="_"
1567-
):
1568-
if str in set(map(type, metrics.values())):
1569-
bec_logger.logger.warning(
1570-
f"String type found in dynamic metric: {group_name}. String metrics are currently not supported and will be ignored in the ingestor."
1571-
)
1566+
def publish_metrics(self, group_name: str, metrics: DynamicMetricDict, separator="_"):
15721567
msg = DynamicMetricMessage.from_dict(metrics, separator=separator)
15731568
ep = MessageEndpoints.dynamic_metric(group_name)
15741569
self._redis_conn.publish(ep.endpoint, MsgpackSerialization.dumps(msg))

bec_lib/bec_lib/tests/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,8 @@ def redis_server_is_running(self):
643643
def get_last(self, topic, key):
644644
return None
645645

646-
def publish_metrics(self, group_name: str, metrics: dict[str, str | int | float | bool]):
646+
def publish_metrics(
647+
self, group_name: str, metrics: messages.DynamicMetricDict, separator: str = "_"
648+
):
647649
if not isinstance(group_name, str) or not isinstance(metrics, dict):
648650
raise TypeError("invalid arguments to publish_metrics")

bec_lib/tests/test_bec_messages.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,12 @@ def test_valid_add_slice_various_indices(self, index):
684684

685685
def test_dynamic_metric_message():
686686
message = messages.DynamicMetricMessage.from_dict(
687-
{"m1": 5, "m2": 5.5, "m3": "test", "m4": True}
687+
{
688+
"m1": 5,
689+
"m2": 5.5,
690+
"m3": {"value": "test", "possible_values": ["prod", "test"]},
691+
"m4": True,
692+
}
688693
)
689694
assert isinstance(message.metrics["-m1"], messages._IntDynamicMetricValue)
690695
assert isinstance(message.metrics["-m2"], messages._FloatDynamicMetricValue)

bec_lib/tests/test_redis_connector_fakeredis.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,15 @@ def cb(msg):
600600
data.append(msg.value)
601601

602602
connected_connector.register(ep, cb=cb, start_thread=False)
603-
connected_connector.publish_metrics("test", {"m1": 5, "m2": 5.5, "m3": "test", "m4": True})
603+
connected_connector.publish_metrics(
604+
"test",
605+
{
606+
"m1": 5,
607+
"m2": 5.5,
608+
"m3": {"value": "test", "possible_values": ["test", "prod"]},
609+
"m4": True,
610+
},
611+
)
604612
connected_connector.poll_messages(timeout=1)
605613

606614
stop = time.time()
@@ -610,6 +618,7 @@ def cb(msg):
610618
assert res.metrics["_m1"].value == 5
611619
assert res.metrics["_m2"].value == 5.5
612620
assert res.metrics["_m3"].value == "test"
621+
assert set(res.metrics["_m3"].possible_values) == set(["prod", "test"])
613622
assert res.metrics["_m4"].value is True
614623

615624

bec_server/bec_server/procedures/manager.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,6 @@ def _publish_metrics(self, msg: ProcedureQNotifMessage):
303303
for name in self._helper.get.active_and_pending_queue_names()
304304
},
305305
)
306-
self._conn.publish_metrics(
307-
"active_procedures",
308-
{msg.queue: msg.identifier for msg in self._helper.get.running_procedures()},
309-
)
310306

311307
def _startup(self):
312308
# If the server is restarted, clear any pending requests, they'll have to be resubmitted

bec_server/bec_server/scihub/atlas/atlas_active_client_emitter.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020

2121
class AtlasActiveClientEmitter:
22-
2322
def __init__(self, atlas_connector: AtlasConnector) -> None:
2423
self.atlas_connector = atlas_connector
25-
self._last_active_client_metrics: dict[str, messages.DynamicMetricValue] | None = None
24+
self._last_active_client_metrics: messages.DynamicMetricDict = {}
2625
self._start_service_status_subscription()
2726
self._emit_active_client_metrics()
2827

@@ -41,20 +40,29 @@ def _emit_active_client_metrics(self) -> None:
4140
self._last_active_client_metrics = metrics
4241
self.atlas_connector.connector.publish_metrics("active_clients", metrics)
4342

44-
def _get_active_client_metrics(self) -> dict[str, str]:
45-
active_clients = {}
46-
for service_name, status_msg in self.atlas_connector.scihub.service_status.items():
47-
if self._is_internal_service(service_name):
48-
continue
49-
if status_msg.status != messages.BECStatus.RUNNING:
50-
continue
51-
info = status_msg.info
52-
hostname = (
53-
info.hostname if isinstance(info, messages.ServiceInfo) else info.get("hostname")
54-
)
55-
if hostname:
56-
active_clients[service_name] = hostname
57-
return active_clients
43+
def _get_active_client_metrics(self) -> messages.DynamicMetricDict:
44+
clients = [
45+
(name, status)
46+
for (name, status) in self.atlas_connector.scihub.service_status.items()
47+
if not (self._is_internal_service(name) or status.status != messages.BECStatus.RUNNING)
48+
]
49+
ipython_clients = len(list(filter(lambda ns: "BECIPythonClient" in ns[0], clients)))
50+
bec_clients = len(list(filter(lambda ns: "BECClient" in ns[0], clients)))
51+
52+
def _from_console(name_msg: tuple[str, messages.StatusMessage]):
53+
msg = name_msg[1]
54+
if not isinstance(msg.info, messages.ServiceInfo):
55+
return False
56+
return "cons" in msg.info.hostname
57+
58+
from_consoles = len(list(filter(_from_console, clients)))
59+
60+
return {
61+
"total_connected": len(clients),
62+
"bec_clients": bec_clients,
63+
"ipython_clients": ipython_clients,
64+
"from_consoles": from_consoles,
65+
}
5866

5967
@classmethod
6068
def _is_internal_service(cls, service_name: str) -> bool:

bec_server/bec_server/scihub/atlas/atlas_connector.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import os
4+
from enum import StrEnum
45
from typing import TYPE_CHECKING
56

67
from dotenv import dotenv_values
@@ -22,8 +23,29 @@
2223
logger = bec_logger.logger
2324

2425

25-
class AtlasConnector:
26+
class AtlasHost(StrEnum):
27+
NotSet = "none"
28+
Local = "localhost"
29+
Dev = "bec-atlas-dev"
30+
Prod = "bec-atlas-prod"
31+
Qa = "bec-atlas-qa"
32+
Unknown = "unknown"
33+
34+
@classmethod
35+
def translate_host(cls, hostname: str | None):
36+
if not hostname:
37+
return cls.NotSet
38+
for opt in [cls.Local, cls.Dev, cls.Prod, cls.Qa]:
39+
if opt in hostname:
40+
return opt
41+
return cls.Unknown
42+
43+
@classmethod
44+
def fmt_for_metric(cls, hostname: str | None) -> messages.StringMetricDict:
45+
return {"value": cls.translate_host(hostname), "possible_values": list(cls)}
2646

47+
48+
class AtlasConnector:
2749
def __init__(
2850
self, scihub: SciHub, connector: RedisConnector, redis_atlas: RedisConnector | None = None
2951
) -> None:
@@ -54,7 +76,8 @@ def start(self):
5476
self.update_acls()
5577
self.update_available_endpoints()
5678
self.connector.publish_metrics(
57-
"atlas_connection", {"connected": self.connected_to_atlas, "host": self.host or ""}
79+
"atlas_connection",
80+
{"connected": self.connected_to_atlas, "host": AtlasHost.fmt_for_metric(self.host)},
5881
)
5982

6083
@property

bec_server/tests/tests_scihub/test_atlas_active_client_emitter.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@ def test_get_active_client_metrics_filters_internal_services(atlas_connector):
3636
) as mock_service_status:
3737
mock_service_status.return_value = service_status
3838
assert emitter._get_active_client_metrics() == {
39-
"BECIPythonClient/client-1": "client-host-1",
40-
"BECClient/client-2": "client-host-2",
39+
"bec_clients": 1,
40+
"from_consoles": 0,
41+
"ipython_clients": 1,
42+
"total_connected": 2,
4143
}
4244

4345

4446
def test_handle_service_status_emits_active_client_metric(atlas_connector):
4547
emitter = atlas_connector.active_client_emitter
4648
service_status = {
4749
"BECIPythonClient/client-1": create_status_message(
48-
"BECIPythonClient/client-1", hostname="client-host-1"
50+
"BECIPythonClient/client-1", hostname="client-cons-01"
4951
),
5052
"ScanServer": create_status_message("ScanServer", hostname="server-host"),
5153
}
@@ -59,7 +61,8 @@ def test_handle_service_status_emits_active_client_metric(atlas_connector):
5961
mock_service_status.return_value = service_status
6062
emitter._handle_service_status(None)
6163
mock_publish_metrics.assert_called_once_with(
62-
"active_clients", {"BECIPythonClient/client-1": "client-host-1"}
64+
"active_clients",
65+
{"total_connected": 1, "bec_clients": 0, "ipython_clients": 1, "from_consoles": 1},
6366
)
6467

6568

bec_server/tests/tests_scihub/test_atlas_connector.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from bec_lib import messages
66
from bec_lib.endpoints import MessageEndpoints
7-
from bec_server.scihub.atlas.atlas_connector import AtlasConnector
7+
from bec_server.scihub.atlas.atlas_connector import AtlasConnector, AtlasHost
88

99

1010
def test_atlas_connector_load_env(SciHubMock, connected_atlas_connector):
@@ -103,3 +103,20 @@ def test_atlas_connector_retries_without_ssl_if_tls_fails(SciHubMock, connected_
103103
mock_auth.side_effect = [Exception("SSL connection failed"), mock.DEFAULT]
104104
atlas_connector.connect_to_atlas()
105105
assert atlas_connector.use_tls is False
106+
107+
108+
def test_host_enum_can_be_used_for_string_metric():
109+
poss_strs = ["none", "localhost", "bec-atlas-dev", "bec-atlas-prod", "bec-atlas-qa", "unknown"]
110+
formatted = AtlasHost.fmt_for_metric("bec-atlas-dev.psi.ch")
111+
assert formatted == {"value": AtlasHost.Dev, "possible_values": poss_strs}
112+
msg_value = messages._StrDynamicMetricValue.model_validate(formatted)
113+
assert msg_value.possible_values == poss_strs
114+
115+
116+
def test_translate_atlas_host():
117+
assert AtlasHost.translate_host(None) == "none"
118+
assert AtlasHost.translate_host("some_other_host") == "unknown"
119+
assert AtlasHost.translate_host("bec-atlas-dev.psi.ch") == "bec-atlas-dev"
120+
assert AtlasHost.translate_host("bec-atlas-prod.psi.ch") == "bec-atlas-prod"
121+
assert AtlasHost.translate_host("bec-atlas-qa.psi.ch") == "bec-atlas-qa"
122+
assert AtlasHost.translate_host("localhost") == "localhost"

0 commit comments

Comments
 (0)