Skip to content

Commit 563f1e2

Browse files
authored
Create state.py
1 parent 82201c3 commit 563f1e2

1 file changed

Lines changed: 278 additions & 0 deletions

File tree

src/ohip_runtime/state.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
"""
2+
Runtime state models for IX-HapticSight.
3+
4+
This module introduces explicit runtime/session ownership structures without
5+
changing the existing protocol core in ``src/ohip``. The goal is to make
6+
interaction flow, fault latching, and runtime health visible before adding
7+
coordinator or backend code.
8+
9+
This module is intentionally backend-agnostic and ROS-free.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
from dataclasses import dataclass, field
15+
from enum import Enum
16+
from time import time
17+
from typing import Optional
18+
19+
from ohip.schemas import SafetyLevel
20+
21+
22+
class InteractionState(str, Enum):
23+
"""
24+
High-level runtime interaction states.
25+
26+
These states are intentionally conservative and align with the repository's
27+
documented approach / verify / contact / retreat / safe-hold semantics.
28+
"""
29+
30+
IDLE = "IDLE"
31+
VERIFY = "VERIFY"
32+
APPROACH = "APPROACH"
33+
PRECONTACT = "PRECONTACT"
34+
CONTACT = "CONTACT"
35+
RETREAT = "RETREAT"
36+
SAFE_HOLD = "SAFE_HOLD"
37+
FAULT_LATCHED = "FAULT_LATCHED"
38+
39+
40+
class ExecutionState(str, Enum):
41+
"""
42+
Backend-facing execution states.
43+
44+
These are kept separate from interaction states so a reviewer can see
45+
whether the system is waiting on policy, executing motion, retreating,
46+
or faulted at the runtime layer.
47+
"""
48+
49+
IDLE = "IDLE"
50+
READY = "READY"
51+
EXECUTING = "EXECUTING"
52+
ABORTING = "ABORTING"
53+
RETREATING = "RETREATING"
54+
SAFE_HOLD = "SAFE_HOLD"
55+
FAULTED = "FAULTED"
56+
UNAVAILABLE = "UNAVAILABLE"
57+
58+
59+
class FaultSeverity(str, Enum):
60+
INFO = "INFO"
61+
DEGRADED = "DEGRADED"
62+
BLOCKING = "BLOCKING"
63+
ABORT = "ABORT"
64+
CRITICAL = "CRITICAL"
65+
66+
67+
class FaultDisposition(str, Enum):
68+
"""
69+
What the runtime is expected to do in response to a fault.
70+
"""
71+
72+
LOG_ONLY = "LOG_ONLY"
73+
NARROW_BEHAVIOR = "NARROW_BEHAVIOR"
74+
REJECT_ACTION = "REJECT_ACTION"
75+
ABORT = "ABORT"
76+
RETREAT = "RETREAT"
77+
SAFE_HOLD = "SAFE_HOLD"
78+
LATCH = "LATCH"
79+
80+
81+
class RuntimeHealth(str, Enum):
82+
NOMINAL = "NOMINAL"
83+
DEGRADED = "DEGRADED"
84+
BLOCKED = "BLOCKED"
85+
FAULTED = "FAULTED"
86+
87+
88+
@dataclass(frozen=True)
89+
class RuntimeFault:
90+
"""
91+
Structured runtime fault record.
92+
93+
This is not yet a full event-log object; it is a lightweight runtime model
94+
suitable for coordinators, tests, and future event serialization.
95+
"""
96+
97+
fault_id: str
98+
reason_code: str
99+
severity: FaultSeverity
100+
disposition: FaultDisposition
101+
source: str
102+
latched: bool = False
103+
requires_abort: bool = False
104+
requires_retreat: bool = False
105+
requires_safe_hold: bool = False
106+
created_at_utc_s: float = field(default_factory=time)
107+
details: str = ""
108+
109+
def blocks_new_actions(self) -> bool:
110+
return self.severity in {
111+
FaultSeverity.BLOCKING,
112+
FaultSeverity.ABORT,
113+
FaultSeverity.CRITICAL,
114+
} or self.latched
115+
116+
def runtime_health(self) -> RuntimeHealth:
117+
if self.latched or self.severity == FaultSeverity.CRITICAL:
118+
return RuntimeHealth.FAULTED
119+
if self.severity == FaultSeverity.ABORT:
120+
return RuntimeHealth.FAULTED
121+
if self.severity == FaultSeverity.BLOCKING:
122+
return RuntimeHealth.BLOCKED
123+
if self.severity == FaultSeverity.DEGRADED:
124+
return RuntimeHealth.DEGRADED
125+
return RuntimeHealth.NOMINAL
126+
127+
128+
@dataclass
129+
class SignalFreshness:
130+
"""
131+
Snapshot of whether safety-relevant signal classes are currently fresh.
132+
133+
The runtime will eventually derive this from actual sensing interfaces.
134+
For now, it provides a clean contract for coordinator logic and tests.
135+
"""
136+
137+
force_torque_fresh: bool = False
138+
tactile_fresh: bool = False
139+
proximity_fresh: bool = False
140+
thermal_fresh: bool = False
141+
scene_fresh: bool = False
142+
143+
def all_required_fresh(
144+
self,
145+
*,
146+
require_force_torque: bool = False,
147+
require_tactile: bool = False,
148+
require_proximity: bool = False,
149+
require_thermal: bool = False,
150+
require_scene: bool = False,
151+
) -> bool:
152+
checks = [
153+
(require_force_torque, self.force_torque_fresh),
154+
(require_tactile, self.tactile_fresh),
155+
(require_proximity, self.proximity_fresh),
156+
(require_thermal, self.thermal_fresh),
157+
(require_scene, self.scene_fresh),
158+
]
159+
return all(actual for required, actual in checks if required)
160+
161+
162+
@dataclass
163+
class InteractionSession:
164+
"""
165+
Runtime-owned interaction session snapshot.
166+
167+
This object is intentionally small and explicit. It does not replace the
168+
canonical OHIP schemas; it tracks the runtime's current authority, state,
169+
and safety posture for one interaction session.
170+
"""
171+
172+
session_id: str
173+
subject_id: Optional[str] = None
174+
interaction_state: InteractionState = InteractionState.IDLE
175+
execution_state: ExecutionState = ExecutionState.IDLE
176+
runtime_health: RuntimeHealth = RuntimeHealth.NOMINAL
177+
safety_level: SafetyLevel = SafetyLevel.YELLOW
178+
active_plan_id: Optional[str] = None
179+
active_fault: Optional[RuntimeFault] = None
180+
signal_freshness: SignalFreshness = field(default_factory=SignalFreshness)
181+
consent_valid: bool = False
182+
consent_fresh: bool = False
183+
last_transition_utc_s: float = field(default_factory=time)
184+
last_update_utc_s: float = field(default_factory=time)
185+
186+
def set_interaction_state(self, state: InteractionState) -> None:
187+
self.interaction_state = state
188+
now = time()
189+
self.last_transition_utc_s = now
190+
self.last_update_utc_s = now
191+
192+
def set_execution_state(self, state: ExecutionState) -> None:
193+
self.execution_state = state
194+
self.last_update_utc_s = time()
195+
196+
def apply_fault(self, fault: RuntimeFault) -> None:
197+
"""
198+
Apply a fault conservatively to the session snapshot.
199+
200+
This method updates runtime health and pushes the interaction /
201+
execution state toward safer, more explicit outcomes.
202+
"""
203+
self.active_fault = fault
204+
self.runtime_health = fault.runtime_health()
205+
self.last_update_utc_s = time()
206+
207+
if fault.latched or fault.disposition == FaultDisposition.LATCH:
208+
self.interaction_state = InteractionState.FAULT_LATCHED
209+
self.execution_state = ExecutionState.FAULTED
210+
self.last_transition_utc_s = self.last_update_utc_s
211+
return
212+
213+
if fault.requires_safe_hold or fault.disposition == FaultDisposition.SAFE_HOLD:
214+
self.interaction_state = InteractionState.SAFE_HOLD
215+
self.execution_state = ExecutionState.SAFE_HOLD
216+
self.last_transition_utc_s = self.last_update_utc_s
217+
return
218+
219+
if fault.requires_retreat or fault.disposition == FaultDisposition.RETREAT:
220+
self.interaction_state = InteractionState.RETREAT
221+
self.execution_state = ExecutionState.RETREATING
222+
self.last_transition_utc_s = self.last_update_utc_s
223+
return
224+
225+
if fault.requires_abort or fault.disposition == FaultDisposition.ABORT:
226+
self.execution_state = ExecutionState.ABORTING
227+
self.last_transition_utc_s = self.last_update_utc_s
228+
return
229+
230+
def clear_non_latched_fault(self) -> bool:
231+
"""
232+
Clear the active fault only when it is not latched.
233+
234+
Returns True if a fault was cleared.
235+
"""
236+
if self.active_fault is None:
237+
return False
238+
if self.active_fault.latched:
239+
return False
240+
self.active_fault = None
241+
self.runtime_health = RuntimeHealth.NOMINAL
242+
self.last_update_utc_s = time()
243+
return True
244+
245+
def can_begin_approach(self) -> bool:
246+
"""
247+
Conservative gate for beginning an approach action.
248+
249+
This does not replace the consent manager or safety gate. It provides
250+
a runtime-side summary check that higher-level coordinators can use.
251+
"""
252+
if self.active_fault and self.active_fault.blocks_new_actions():
253+
return False
254+
if not self.consent_valid or not self.consent_fresh:
255+
return False
256+
if self.safety_level == SafetyLevel.RED:
257+
return False
258+
if self.runtime_health in {RuntimeHealth.BLOCKED, RuntimeHealth.FAULTED}:
259+
return False
260+
return self.interaction_state in {
261+
InteractionState.IDLE,
262+
InteractionState.VERIFY,
263+
}
264+
265+
def mark_updated(self) -> None:
266+
self.last_update_utc_s = time()
267+
268+
269+
__all__ = [
270+
"InteractionState",
271+
"ExecutionState",
272+
"FaultSeverity",
273+
"FaultDisposition",
274+
"RuntimeHealth",
275+
"RuntimeFault",
276+
"SignalFreshness",
277+
"InteractionSession",
278+
]

0 commit comments

Comments
 (0)