Skip to content

Commit d33b0e6

Browse files
committed
Measure live telemetry freshness at frame receipt instead of the producer clock
Pecan showed STOPPED on every data row whenever the car/ECU clock was skewed from the viewing browser. getFrequency anchored its 2-second liveness window to the browser's Date.now() while frames carry the car's absolute timestamps, so a clock skew greater than the window dropped every frame outside it and reported 0 Hz even while data streamed. Track each message buffer's wall-clock receipt time (lastReceivedAt) separately from the producer timestamp, and anchor the live window to newest-sample timestamp plus the time elapsed since it arrived. Steady skew now cancels out so a streaming feed reads live, while the window still empties to 0 Hz once frames genuinely stop. Replay path is unchanged. Adds regression tests for both cases.
1 parent 7b0e2ea commit d33b0e6

2 files changed

Lines changed: 67 additions & 2 deletions

File tree

pecan/src/lib/DataStore.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,51 @@ describe('DataStore', () => {
7171
expect(dataStore.getFrequency(msgID, FREQUENCY_WINDOW_MS)).toBe(0.0);
7272
});
7373

74+
it('reports live frequency when producer timestamps lag the browser clock (clock skew)', () => {
75+
// Regression: the car/ECU stamps frames with its own absolute epoch-ms clock.
76+
// If that clock runs behind this browser by more than the freshness window,
77+
// every frame is >2s "old" by wall clock even though data is streaming live.
78+
const browserNow = Date.now();
79+
const SKEW = 30_000; // car clock is 30s behind the browser
80+
const msgID = '0xSKEW';
81+
82+
// 5 frames arriving in real time, each carrying a producer timestamp 30s in the past.
83+
for (let i = 0; i < 5; i++) {
84+
vi.setSystemTime(browserNow + i * 200); // wall-clock receipt time
85+
dataStore.ingestMessage({
86+
msgID,
87+
messageName: 'Skewed',
88+
data: {},
89+
rawData: '00',
90+
timestamp: browserNow + i * 200 - SKEW, // producer clock, lagging
91+
});
92+
}
93+
94+
// Frames are flowing → must read as live, not STOPPED (hz 0).
95+
expect(dataStore.getFrequency(msgID, FREQUENCY_WINDOW_MS)).toBeGreaterThan(0);
96+
});
97+
98+
it('still reports STOPPED (0 Hz) when a skewed feed genuinely stops', () => {
99+
const browserNow = Date.now();
100+
const SKEW = 30_000;
101+
const msgID = '0xSKEWSTOP';
102+
103+
for (let i = 0; i < 5; i++) {
104+
vi.setSystemTime(browserNow + i * 200);
105+
dataStore.ingestMessage({
106+
msgID,
107+
messageName: 'Skewed',
108+
data: {},
109+
rawData: '00',
110+
timestamp: browserNow + i * 200 - SKEW,
111+
});
112+
}
113+
114+
// No new frames for 3 wall-clock seconds — the feed has truly stopped.
115+
vi.setSystemTime(browserNow + 800 + 3000);
116+
expect(dataStore.getFrequency(msgID, FREQUENCY_WINDOW_MS)).toBe(0);
117+
});
118+
74119
it('should handle multiple message IDs independently', () => {
75120
const now = Date.now();
76121
vi.setSystemTime(now);

pecan/src/lib/DataStore.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ export interface TelemetrySample {
5757
interface MessageBuffer {
5858
samples: TelemetrySample[];
5959
lastUpdated: number;
60+
// Wall-clock time (Date.now()) when the newest sample was ingested. Tracked
61+
// separately from sample.timestamp (the producer's clock) so that liveness is
62+
// measured at receipt — immune to clock skew between the car/ECU and this browser.
63+
lastReceivedAt: number;
6064
}
6165

6266
export type TelemetrySource = "live" | "replay";
@@ -551,6 +555,9 @@ class DataStore {
551555
preserveTimestamp?: boolean;
552556
source?: TelemetrySource;
553557
}): void {
558+
// Wall-clock receipt time, captured before any timestamp rewriting below.
559+
const receivedAt = Date.now();
560+
554561
// Fix for old timestamps from recorded data
555562
// If timestamp is more than 1 hour old, use current time
556563
let timestamp = message.timestamp || Date.now();
@@ -597,6 +604,7 @@ class DataStore {
597604
buffers.byMsgId.set(msgID, {
598605
samples: [],
599606
lastUpdated: timestamp,
607+
lastReceivedAt: receivedAt,
600608
});
601609
}
602610

@@ -605,6 +613,7 @@ class DataStore {
605613
// Add new sample
606614
messageBuffer.samples.push(sample);
607615
messageBuffer.lastUpdated = timestamp;
616+
messageBuffer.lastReceivedAt = receivedAt;
608617

609618
// Prune old samples (rolling window)
610619
this.pruneOldSamples(msgID, source);
@@ -644,6 +653,9 @@ class DataStore {
644653

645654
let newestTimestampBySource: Partial<Record<TelemetrySource, number>> = {};
646655

656+
// Wall-clock receipt time for this batch, captured before timestamp rewriting.
657+
const receivedAt = Date.now();
658+
647659
for (const message of messages) {
648660
// Fix for old timestamps from recorded data unless explicitly preserved.
649661
let timestamp = message.timestamp || Date.now();
@@ -684,12 +696,14 @@ class DataStore {
684696
buffers.byMsgId.set(sample.msgID, {
685697
samples: [],
686698
lastUpdated: timestamp,
699+
lastReceivedAt: receivedAt,
687700
});
688701
}
689702

690703
const messageBuffer = buffers.byMsgId.get(sample.msgID)!;
691704
messageBuffer.samples.push(sample);
692705
messageBuffer.lastUpdated = timestamp;
706+
messageBuffer.lastReceivedAt = receivedAt;
693707
this.pruneOldSamples(sample.msgID, source);
694708

695709
buffers.trace.push(sample);
@@ -928,9 +942,15 @@ class DataStore {
928942
const messageBuffer = this.getSourceBuffers(source).byMsgId.get(msgID);
929943
if (!messageBuffer || messageBuffer.samples.length === 0) return 0;
930944

945+
const newestTimestamp = messageBuffer.samples[messageBuffer.samples.length - 1].timestamp;
946+
// Live "now" is anchored to the newest sample's producer timestamp plus the
947+
// wall-clock time elapsed since it arrived — i.e. liveness is measured at
948+
// receipt, not against the car/ECU clock. This keeps a streaming feed from
949+
// reading STOPPED when the producer's clock is skewed from the browser, while
950+
// still letting the window empty (→ 0 Hz) once frames genuinely stop arriving.
931951
const now = source === "replay"
932-
? messageBuffer.samples[messageBuffer.samples.length - 1].timestamp
933-
: Date.now();
952+
? newestTimestamp
953+
: newestTimestamp + (Date.now() - messageBuffer.lastReceivedAt);
934954
const cutoffTime = now - windowMs;
935955
const startIdx = binarySearchFirstGte(messageBuffer.samples, cutoffTime);
936956
const count = messageBuffer.samples.length - startIdx;

0 commit comments

Comments
 (0)