Skip to content

Commit c00b204

Browse files
committed
Functional service implementation, tests
* fixed and tested config load * Currently the config merge order is the following. CLI overrides environment vars, environment vars override YAML config and YAML config overrides defaults. None of the separate configs has be complete, but the merge result does. It has to contain all required settings. The reasoning behind the merge order is convenience for Docker image usages. Docker image runs are often customized by `command` and `environment`. Probably the behaviour may be adjusted after some feedback * some automation to name environment variables according to config dataclasses. It's tedious to duplicate the names * added config tests to verify the merge * service implementations for server and client have common funtional of pumps. Those pumps is for pumping inbound WS to ZMQ sink and ZMQ source to outbound WS. Fixed wrong `WSFrame` usage, caught by tests (one shouldn't store `WSFrame` as per the doc). Fine tune finalizations when testing. * tests for multiple identity pipeline, includes no SSL option, multiple message streams, checking conditions when WS write pauses are possible * test fixtures are refactored to keep them in a more controlled and organized way * added Ruff check for sorded imports * added `pytest-xdist` to have parallel workers for tests and corresponding `-n 3` args in `pyproject.toml` in to run `pytest` * added `pytest-randomly` to control the random seed and be able to reproduce tests with randomized fixtures refactor tests cleaner pumps separation fix config and tests
1 parent ee31361 commit c00b204

26 files changed

Lines changed: 1524 additions & 753 deletions

.github/workflows/checks.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ jobs:
2020
run: pyrefly check --summarize-errors .
2121
- name: Lint code with Ruff
2222
run: ruff check --output-format=github
23+
- name: Check sorted imports with Ruff
24+
run: ruff check --select I --output-format=github
2325
- name: Check code formatting with Ruff
2426
run: ruff format --diff
2527
- name: Run pytest unit/integration tests

pyproject.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ project-excludes = [
1818
"**/*venv/**/*",
1919
]
2020

