|
| 1 | +""" |
| 2 | +Runtime configuration loading and component wiring for IX-HapticSight. |
| 3 | +
|
| 4 | +This module provides a disciplined path for loading the repository's YAML |
| 5 | +configuration files and constructing the baseline runtime-facing core objects: |
| 6 | +
|
| 7 | +- ConsentManager |
| 8 | +- ContactPlanner |
| 9 | +- SafetyGate |
| 10 | +
|
| 11 | +It is intentionally conservative: |
| 12 | +- no hidden global state |
| 13 | +- no ROS 2 assumptions |
| 14 | +- no backend transport code |
| 15 | +- explicit validation of key profile bindings |
| 16 | +
|
| 17 | +The goal is to reduce configuration ambiguity before deeper runtime and |
| 18 | +execution layers are added. |
| 19 | +""" |
| 20 | + |
| 21 | +from __future__ import annotations |
| 22 | + |
| 23 | +from dataclasses import dataclass |
| 24 | +from pathlib import Path |
| 25 | +from typing import Any, Optional |
| 26 | + |
| 27 | +import yaml |
| 28 | + |
| 29 | +from ohip.consent_manager import ConsentManager |
| 30 | +from ohip.contact_planner import ContactPlanner |
| 31 | +from ohip.safety_gate import HardwareInterface, SafetyGate |
| 32 | + |
| 33 | + |
| 34 | +@dataclass(frozen=True) |
| 35 | +class RuntimeComponentBundle: |
| 36 | + """ |
| 37 | + Convenience bundle for the baseline runtime core objects. |
| 38 | +
|
| 39 | + This is not a service container or dependency-injection framework. |
| 40 | + It is just a small explicit grouping that future runtime wrappers can use. |
| 41 | + """ |
| 42 | + |
| 43 | + consent_manager: ConsentManager |
| 44 | + contact_planner: ContactPlanner |
| 45 | + safety_gate: SafetyGate |
| 46 | + |
| 47 | + |
| 48 | +@dataclass(frozen=True) |
| 49 | +class RuntimeConfigBundle: |
| 50 | + """ |
| 51 | + Parsed runtime configuration bundle loaded from repository YAML files. |
| 52 | + """ |
| 53 | + |
| 54 | + force_limits: dict[str, Any] |
| 55 | + culture_profiles: dict[str, Any] |
| 56 | + force_limits_path: Path |
| 57 | + culture_profiles_path: Path |
| 58 | + |
| 59 | + @classmethod |
| 60 | + def from_files( |
| 61 | + cls, |
| 62 | + *, |
| 63 | + force_limits_path: str | Path, |
| 64 | + culture_profiles_path: str | Path, |
| 65 | + ) -> "RuntimeConfigBundle": |
| 66 | + force_path = Path(force_limits_path).expanduser().resolve() |
| 67 | + culture_path = Path(culture_profiles_path).expanduser().resolve() |
| 68 | + |
| 69 | + if not force_path.is_file(): |
| 70 | + raise FileNotFoundError(f"force limits config not found: {force_path}") |
| 71 | + if not culture_path.is_file(): |
| 72 | + raise FileNotFoundError(f"culture profiles config not found: {culture_path}") |
| 73 | + |
| 74 | + force_limits = _load_yaml_file(force_path) |
| 75 | + culture_profiles = _load_yaml_file(culture_path) |
| 76 | + |
| 77 | + bundle = cls( |
| 78 | + force_limits=force_limits, |
| 79 | + culture_profiles=culture_profiles, |
| 80 | + force_limits_path=force_path, |
| 81 | + culture_profiles_path=culture_path, |
| 82 | + ) |
| 83 | + bundle.validate() |
| 84 | + return bundle |
| 85 | + |
| 86 | + @classmethod |
| 87 | + def from_repo_root(cls, repo_root: str | Path) -> "RuntimeConfigBundle": |
| 88 | + root = Path(repo_root).expanduser().resolve() |
| 89 | + return cls.from_files( |
| 90 | + force_limits_path=root / "configs" / "force_limits.yaml", |
| 91 | + culture_profiles_path=root / "configs" / "culture_profiles.yaml", |
| 92 | + ) |
| 93 | + |
| 94 | + def validate(self) -> None: |
| 95 | + """ |
| 96 | + Validate key repository configuration assumptions. |
| 97 | +
|
| 98 | + This is intentionally targeted validation, not a full schema system. |
| 99 | + It catches the most dangerous mismatches early: |
| 100 | + - missing defaults |
| 101 | + - missing profile sections |
| 102 | + - broken profile bindings between culture and force configs |
| 103 | + """ |
| 104 | + _require_mapping(self.force_limits, "force_limits") |
| 105 | + _require_mapping(self.culture_profiles, "culture_profiles") |
| 106 | + |
| 107 | + force_profiles = self.force_limits.get("profiles") |
| 108 | + force_defaults = self.force_limits.get("defaults") |
| 109 | + culture_defaults = self.culture_profiles.get("defaults") |
| 110 | + culture_profiles = self.culture_profiles.get("profiles") |
| 111 | + |
| 112 | + _require_mapping(force_profiles, "force_limits.profiles") |
| 113 | + _require_mapping(force_defaults, "force_limits.defaults") |
| 114 | + _require_mapping(culture_defaults, "culture_profiles.defaults") |
| 115 | + _require_mapping(culture_profiles, "culture_profiles.profiles") |
| 116 | + |
| 117 | + social_profile = force_defaults.get("social_touch_profile") |
| 118 | + if not social_profile: |
| 119 | + raise ValueError("force_limits.defaults.social_touch_profile is required") |
| 120 | + if social_profile not in force_profiles: |
| 121 | + raise ValueError( |
| 122 | + f"force_limits.defaults.social_touch_profile references unknown profile: {social_profile}" |
| 123 | + ) |
| 124 | + |
| 125 | + object_profile = force_defaults.get("object_profile") |
| 126 | + if object_profile and object_profile not in force_profiles: |
| 127 | + raise ValueError( |
| 128 | + f"force_limits.defaults.object_profile references unknown profile: {object_profile}" |
| 129 | + ) |
| 130 | + |
| 131 | + inspection_profile = force_defaults.get("inspection_profile") |
| 132 | + if inspection_profile and inspection_profile not in force_profiles: |
| 133 | + raise ValueError( |
| 134 | + f"force_limits.defaults.inspection_profile references unknown profile: {inspection_profile}" |
| 135 | + ) |
| 136 | + |
| 137 | + default_binding = ( |
| 138 | + (culture_defaults.get("bindings") or {}).get("force_profile") |
| 139 | + if isinstance(culture_defaults.get("bindings"), dict) |
| 140 | + else None |
| 141 | + ) |
| 142 | + if default_binding and default_binding not in force_profiles: |
| 143 | + raise ValueError( |
| 144 | + f"culture_profiles.defaults.bindings.force_profile references unknown force profile: {default_binding}" |
| 145 | + ) |
| 146 | + |
| 147 | + for profile_name, profile_data in culture_profiles.items(): |
| 148 | + if not isinstance(profile_data, dict): |
| 149 | + raise ValueError(f"culture profile must be a mapping: {profile_name}") |
| 150 | + |
| 151 | + bindings = profile_data.get("bindings") or {} |
| 152 | + if bindings and not isinstance(bindings, dict): |
| 153 | + raise ValueError(f"culture profile bindings must be a mapping: {profile_name}") |
| 154 | + |
| 155 | + force_profile = bindings.get("force_profile") |
| 156 | + if force_profile and force_profile not in force_profiles: |
| 157 | + raise ValueError( |
| 158 | + f"culture profile '{profile_name}' references unknown force profile: {force_profile}" |
| 159 | + ) |
| 160 | + |
| 161 | + @property |
| 162 | + def force_profile_names(self) -> list[str]: |
| 163 | + profiles = self.force_limits.get("profiles", {}) |
| 164 | + return sorted(profiles.keys()) |
| 165 | + |
| 166 | + @property |
| 167 | + def culture_profile_names(self) -> list[str]: |
| 168 | + profiles = self.culture_profiles.get("profiles", {}) |
| 169 | + return sorted(profiles.keys()) |
| 170 | + |
| 171 | + @property |
| 172 | + def phrase_bank(self) -> dict[str, dict[str, str]]: |
| 173 | + phrase_bank = self.culture_profiles.get("phrase_bank", {}) |
| 174 | + if not isinstance(phrase_bank, dict): |
| 175 | + return {} |
| 176 | + return phrase_bank |
| 177 | + |
| 178 | + def get_culture_profile(self, profile_name: str = "default") -> dict[str, Any]: |
| 179 | + profiles = self.culture_profiles.get("profiles", {}) |
| 180 | + if profile_name not in profiles: |
| 181 | + available = ", ".join(sorted(profiles.keys())) |
| 182 | + raise KeyError(f"unknown culture profile '{profile_name}'. available: {available}") |
| 183 | + profile = profiles[profile_name] |
| 184 | + if not isinstance(profile, dict): |
| 185 | + raise ValueError(f"culture profile must be a mapping: {profile_name}") |
| 186 | + return profile |
| 187 | + |
| 188 | + def default_force_profile_name(self) -> str: |
| 189 | + defaults = self.force_limits.get("defaults", {}) |
| 190 | + profile_name = defaults.get("social_touch_profile") |
| 191 | + if not profile_name: |
| 192 | + raise ValueError("force_limits.defaults.social_touch_profile is missing") |
| 193 | + return str(profile_name) |
| 194 | + |
| 195 | + def bound_force_profile_for_culture(self, profile_name: str = "default") -> str: |
| 196 | + profile = self.get_culture_profile(profile_name) |
| 197 | + bindings = profile.get("bindings") or {} |
| 198 | + if isinstance(bindings, dict) and bindings.get("force_profile"): |
| 199 | + return str(bindings["force_profile"]) |
| 200 | + |
| 201 | + defaults = self.culture_profiles.get("defaults", {}) |
| 202 | + default_bindings = defaults.get("bindings") or {} |
| 203 | + if isinstance(default_bindings, dict) and default_bindings.get("force_profile"): |
| 204 | + return str(default_bindings["force_profile"]) |
| 205 | + |
| 206 | + return self.default_force_profile_name() |
| 207 | + |
| 208 | + def build_consent_manager( |
| 209 | + self, |
| 210 | + *, |
| 211 | + culture_profile_name: str = "default", |
| 212 | + institutional_policy_enabled: bool = False, |
| 213 | + ) -> ConsentManager: |
| 214 | + defaults = self.culture_profiles.get("defaults", {}) |
| 215 | + profile = self.get_culture_profile(culture_profile_name) |
| 216 | + |
| 217 | + manager = ConsentManager() |
| 218 | + manager.set_profile_from_dict(profile, defaults=defaults if isinstance(defaults, dict) else None) |
| 219 | + manager.enable_institutional_policy(institutional_policy_enabled) |
| 220 | + return manager |
| 221 | + |
| 222 | + def build_contact_planner(self) -> ContactPlanner: |
| 223 | + return ContactPlanner(self.force_limits) |
| 224 | + |
| 225 | + def build_safety_gate( |
| 226 | + self, |
| 227 | + *, |
| 228 | + hw_iface: Optional[HardwareInterface] = None, |
| 229 | + active_profile: Optional[str] = None, |
| 230 | + ) -> SafetyGate: |
| 231 | + return SafetyGate( |
| 232 | + self.force_limits, |
| 233 | + hw_iface=hw_iface, |
| 234 | + active_profile=active_profile, |
| 235 | + ) |
| 236 | + |
| 237 | + def build_runtime_components( |
| 238 | + self, |
| 239 | + *, |
| 240 | + culture_profile_name: str = "default", |
| 241 | + institutional_policy_enabled: bool = False, |
| 242 | + hw_iface: Optional[HardwareInterface] = None, |
| 243 | + ) -> RuntimeComponentBundle: |
| 244 | + """ |
| 245 | + Construct the baseline runtime core objects using repository configs. |
| 246 | +
|
| 247 | + The safety gate's active force profile is selected from the culture |
| 248 | + profile binding when available so runtime wiring stays consistent. |
| 249 | + """ |
| 250 | + active_force_profile = self.bound_force_profile_for_culture(culture_profile_name) |
| 251 | + |
| 252 | + return RuntimeComponentBundle( |
| 253 | + consent_manager=self.build_consent_manager( |
| 254 | + culture_profile_name=culture_profile_name, |
| 255 | + institutional_policy_enabled=institutional_policy_enabled, |
| 256 | + ), |
| 257 | + contact_planner=self.build_contact_planner(), |
| 258 | + safety_gate=self.build_safety_gate( |
| 259 | + hw_iface=hw_iface, |
| 260 | + active_profile=active_force_profile, |
| 261 | + ), |
| 262 | + ) |
| 263 | + |
| 264 | + |
| 265 | +def _load_yaml_file(path: Path) -> dict[str, Any]: |
| 266 | + with path.open("r", encoding="utf-8") as handle: |
| 267 | + data = yaml.safe_load(handle) or {} |
| 268 | + if not isinstance(data, dict): |
| 269 | + raise ValueError(f"expected top-level mapping in YAML file: {path}") |
| 270 | + return data |
| 271 | + |
| 272 | + |
| 273 | +def _require_mapping(value: Any, label: str) -> None: |
| 274 | + if not isinstance(value, dict): |
| 275 | + raise ValueError(f"{label} must be a mapping") |
| 276 | + |
| 277 | + |
| 278 | +__all__ = [ |
| 279 | + "RuntimeComponentBundle", |
| 280 | + "RuntimeConfigBundle", |
| 281 | +] |
0 commit comments