Skip to content

Commit af6a223

Browse files
authored
Create proximity.py
1 parent eb1cb6a commit af6a223

1 file changed

Lines changed: 201 additions & 0 deletions

File tree

src/ohip_interfaces/proximity.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
"""
2+
Proximity interface models for IX-HapticSight.
3+
4+
This module defines backend-agnostic normalized structures for short-range
5+
proximity sensing. The goal is to expose near-contact and corridor-clearance
6+
state in a form that runtime safety and contact-governance logic can reason
7+
about without depending on device-specific payloads.
8+
9+
This module does not talk to hardware directly.
10+
It defines normalized payloads that adapters or simulators should emit.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
from dataclasses import dataclass, field
16+
from typing import Iterable, Optional
17+
18+
from ohip.schemas import Vector3
19+
20+
from .signal_health import FreshnessPolicy, SignalQuality
21+
22+
23+
@dataclass(frozen=True)
24+
class ProximityReturn:
25+
"""
26+
One normalized proximity return.
27+
28+
Fields:
29+
- `zone_id`: logical sensing zone or ray identifier
30+
- `distance_mm`: measured distance to nearest return
31+
- `direction_xyz`: normalized or device-defined sensing direction
32+
- `point_xyz`: optional nearest-point estimate in the named frame
33+
- `confidence`: optional [0, 1] confidence-like score from the adapter
34+
"""
35+
36+
zone_id: str
37+
distance_mm: float
38+
direction_xyz: Vector3
39+
point_xyz: Optional[Vector3] = None
40+
confidence: float = 1.0
41+
42+
def to_dict(self) -> dict:
43+
return {
44+
"zone_id": self.zone_id,
45+
"distance_mm": float(self.distance_mm),
46+
"direction_xyz": self.direction_xyz.as_list(),
47+
"point_xyz": self.point_xyz.as_list() if self.point_xyz is not None else None,
48+
"confidence": float(self.confidence),
49+
}
50+
51+
52+
@dataclass(frozen=True)
53+
class ProximityFrame:
54+
"""
55+
Normalized proximity frame for one sensing surface, ring, or viewpoint.
56+
57+
This representation supports multiple per-zone returns rather than a raw
58+
vendor-specific array layout.
59+
"""
60+
61+
sensor_name: str
62+
frame: str
63+
quality: SignalQuality
64+
returns: tuple[ProximityReturn, ...] = field(default_factory=tuple)
65+
66+
def return_count(self) -> int:
67+
return len(self.returns)
68+
69+
def has_returns(self) -> bool:
70+
return self.return_count() > 0
71+
72+
def min_distance_mm(self) -> Optional[float]:
73+
if not self.returns:
74+
return None
75+
return float(min(ret.distance_mm for ret in self.returns))
76+
77+
def max_distance_mm(self) -> Optional[float]:
78+
if not self.returns:
79+
return None
80+
return float(max(ret.distance_mm for ret in self.returns))
81+
82+
def nearest_return(self) -> Optional[ProximityReturn]:
83+
if not self.returns:
84+
return None
85+
return min(self.returns, key=lambda ret: ret.distance_mm)
86+
87+
def is_fresh(self, policy: FreshnessPolicy, *, now_utc_s: Optional[float] = None) -> bool:
88+
return self.quality.is_fresh(policy, now_utc_s=now_utc_s)
89+
90+
def is_usable(self, policy: Optional[FreshnessPolicy] = None, *, now_utc_s: Optional[float] = None) -> bool:
91+
return self.quality.is_usable(policy, now_utc_s=now_utc_s)
92+
93+
def to_dict(self) -> dict:
94+
return {
95+
"sensor_name": self.sensor_name,
96+
"frame": self.frame,
97+
"quality": self.quality.freshness_summary(
98+
FreshnessPolicy(max_age_ms=0, required=False),
99+
now_utc_s=self.quality.sample_timestamp_utc_s,
100+
),
101+
"returns": [ret.to_dict() for ret in self.returns],
102+
}
103+
104+
105+
@dataclass(frozen=True)
106+
class ProximityAssessment:
107+
"""
108+
Compact proximity summary suitable for runtime safety and pre-contact checks.
109+
"""
110+
111+
object_detected: bool
112+
near_contact: bool
113+
corridor_clear: bool
114+
return_count: int
115+
nearest_distance_mm: Optional[float]
116+
caution_distance_mm: float
117+
stop_distance_mm: float
118+
119+
def to_dict(self) -> dict:
120+
return {
121+
"object_detected": bool(self.object_detected),
122+
"near_contact": bool(self.near_contact),
123+
"corridor_clear": bool(self.corridor_clear),
124+
"return_count": int(self.return_count),
125+
"nearest_distance_mm": None if self.nearest_distance_mm is None else float(self.nearest_distance_mm),
126+
"caution_distance_mm": float(self.caution_distance_mm),
127+
"stop_distance_mm": float(self.stop_distance_mm),
128+
}
129+
130+
131+
def make_proximity_return(
132+
*,
133+
zone_id: str,
134+
distance_mm: float,
135+
direction_xyz: Iterable[float],
136+
point_xyz: Optional[Iterable[float]] = None,
137+
confidence: float = 1.0,
138+
) -> ProximityReturn:
139+
direction = list(direction_xyz)
140+
point = list(point_xyz) if point_xyz is not None else None
141+
142+
if len(direction) != 3:
143+
raise ValueError("direction_xyz must contain exactly 3 elements")
144+
if point is not None and len(point) != 3:
145+
raise ValueError("point_xyz must contain exactly 3 elements when provided")
146+
if distance_mm < 0.0:
147+
raise ValueError("distance_mm must be non-negative")
148+
if not (0.0 <= float(confidence) <= 1.0):
149+
raise ValueError("confidence must be between 0.0 and 1.0")
150+
151+
return ProximityReturn(
152+
zone_id=str(zone_id),
153+
distance_mm=float(distance_mm),
154+
direction_xyz=Vector3.from_list(direction),
155+
point_xyz=Vector3.from_list(point) if point is not None else None,
156+
confidence=float(confidence),
157+
)
158+
159+
160+
def assess_proximity(
161+
proximity_frame: ProximityFrame,
162+
*,
163+
caution_distance_mm: float = 120.0,
164+
stop_distance_mm: float = 40.0,
165+
) -> ProximityAssessment:
166+
"""
167+
Derive a compact proximity assessment from a normalized proximity frame.
168+
169+
Semantics:
170+
- `object_detected`: there is at least one return
171+
- `near_contact`: nearest distance is at or below caution distance
172+
- `corridor_clear`: no return is at or below stop distance
173+
"""
174+
if caution_distance_mm < 0.0:
175+
raise ValueError("caution_distance_mm must be non-negative")
176+
if stop_distance_mm < 0.0:
177+
raise ValueError("stop_distance_mm must be non-negative")
178+
if stop_distance_mm > caution_distance_mm:
179+
raise ValueError("stop_distance_mm must be <= caution_distance_mm")
180+
181+
nearest = proximity_frame.min_distance_mm()
182+
object_detected = nearest is not None
183+
184+
return ProximityAssessment(
185+
object_detected=object_detected,
186+
near_contact=bool(object_detected and nearest <= float(caution_distance_mm)),
187+
corridor_clear=bool((nearest is None) or (nearest > float(stop_distance_mm))),
188+
return_count=proximity_frame.return_count(),
189+
nearest_distance_mm=None if nearest is None else float(nearest),
190+
caution_distance_mm=float(caution_distance_mm),
191+
stop_distance_mm=float(stop_distance_mm),
192+
)
193+
194+
195+
__all__ = [
196+
"ProximityReturn",
197+
"ProximityFrame",
198+
"ProximityAssessment",
199+
"make_proximity_return",
200+
"assess_proximity",
201+
]

0 commit comments

Comments
 (0)