21+
[tool.pytest.ini_options]
22+
addopts = "-n 3"
23+
24+
2125
[project.optional-dependencies]
2226
prerequisite_platform = [
2327
# This is just for visibilty, to track prerequisite packages.
@@ -29,8 +33,10 @@ prerequisite_platform = [
2933
dev = [
3034
"faker>=37.11.0",
3135
"filelock>=3.20.0",
32-
"pyrefly>=0.37.0",
36+
"pyrefly>=0.39.0",
3337
"pytest>=8.4.2",
3438
"pytest-asyncio>=1.2.0",
39+
"pytest-randomly>=4.0.1",
40+
"pytest-xdist>=3.8.0",
3541
"ruff>=0.14.1",
3642
]

savant_cloudpin/cfg/_bootstrap.py

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
import dataclasses
1+
import os.path
22
import re
3-
from pathlib import Path
43
from typing import Any
54

65
from omegaconf import OmegaConf
76
from omegaconf.base import SCMode
87
from omegaconf.dictconfig import DictConfig
98

9+
from savant_cloudpin.cfg import _utils as utils
1010
from savant_cloudpin.cfg._defaults import (
1111
DEFAULT_CLIENT_CONFIG,
1212
DEFAULT_LOAD_CONFIG,
1313
DEFAULT_SERVER_CONFIG,
14-
NULL_SSL_SERVER_CONFIG,
1514
)
1615
from savant_cloudpin.cfg._models import ClientServiceConfig, ServerServiceConfig
1716

@@ -41,35 +40,51 @@ def validated_dataclass[T: ClientServiceConfig | ServerServiceConfig](
4140
return config
4241

4342

43+
def merge_env_config(
44+
defaults: ServerServiceConfig | ClientServiceConfig,
45+
yml_cfg: DictConfig | dict,
46+
cli_cfg: DictConfig | dict,
47+
) -> DictConfig:
48+
default_cfg = OmegaConf.structured(defaults)
49+
env_cfg = utils.as_value_dict(utils.env_override(defaults, "null"))
50+
env_cfg = OmegaConf.to_container(OmegaConf.create(env_cfg), resolve=True)
51+
assert isinstance(env_cfg, dict)
52+
env_cfg = utils.drop_none_values(env_cfg)
53+
cfg = OmegaConf.merge(yml_cfg, env_cfg, cli_cfg)
54+
55+
ssl = "websockets" in cfg and "ssl" in cfg.websockets
56+
ssl = cfg.websockets.ssl if ssl else {}
57+
58+
cfg = OmegaConf.merge(default_cfg, cfg)
59+
assert isinstance(cfg, DictConfig)
60+
if not any(val is not None for val in ssl.values()):
61+
cfg.websockets.ssl = None
62+
return cfg
63+
64+
4465
def load_config(
4566
args_list: list[str] | None = None,
4667
) -> ClientServiceConfig | ServerServiceConfig:
47-
cli_config = OmegaConf.from_cli(args_list)
48-
load_config = OmegaConf.merge(DEFAULT_LOAD_CONFIG, cli_config)
68+
cli_cfg = OmegaConf.from_cli(args_list)
69+
env_cfg = utils.as_value_dict(utils.env_override(DEFAULT_LOAD_CONFIG))
70+
cfg = OmegaConf.merge(env_cfg, cli_cfg)
71+
72+
yml_exists = cfg.config and os.path.exists(cfg.config)
73+
yml_cfg = OmegaConf.load(cfg.config) if yml_exists else OmegaConf.create({})
4974

50-
if load_config.config and Path(load_config.config).exists():
51-
yaml_config = OmegaConf.load(load_config.config)
52-
else:
53-
yaml_config = OmegaConf.create()
75+
cfg = OmegaConf.merge(yml_cfg, env_cfg, cli_cfg)
76+
assert isinstance(cfg, DictConfig) and isinstance(yml_cfg, DictConfig)
5477

55-
config = OmegaConf.merge(DEFAULT_LOAD_CONFIG, yaml_config, cli_config)
56-
assert isinstance(config, DictConfig)
78+
cli_cfg.pop("config", None)
79+
cli_cfg.pop("mode", None)
80+
yml_cfg.pop("mode", None)
5781

58-
config.pop("config", None)
59-
match config.pop("mode", None):
82+
match cfg.mode:
6083
case "server":
61-
ssl_config = OmegaConf.merge(
62-
dataclasses.asdict(NULL_SSL_SERVER_CONFIG), config
63-
).websockets.ssl
64-
assert isinstance(ssl_config, DictConfig)
65-
missing_ssl = all(v is None for _, v in ssl_config.items())
66-
67-
config = OmegaConf.merge(DEFAULT_SERVER_CONFIG, config)
68-
if missing_ssl:
69-
config.websockets.ssl = None
70-
return validated_dataclass(config, ServerServiceConfig)
84+
cfg = merge_env_config(DEFAULT_SERVER_CONFIG, yml_cfg, cli_cfg)
85+
return validated_dataclass(cfg, ServerServiceConfig)
7186
case "client" | None:
72-
config = OmegaConf.merge(DEFAULT_CLIENT_CONFIG, config)
73-
return validated_dataclass(config, ClientServiceConfig)
87+
cfg = merge_env_config(DEFAULT_CLIENT_CONFIG, yml_cfg, cli_cfg)
88+
return validated_dataclass(cfg, ClientServiceConfig)
7489
case _:
7590
raise ValueError("Invalid service mode")

savant_cloudpin/cfg/_defaults.py

Lines changed: 20 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import copy
2-
3-
from omegaconf import SI
4-
51
from savant_cloudpin.cfg._models import (
62
ClientServiceConfig,
73
ClientSSLConfig,
@@ -15,74 +11,48 @@
1511
)
1612

1713
DEFAULT_SOURCE_CONFIG = ReaderConfig(
18-
results_queue_size=SI("${oc.env:CLOUDPIN_SOURCE_RESULTS_QUEUE_SIZE,100}"),
19-
url="${oc.env:CLOUDPIN_SOURCE_URL,???}",
20-
receive_timeout=SI("${oc.env:CLOUDPIN_SOURCE_RECEIVE_TIMEOUT,null}"),
21-
receive_hwm=SI("${oc.env:CLOUDPIN_SOURCE_RECEIVE_HWM,null}"),
22-
topic_prefix_spec="${oc.env:CLOUDPIN_SOURCE_TOPIC_PREFIX_SPEC,null}",
23-
routing_ids_cache_size=SI("${oc.env:CLOUDPIN_ROUTING_IDS_CACHE_SIZE,null}"),
24-
fix_ipc_permissions=("${oc.env:CLOUDPIN_SOURCE_FIX_IPC_PERMISSIONS,null}"),
25-
source_blacklist_size=SI("${oc.env:CLOUDPIN_SOURCE_SOURCE_BLACKLIST_SIZE,null}"),
26-
source_blacklist_ttl=SI("${oc.env:CLOUDPIN_SOURCE_SOURCE_BLACKLIST_TTL,null}"),
14+
results_queue_size=100,
15+
url="???",
2716
)
2817
DEFAULT_SINK_CONFIG = WriterConfig(
29-
max_inflight_messages=SI("${oc.env:CLOUDPIN_SINK_MAX_INFLIGHT_MESSAGES,100}"),
30-
url="${oc.env:CLOUDPIN_SINK_URL,???}",
31-
send_timeout=SI("${oc.env:CLOUDPIN_SINK_SEND_TIMEOUT,null}"),
32-
send_retries=SI("${oc.env:CLOUDPIN_SINK_SEND_RETRIES,null}"),
33-
send_hwm=SI("${oc.env:CLOUDPIN_SINK_SEND_HWM,null}"),
34-
receive_timeout=SI("${oc.env:CLOUDPIN_SINK_RECEIVE_TIMEOUT,null}"),
35-
receive_retries=SI("${oc.env:CLOUDPIN_SINK_RECEIVE_RETRIES,null}"),
36-
receive_hwm=SI("${oc.env:CLOUDPIN_SINK_RECEIVE_HWM,null}"),
37-
fix_ipc_permissions=SI("${oc.env:CLOUDPIN_SINK_FIX_IPC_PERMISSIONS,null}"),
18+
max_inflight_messages=100,
19+
url="???",
3820
)
3921

4022

4123
DEFAULT_LOAD_CONFIG = LoadConfig(
42-
config="${oc.env:CLOUDPIN_CONFIG_FILE,./cloudpin.yml}",
43-
mode="${oc.env:CLOUDPIN_MODE,client}",
24+
config="./cloudpin.yml",
25+
mode="client",
4426
)
4527

4628
DEFAULT_CLIENT_CONFIG = ClientServiceConfig(
4729
websockets=ClientWSConfig(
48-
server_url="${oc.env:CLOUDPIN_WEBSOCKETS_SERVER_URL,???}",
49-
api_key="${oc.env:CLOUDPIN_WEBSOCKETS_API_KEY,???}",
30+
server_url="???",
31+
api_key="???",
5032
ssl=ClientSSLConfig(
51-
ca_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CA_FILE,null}",
52-
cert_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CERT_FILE,???}",
53-
key_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_KEY_FILE,???}",
54-
check_hostname=SI("${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CHECK_HOSTNAME,false}"),
33+
ca_file=None,
34+
cert_file="???",
35+
key_file="???",
36+
check_hostname=False,
5537
),
5638
),
57-
io_timeout=SI("${oc.env:CLOUDPIN_IO_TIMEOUT,0.1}"),
39+
io_timeout=0.1,
5840
source=DEFAULT_SOURCE_CONFIG,
5941
sink=DEFAULT_SINK_CONFIG,
6042
)
6143

