Skip to content

Commit 2702e3b

Browse files
authored
Add replay helpers for IX-HapticSight event logs
This module provides a backend-agnostic replay layer for IX-HapticSight structured event logs, supporting deterministic event ordering and filtering by various criteria.
1 parent b0cb8f2 commit 2702e3b

1 file changed

Lines changed: 270 additions & 0 deletions

File tree

src/ohip_logging/replay.py

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
"""
2+
Replay helpers for IX-HapticSight structured event logs.
3+
4+
This module provides a small, backend-agnostic replay layer built on top of the
5+
JSONL event format defined in `ohip_logging.jsonl`.
6+
7+
Design goals:
8+
- deterministic event ordering
9+
- easy filtering by session, request, or event kind
10+
- no confusion between live runtime state and replay artifacts
11+
- lightweight enough for tests, local debugging, and future benchmark runners
12+
"""
13+
14+
from __future__ import annotations
15+
16+
from dataclasses import dataclass
17+
from pathlib import Path
18+
from typing import Callable, Iterable, Iterator, Optional, Sequence
19+
20+
from .events import EventKind, EventRecord
21+
from .jsonl import load_event_log
22+
23+
24+
ReplayPredicate = Callable[[EventRecord], bool]
25+
26+
27+
@dataclass(frozen=True)
28+
class ReplayCursor:
29+
"""
30+
Lightweight position marker for replay iteration.
31+
"""
32+
33+
index: int = 0
34+
35+
36+
@dataclass(frozen=True)
37+
class ReplaySlice:
38+
"""
39+
A filtered replayable slice of events.
40+
41+
This is useful for:
42+
- one session
43+
- one request
44+
- one benchmark marker range
45+
- one event kind subset
46+
"""
47+
48+
name: str
49+
events: tuple[EventRecord, ...]
50+
51+
def __len__(self) -> int:
52+
return len(self.events)
53+
54+
def first(self) -> Optional[EventRecord]:
55+
return self.events[0] if self.events else None
56+
57+
def last(self) -> Optional[EventRecord]:
58+
return self.events[-1] if self.events else None
59+
60+
def kinds(self) -> list[str]:
61+
seen: list[str] = []
62+
for event in self.events:
63+
value = event.kind.value
64+
if value not in seen:
65+
seen.append(value)
66+
return seen
67+
68+
def session_ids(self) -> list[str]:
69+
seen: list[str] = []
70+
for event in self.events:
71+
if event.session_id and event.session_id not in seen:
72+
seen.append(event.session_id)
73+
return seen
74+
75+
def request_ids(self) -> list[str]:
76+
seen: list[str] = []
77+
for event in self.events:
78+
if event.request_id and event.request_id not in seen:
79+
seen.append(event.request_id)
80+
return seen
81+
82+
def to_list(self) -> list[EventRecord]:
83+
return list(self.events)
84+
85+
86+
class EventReplay:
87+
"""
88+
Deterministic in-memory replay view of a structured event sequence.
89+
90+
This class does not simulate physics or command execution. Its purpose is to
91+
replay the event trail itself in a predictable way for:
92+
- audit
93+
- debugging
94+
- benchmark comparisons
95+
- integration tests
96+
"""
97+
98+
def __init__(self, events: Sequence[EventRecord], *, source_label: str = "") -> None:
99+
self._events: tuple[EventRecord, ...] = tuple(events)
100+
self._source_label = source_label
101+
102+
@classmethod
103+
def from_jsonl(cls, path: str | Path) -> "EventReplay":
104+
replay_path = Path(path)
105+
return cls(load_event_log(replay_path), source_label=str(replay_path))
106+
107+
@property
108+
def source_label(self) -> str:
109+
return self._source_label
110+
111+
def __len__(self) -> int:
112+
return len(self._events)
113+
114+
def __iter__(self) -> Iterator[EventRecord]:
115+
return iter(self._events)
116+
117+
def all(self) -> list[EventRecord]:
118+
return list(self._events)
119+
120+
def first(self) -> Optional[EventRecord]:
121+
return self._events[0] if self._events else None
122+
123+
def last(self) -> Optional[EventRecord]:
124+
return self._events[-1] if self._events else None
125+
126+
def at(self, index: int) -> EventRecord:
127+
return self._events[index]
128+
129+
def next_from(self, cursor: ReplayCursor) -> tuple[Optional[EventRecord], ReplayCursor]:
130+
"""
131+
Return the next event and the advanced cursor.
132+
133+
If the cursor is at or beyond the end, returns (None, same cursor).
134+
"""
135+
if cursor.index < 0:
136+
raise ValueError("cursor index must be non-negative")
137+
if cursor.index >= len(self._events):
138+
return None, cursor
139+
event = self._events[cursor.index]
140+
return event, ReplayCursor(index=cursor.index + 1)
141+
142+
def filter(self, predicate: ReplayPredicate, *, name: str = "filtered") -> ReplaySlice:
143+
return ReplaySlice(
144+
name=name,
145+
events=tuple(event for event in self._events if predicate(event)),
146+
)
147+
148+
def by_session(self, session_id: str) -> ReplaySlice:
149+
return self.filter(
150+
lambda event: event.session_id == session_id,
151+
name=f"session:{session_id}",
152+
)
153+
154+
def by_request(self, request_id: str) -> ReplaySlice:
155+
return self.filter(
156+
lambda event: event.request_id == request_id,
157+
name=f"request:{request_id}",
158+
)
159+
160+
def by_kind(self, *kinds: EventKind | str) -> ReplaySlice:
161+
normalized = {
162+
kind.value if isinstance(kind, EventKind) else str(kind)
163+
for kind in kinds
164+
}
165+
return self.filter(
166+
lambda event: event.kind.value in normalized,
167+
name="kind:" + ",".join(sorted(normalized)),
168+
)
169+
170+
def between_event_ids(
171+
self,
172+
start_event_id: str,
173+
end_event_id: str,
174+
*,
175+
include_end: bool = True,
176+
name: str = "",
177+
) -> ReplaySlice:
178+
"""
179+
Return the contiguous event slice between two event IDs.
180+
181+
Raises ValueError if either boundary is missing or if the order is invalid.
182+
"""
183+
start_index = None
184+
end_index = None
185+
186+
for idx, event in enumerate(self._events):
187+
if start_index is None and event.event_id == start_event_id:
188+
start_index = idx
189+
if event.event_id == end_event_id:
190+
end_index = idx
191+
if start_index is not None:
192+
break
193+
194+
if start_index is None:
195+
raise ValueError(f"unknown start_event_id: {start_event_id}")
196+
if end_index is None:
197+
raise ValueError(f"unknown end_event_id: {end_event_id}")
198+
if end_index < start_index:
199+
raise ValueError("end_event_id appears before start_event_id")
200+
201+
stop = end_index + 1 if include_end else end_index
202+
label = name or f"range:{start_event_id}..{end_event_id}"
203+
return ReplaySlice(
204+
name=label,
205+
events=self._events[start_index:stop],
206+
)
207+
208+
def benchmark_markers(self) -> ReplaySlice:
209+
return self.by_kind(EventKind.BENCHMARK_MARKER)
210+
211+
def replay_markers(self) -> ReplaySlice:
212+
return self.by_kind(EventKind.REPLAY_MARKER)
213+
214+
def session_ids(self) -> list[str]:
215+
seen: list[str] = []
216+
for event in self._events:
217+
if event.session_id and event.session_id not in seen:
218+
seen.append(event.session_id)
219+
return seen
220+
221+
def request_ids(self) -> list[str]:
222+
seen: list[str] = []
223+
for event in self._events:
224+
if event.request_id and event.request_id not in seen:
225+
seen.append(event.request_id)
226+
return seen
227+
228+
def summary(self) -> dict[str, object]:
229+
kind_counts: dict[str, int] = {}
230+
for event in self._events:
231+
key = event.kind.value
232+
kind_counts[key] = kind_counts.get(key, 0) + 1
233+
234+
return {
235+
"source_label": self._source_label,
236+
"event_count": len(self._events),
237+
"session_ids": self.session_ids(),
238+
"request_ids": self.request_ids(),
239+
"kind_counts": kind_counts,
240+
"first_event_id": self.first().event_id if self.first() else None,
241+
"last_event_id": self.last().event_id if self.last() else None,
242+
}
243+
244+
245+
def merge_replay_streams(
246+
streams: Iterable[Sequence[EventRecord]],
247+
*,
248+
source_label: str = "merged",
249+
) -> EventReplay:
250+
"""
251+
Merge multiple already-ordered event sequences into one replay stream.
252+
253+
Ordering rule:
254+
- primary sort by created_at_utc_s
255+
- secondary sort by event_id for deterministic tie-breaking
256+
"""
257+
merged: list[EventRecord] = []
258+
for stream in streams:
259+
merged.extend(stream)
260+
261+
merged.sort(key=lambda event: (float(event.created_at_utc_s), event.event_id))
262+
return EventReplay(merged, source_label=source_label)
263+
264+
265+
__all__ = [
266+
"ReplayCursor",
267+
"ReplaySlice",
268+
"EventReplay",
269+
"merge_replay_streams",
270+
]

0 commit comments

Comments
 (0)