Skip to content

Commit 1ea86db

Browse files
authored
Create signal_health.py
1 parent 30fa87e commit 1ea86db

1 file changed

Lines changed: 178 additions & 0 deletions

File tree

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
"""
2+
Signal health and freshness models for IX-HapticSight.
3+
4+
This module defines backend-agnostic metadata structures that can be shared by:
5+
- force/torque interfaces
6+
- tactile interfaces
7+
- proximity interfaces
8+
- thermal interfaces
9+
- future scene-perception interfaces
10+
11+
The goal is to make signal trust explicit before those signals are consumed by
12+
runtime coordination or safety logic.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
from dataclasses import dataclass, field
18+
from enum import Enum
19+
from time import time
20+
from typing import Optional
21+
22+
23+
class SignalHealth(str, Enum):
24+
"""
25+
Health classification for one normalized signal source.
26+
"""
27+
28+
NOMINAL = "NOMINAL"
29+
DEGRADED = "DEGRADED"
30+
INVALID = "INVALID"
31+
UNAVAILABLE = "UNAVAILABLE"
32+
33+
34+
class SignalSourceMode(str, Enum):
35+
"""
36+
Source labeling to prevent confusion between live, replay, and simulated data.
37+
"""
38+
39+
LIVE = "LIVE"
40+
REPLAY = "REPLAY"
41+
SIMULATION = "SIMULATION"
42+
BENCHMARK = "BENCHMARK"
43+
UNKNOWN = "UNKNOWN"
44+
45+
46+
@dataclass(frozen=True)
47+
class FreshnessPolicy:
48+
"""
49+
Freshness expectations for a signal stream.
50+
51+
`max_age_ms` is the maximum acceptable age for the most recent sample.
52+
When `required` is False, staleness may narrow behavior rather than block it.
53+
"""
54+
55+
max_age_ms: int
56+
required: bool = True
57+
58+
def is_fresh(self, *, sample_timestamp_utc_s: float, now_utc_s: Optional[float] = None) -> bool:
59+
now = float(now_utc_s if now_utc_s is not None else time())
60+
age_ms = (now - float(sample_timestamp_utc_s)) * 1000.0
61+
return age_ms <= float(self.max_age_ms)
62+
63+
64+
@dataclass(frozen=True)
65+
class SignalQuality:
66+
"""
67+
Quality metadata for a normalized signal sample.
68+
69+
This structure is intentionally generic so multiple sensing modalities can
70+
reuse it without inventing separate freshness or health semantics.
71+
"""
72+
73+
source_mode: SignalSourceMode
74+
health: SignalHealth
75+
sample_timestamp_utc_s: float
76+
received_timestamp_utc_s: float = field(default_factory=time)
77+
sequence_id: Optional[int] = None
78+
source_name: str = ""
79+
frame: str = ""
80+
note: str = ""
81+
82+
def age_ms(self, *, now_utc_s: Optional[float] = None) -> float:
83+
now = float(now_utc_s if now_utc_s is not None else time())
84+
return max(0.0, (now - float(self.sample_timestamp_utc_s)) * 1000.0)
85+
86+
def transport_latency_ms(self) -> float:
87+
return max(0.0, (float(self.received_timestamp_utc_s) - float(self.sample_timestamp_utc_s)) * 1000.0)
88+
89+
def is_fresh(self, policy: FreshnessPolicy, *, now_utc_s: Optional[float] = None) -> bool:
90+
return policy.is_fresh(
91+
sample_timestamp_utc_s=self.sample_timestamp_utc_s,
92+
now_utc_s=now_utc_s,
93+
)
94+
95+
def is_usable(self, policy: Optional[FreshnessPolicy] = None, *, now_utc_s: Optional[float] = None) -> bool:
96+
if self.health in {SignalHealth.INVALID, SignalHealth.UNAVAILABLE}:
97+
return False
98+
if policy is None:
99+
return self.health in {SignalHealth.NOMINAL, SignalHealth.DEGRADED}
100+
fresh = self.is_fresh(policy, now_utc_s=now_utc_s)
101+
if policy.required:
102+
return fresh and self.health in {SignalHealth.NOMINAL, SignalHealth.DEGRADED}
103+
return self.health in {SignalHealth.NOMINAL, SignalHealth.DEGRADED}
104+
105+
def freshness_summary(
106+
self,
107+
policy: FreshnessPolicy,
108+
*,
109+
now_utc_s: Optional[float] = None,
110+
) -> dict[str, object]:
111+
return {
112+
"source_mode": self.source_mode.value,
113+
"health": self.health.value,
114+
"sample_timestamp_utc_s": float(self.sample_timestamp_utc_s),
115+
"received_timestamp_utc_s": float(self.received_timestamp_utc_s),
116+
"age_ms": self.age_ms(now_utc_s=now_utc_s),
117+
"max_age_ms": int(policy.max_age_ms),
118+
"required": bool(policy.required),
119+
"fresh": bool(self.is_fresh(policy, now_utc_s=now_utc_s)),
120+
"usable": bool(self.is_usable(policy, now_utc_s=now_utc_s)),
121+
"sequence_id": self.sequence_id,
122+
"source_name": self.source_name,
123+
"frame": self.frame,
124+
"note": self.note,
125+
}
126+
127+
128+
@dataclass(frozen=True)
129+
class MultiSignalFreshness:
130+
"""
131+
Compact multi-modality freshness summary for runtime checks.
132+
133+
This is a generic interface-side summary and does not replace the runtime's
134+
richer SignalFreshness model. It exists so interface layers can expose a
135+
normalized freshness snapshot before runtime code consumes signal payloads.
136+
"""
137+
138+
force_torque: bool = False
139+
tactile: bool = False
140+
proximity: bool = False
141+
thermal: bool = False
142+
scene: bool = False
143+
144+
def all_required(
145+
self,
146+
*,
147+
require_force_torque: bool = False,
148+
require_tactile: bool = False,
149+
require_proximity: bool = False,
150+
require_thermal: bool = False,
151+
require_scene: bool = False,
152+
) -> bool:
153+
checks = [
154+
(require_force_torque, self.force_torque),
155+
(require_tactile, self.tactile),
156+
(require_proximity, self.proximity),
157+
(require_thermal, self.thermal),
158+
(require_scene, self.scene),
159+
]
160+
return all(actual for required, actual in checks if required)
161+
162+
def any_available(self) -> bool:
163+
return any([
164+
self.force_torque,
165+
self.tactile,
166+
self.proximity,
167+
self.thermal,
168+
self.scene,
169+
])
170+
171+
172+
__all__ = [
173+
"SignalHealth",
174+
"SignalSourceMode",
175+
"FreshnessPolicy",
176+
"SignalQuality",
177+
"MultiSignalFreshness",
178+
]

0 commit comments

Comments
 (0)