6244
DEFAULT_SERVER_CONFIG = ServerServiceConfig(
6345
websockets=ServerWSConfig(
64-
server_url="${oc.env:CLOUDPIN_WEBSOCKETS_SERVER_URL,???}",
65-
api_key="${oc.env:CLOUDPIN_WEBSOCKETS_API_KEY,???}",
46+
server_url="???",
47+
api_key="???",
6648
ssl=ServerSSLConfig(
67-
ca_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CA_FILE,null}",
68-
cert_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CERT_FILE,???}",
69-
key_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_KEY_FILE,???}",
70-
client_cert_required=SI(
71-
"${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CLIENT_CERT_REQUIRED,true}"
72-
),
49+
ca_file=None,
50+
cert_file="???",
51+
key_file="???",
52+
client_cert_required=True,
7353
),
7454
),
75-
io_timeout=SI("${oc.env:CLOUDPIN_IO_TIMEOUT,0.1}"),
55+
io_timeout=0.1,
7656
source=DEFAULT_SOURCE_CONFIG,
7757
sink=DEFAULT_SINK_CONFIG,
7858
)
79-
80-
NULL_SSL_SERVER_CONFIG = copy.deepcopy(DEFAULT_SERVER_CONFIG)
81-
NULL_SSL_SERVER_CONFIG.websockets.ssl = ServerSSLConfig(
82-
ca_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CA_FILE,null}",
83-
cert_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CERT_FILE,null}",
84-
key_file="${oc.env:CLOUDPIN_WEBSOCKETS_SSL_KEY_FILE,null}",
85-
client_cert_required=SI(
86-
"${oc.env:CLOUDPIN_WEBSOCKETS_SSL_CLIENT_CERT_REQUIRED,null}"
87-
),
88-
)

savant_cloudpin/cfg/_models.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from dataclasses import dataclass, replace
2-
from typing import Self, TypedDict
2+
from typing import Self
33

44
from savant_rs import zmq
55

@@ -21,13 +21,10 @@ class ReaderConfig:
2121
def as_router(self) -> Self:
2222
return replace(self, url="router+" + self.url.split("+")[-1])
2323

24-
def blocking_params(self) -> zmq.ReaderConfig:
24+
def to_args(self) -> tuple[zmq.ReaderConfig, int]:
2525
cfg = zmq.ReaderConfigBuilder(self.url)
2626
cfg.with_map_config(to_map_config(self, excluded=("url", "results_queue_size")))
27-
return cfg.build()
28-
29-
def nonblocking_params(self) -> tuple[zmq.ReaderConfig, int]:
30-
return self.blocking_params(), self.results_queue_size
27+
return cfg.build(), self.results_queue_size
3128

3229

3330
@dataclass
@@ -45,7 +42,7 @@ class WriterConfig:
4542
def as_dealer(self) -> Self:
4643
return replace(self, url="dealer+" + self.url.split("+")[-1])
4744

48-
def nonblocking_params(self) -> tuple[zmq.WriterConfig, int]:
45+
def to_args(self) -> tuple[zmq.WriterConfig, int]:
4946
cfg = zmq.WriterConfigBuilder(self.url)
5047
cfg.with_map_config(
5148
to_map_config(self, excluded=("url", "max_inflight_messages"))
@@ -73,7 +70,7 @@ class ClientSSLConfig:
7370
class ServerWSConfig:
7471
server_url: str
7572
api_key: str
76-
ssl: ServerSSLConfig | None
73+
ssl: ServerSSLConfig | None = None
7774

7875

7976
@dataclass
@@ -100,6 +97,7 @@ class ClientServiceConfig(BaseServiceConfig):
10097
websockets: ClientWSConfig
10198

10299

103-
class LoadConfig(TypedDict):
100+
@dataclass
101+
class LoadConfig:
104102
config: str | None
105103
mode: str | None

savant_cloudpin/cfg/_utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
import dataclasses
2+
from collections.abc import Mapping
13
from dataclasses import asdict
24
from typing import Any
35

6+
ENV_PREFIX = "CLOUDPIN"
7+
48

59
def to_map_config(
610
dataclass_config: Any, /, excluded: tuple[str, ...]
@@ -11,3 +15,46 @@ def to_map_config(
1115
if isinstance(key, str) and isinstance(val, (str, int))
1216
if key not in excluded
1317
}
18+
19+
20+
def drop_none_values[K](dct: Mapping[K, Any]) -> dict[K, Any]:
21+
result = dict[K, Any]()
22+
for key, val in dct.items():
23+
if isinstance(val, dict):
24+
result[key] = drop_none_values(val)
25+
elif val is not None:
26+
result[key] = val
27+
return result
28+
29+
30+
def as_value_dict(obj: Any) -> dict[str, Any]:
31+
dct = dataclasses.asdict(obj)
32+
return drop_none_values(dct)
33+
34+
35+
def env_override(
36+
obj: Any,
37+
default: str | None = None,
38+
prefix: str = ENV_PREFIX,
39+
) -> Any:
40+
if isinstance(obj, type):
41+
raise ValueError("Instance is expected")
42+
updates = dict[str, Any]()
43+
for field in dataclasses.fields(obj):
44+
name, val = field.name, getattr(obj, field.name)
45+
env_name = f"{prefix}_{name.upper()}"
46+
match val:
47+
case str() | int() | float() if default is None:
48+
updates[name] = f"${{oc.env:{env_name},{val}}}"
49+
case str() | int() | float():
50+
updates[name] = f"${{oc.env:{env_name},{default}}}"
51+
case bool() if default is None:
52+
updates[name] = f"${{oc.env:{env_name},{str(val).lower()}}}"
53+
case bool():
54+
updates[name] = f"${{oc.env:{env_name},{default}}}"
55+
case None:
56+
continue
57+
case _:
58+
updates[name] = env_override(val, default, env_name)
59+
60+
return dataclasses.replace(obj, **updates)

0 commit comments

Comments
 (0)