Skip to content

Commit ef3fb80

Browse files
paddymulclaude
andauthored
feat(js): StateOrchestrator for JS-driven progressive-stats protocol (#810)
Phase 3 of plans/js-driven-stat-debounce.md. Client-side scheduler that owns the timing for the progressive-stats protocol. Server-side plumbing is #809 (Phase 2); wiring this into BuckarooView is Phase 4. Responsibilities of the orchestrator: - Bump a state token on every user-initiated state change. - Ship the ``state_change`` message immediately (server replies with cheap scalar stats). - Schedule per-scope ``compute_stat_group`` requests behind an adaptive debounce. Default: 2× the last observed aggregate compute time for that scope, clamped to [minDebounceMs, maxDebounceMs]. - Back-to-back state_changes cancel pending aggregate timers and restart the debounce — rapid typing produces exactly one aggregate request, with the latest token. - Stale ``stat_group_result`` messages (mismatched token) are silently dropped. Transport-agnostic — takes any ``{ send(s: string): void }`` so the existing buckaroo WS connection fits without changes. 12 jest tests pin: token semantics, debounce timing, back-to-back cancellation, rapid-typing single-request guarantee, stale-result drop, min/max debounce clamping, seeded baselines, dispose cleanup, multi-scope scheduling. Not yet wired into ``BuckarooView`` — that's a separate PR (Phase 4) once the server WS handler from #809 is in place. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0426989 commit ef3fb80

2 files changed

Lines changed: 341 additions & 0 deletions

File tree

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import { StateOrchestrator, WsLike } from "./StateOrchestrator";
2+
3+
class FakeWs implements WsLike {
4+
sent: string[] = [];
5+
send(message: string): void {
6+
this.sent.push(message);
7+
}
8+
last(): Record<string, unknown> {
9+
return JSON.parse(this.sent[this.sent.length - 1]);
10+
}
11+
byType(type: string): Record<string, unknown>[] {
12+
return this.sent
13+
.map((s) => JSON.parse(s))
14+
.filter((m) => m.type === type);
15+
}
16+
}
17+
18+
describe("StateOrchestrator", () => {
19+
let ws: FakeWs;
20+
let orch: StateOrchestrator;
21+
22+
beforeEach(() => {
23+
jest.useFakeTimers();
24+
ws = new FakeWs();
25+
orch = new StateOrchestrator({ ws, minDebounceMs: 10, maxDebounceMs: 5000 });
26+
});
27+
28+
afterEach(() => {
29+
orch.dispose();
30+
jest.useRealTimers();
31+
});
32+
33+
it("starts with token 0", () => {
34+
expect(orch.currentToken).toBe(0);
35+
});
36+
37+
it("onStateChange bumps token and ships state_change", () => {
38+
orch.onStateChange({ search_string: "x" });
39+
expect(orch.currentToken).toBe(1);
40+
const msg = ws.last();
41+
expect(msg.type).toBe("state_change");
42+
expect(msg.state_token).toBe(1);
43+
expect((msg.new_state as Record<string, unknown>).search_string).toBe("x");
44+
});
45+
46+
it("schedules compute_stat_group after the debounce", () => {
47+
orch.onStateChange({ search_string: "PIZZA" });
48+
// Only the immediate state_change has been sent so far.
49+
expect(ws.byType("compute_stat_group")).toHaveLength(0);
50+
51+
// Default baseline is 500ms × 2× = 1000ms; clamped to maxDebounceMs=5000.
52+
jest.advanceTimersByTime(999);
53+
expect(ws.byType("compute_stat_group")).toHaveLength(0);
54+
55+
jest.advanceTimersByTime(1);
56+
const reqs = ws.byType("compute_stat_group");
57+
expect(reqs).toHaveLength(1);
58+
expect(reqs[0].scope).toBe("filt");
59+
expect(reqs[0].group).toBe("aggregate");
60+
expect(reqs[0].state_token).toBe(1);
61+
});
62+
63+
it("back-to-back state_changes cancel the previous debounce timer", () => {
64+
orch.onStateChange({ search_string: "P" });
65+
jest.advanceTimersByTime(500);
66+
orch.onStateChange({ search_string: "PI" });
67+
// The first timer would have fired at t=1000ms; the second
68+
// resets it so at t=999ms from the SECOND call (= 1499 overall)
69+
// no compute_stat_group has fired yet.
70+
jest.advanceTimersByTime(998);
71+
expect(ws.byType("compute_stat_group")).toHaveLength(0);
72+
73+
// The second timer fires.
74+
jest.advanceTimersByTime(2);
75+
const reqs = ws.byType("compute_stat_group");
76+
expect(reqs).toHaveLength(1);
77+
expect(reqs[0].state_token).toBe(2); // second state_change's token
78+
});
79+
80+
it("rapid typing produces just one aggregate request, with the latest token", () => {
81+
// Simulate 5 keystrokes at 100ms intervals — typical typing cadence.
82+
for (let i = 0; i < 5; i++) {
83+
orch.onStateChange({ search_string: "P".repeat(i + 1) });
84+
jest.advanceTimersByTime(100);
85+
}
86+
// Default debounce = 1000ms after the last keystroke. No aggregate yet.
87+
expect(ws.byType("compute_stat_group")).toHaveLength(0);
88+
89+
jest.advanceTimersByTime(1000);
90+
const reqs = ws.byType("compute_stat_group");
91+
// Exactly one aggregate request, with the 5th (final) token.
92+
expect(reqs).toHaveLength(1);
93+
expect(reqs[0].state_token).toBe(5);
94+
});
95+
96+
it("onStatGroupResult with matching token updates the baseline", () => {
97+
orch.onStateChange({ search_string: "x" });
98+
const applied = orch.onStatGroupResult({
99+
type: "stat_group_result",
100+
state_token: 1,
101+
scope: "filt",
102+
group: "aggregate",
103+
elapsed_ms: 7500,
104+
});
105+
expect(applied).toBe(true);
106+
107+
// Next debounce is 2× 7500 = 15000ms, clamped to maxDebounceMs=5000.
108+
expect(orch.computeDebounce("filt")).toBe(5000);
109+
});
110+
111+
it("onStatGroupResult with stale token is silently dropped", () => {
112+
orch.onStateChange({ search_string: "x" });
113+
orch.onStateChange({ search_string: "xy" });
114+
// Token is now 2.
115+
const applied = orch.onStatGroupResult({
116+
type: "stat_group_result",
117+
state_token: 1, // stale
118+
scope: "filt",
119+
group: "aggregate",
120+
elapsed_ms: 9999,
121+
});
122+
expect(applied).toBe(false);
123+
// Baseline unchanged — debounce stays at the default.
124+
expect(orch.computeDebounce("filt")).toBe(1000);
125+
});
126+
127+
it("computeDebounce respects minDebounceMs floor", () => {
128+
const o = new StateOrchestrator({
129+
ws: new FakeWs(),
130+
minDebounceMs: 500,
131+
maxDebounceMs: 3000,
132+
multiplier: 2,
133+
});
134+
o.onStatGroupResult({
135+
type: "stat_group_result",
136+
state_token: 0,
137+
scope: "filt",
138+
group: "aggregate",
139+
elapsed_ms: 10, // 2× 10 = 20, well under floor
140+
});
141+
expect(o.computeDebounce("filt")).toBe(500);
142+
});
143+
144+
it("computeDebounce respects maxDebounceMs ceiling", () => {
145+
const o = new StateOrchestrator({
146+
ws: new FakeWs(),
147+
minDebounceMs: 200,
148+
maxDebounceMs: 3000,
149+
multiplier: 2,
150+
});
151+
o.onStatGroupResult({
152+
type: "stat_group_result",
153+
state_token: 0,
154+
scope: "filt",
155+
group: "aggregate",
156+
elapsed_ms: 6000, // 2× = 12000, hits ceiling
157+
});
158+
expect(o.computeDebounce("filt")).toBe(3000);
159+
});
160+
161+
it("initialAggregateMs seeds the baseline before any observed compute", () => {
162+
const o = new StateOrchestrator({
163+
ws: new FakeWs(),
164+
initialAggregateMs: { filt: 250 },
165+
minDebounceMs: 10,
166+
maxDebounceMs: 5000,
167+
multiplier: 2,
168+
});
169+
expect(o.computeDebounce("filt")).toBe(500); // 2× 250
170+
// Unrelated scope still uses fallback.
171+
expect(o.computeDebounce("clean")).toBe(1000); // 2× 500 (default)
172+
});
173+
174+
it("dispose cancels all pending aggregate timers", () => {
175+
orch.onStateChange({ search_string: "x" });
176+
orch.dispose();
177+
jest.advanceTimersByTime(10_000);
178+
expect(ws.byType("compute_stat_group")).toHaveLength(0);
179+
});
180+
181+
it("scopesForAggregate parameter overrides default ['filt']", () => {
182+
orch.onStateChange({ search_string: "x" }, { scopesForAggregate: ["filt", "clean"] });
183+
jest.advanceTimersByTime(10_000);
184+
const reqs = ws.byType("compute_stat_group");
185+
const scopes = reqs.map((r) => r.scope).sort();
186+
expect(scopes).toEqual(["clean", "filt"]);
187+
});
188+
});
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* Client-side scheduler for the JS-driven progressive-stats protocol.
3+
*
4+
* On a state_change, ships the cheap (scalar) request immediately so
5+
* df_meta + scalar pinned rows update fast. Schedules the expensive
6+
* (aggregate) compute per scope behind an adaptive debounce — by
7+
* default 2× the last observed aggregate compute time for that scope,
8+
* clamped to ``[minDebounceMs, maxDebounceMs]``.
9+
*
10+
* Token-based cancellation: every state_change bumps an internal
11+
* token; results carrying a stale token are dropped on arrival.
12+
*
13+
* See plans/js-driven-stat-debounce.md for the full protocol design.
14+
*
15+
* This module is transport-agnostic — pass any object with a
16+
* ``send(s: string)`` method. Existing buckaroo WS connections fit.
17+
*/
18+
19+
export type ScopeName = "raw" | "clean" | "filt";
20+
export type CostGroup = "scalar" | "aggregate";
21+
22+
export interface WsLike {
23+
send(message: string): void;
24+
}
25+
26+
export interface OrchestratorOptions {
27+
ws: WsLike;
28+
/** Lower bound on the per-scope debounce. Default 200 ms. */
29+
minDebounceMs?: number;
30+
/** Upper bound on the per-scope debounce. Default 3000 ms. */
31+
maxDebounceMs?: number;
32+
/** Multiplier on the last observed aggregate compute time. Default 2. */
33+
multiplier?: number;
34+
/** Initial aggregate baseline per scope (used until we observe a real one). */
35+
initialAggregateMs?: Partial<Record<ScopeName, number>>;
36+
}
37+
38+
export interface StatGroupResult {
39+
type: "stat_group_result";
40+
state_token: number;
41+
scope: ScopeName;
42+
group: CostGroup;
43+
elapsed_ms: number;
44+
stats?: unknown;
45+
}
46+
47+
export class StateOrchestrator {
48+
private token = 0;
49+
private aggregateTimers = new Map<ScopeName, ReturnType<typeof setTimeout>>();
50+
private lastAggregateMs = new Map<ScopeName, number>();
51+
private readonly ws: WsLike;
52+
private readonly minDebounceMs: number;
53+
private readonly maxDebounceMs: number;
54+
private readonly multiplier: number;
55+
/** Fallback baseline before any real aggregate compute has been observed. */
56+
private readonly defaultBaselineMs = 500;
57+
58+
constructor(opts: OrchestratorOptions) {
59+
this.ws = opts.ws;
60+
this.minDebounceMs = opts.minDebounceMs ?? 200;
61+
this.maxDebounceMs = opts.maxDebounceMs ?? 3000;
62+
this.multiplier = opts.multiplier ?? 2;
63+
if (opts.initialAggregateMs) {
64+
for (const [scope, ms] of Object.entries(opts.initialAggregateMs)) {
65+
if (ms != null) this.lastAggregateMs.set(scope as ScopeName, ms);
66+
}
67+
}
68+
}
69+
70+
/**
71+
* Current state-change token. Tests inspect this; production
72+
* code doesn't usually need it.
73+
*/
74+
get currentToken(): number {
75+
return this.token;
76+
}
77+
78+
/**
79+
* Compute the debounce delay (ms) for a given scope based on
80+
* the last observed aggregate compute time, clamped to
81+
* ``[minDebounceMs, maxDebounceMs]``.
82+
*/
83+
computeDebounce(scope: ScopeName): number {
84+
const last = this.lastAggregateMs.get(scope) ?? this.defaultBaselineMs;
85+
const raw = last * this.multiplier;
86+
return Math.max(this.minDebounceMs, Math.min(this.maxDebounceMs, raw));
87+
}
88+
89+
/**
90+
* Drive a single user-initiated state change.
91+
*
92+
* 1. Bump the state token.
93+
* 2. Cancel any pending aggregate timers from the previous change.
94+
* 3. Ship the ``state_change`` message (server will reply with
95+
* the scalar stats fast).
96+
* 4. For each scope expected to have aggregate work, schedule a
97+
* debounced ``compute_stat_group`` request. Timer fires the
98+
* request only if no further state_change has bumped the
99+
* token meanwhile.
100+
*/
101+
onStateChange(
102+
newState: Record<string, unknown>,
103+
opts?: { scopesForAggregate?: ScopeName[] },
104+
): void {
105+
const token = ++this.token;
106+
for (const t of this.aggregateTimers.values()) clearTimeout(t);
107+
this.aggregateTimers.clear();
108+
109+
this.ws.send(JSON.stringify({
110+
type: "state_change",
111+
state_token: token,
112+
new_state: newState,
113+
}));
114+
115+
const scopes = opts?.scopesForAggregate ?? ["filt"];
116+
for (const scope of scopes) {
117+
const delay = this.computeDebounce(scope);
118+
const tid = setTimeout(() => {
119+
// If a newer state_change bumped the token while we
120+
// were waiting, drop the request — sending it would
121+
// produce a stat_group_aborted from the server anyway.
122+
if (token !== this.token) return;
123+
this.ws.send(JSON.stringify({
124+
type: "compute_stat_group",
125+
state_token: token,
126+
scope,
127+
group: "aggregate",
128+
}));
129+
}, delay);
130+
this.aggregateTimers.set(scope, tid);
131+
}
132+
}
133+
134+
/**
135+
* Process a ``stat_group_result`` from the server. Stale results
136+
* (mismatched token) are silently ignored. Successful results
137+
* update the per-scope aggregate baseline used by the next
138+
* debounce.
139+
*
140+
* Returns true if the result was applied, false if it was stale.
141+
*/
142+
onStatGroupResult(msg: StatGroupResult): boolean {
143+
if (msg.state_token !== this.token) return false;
144+
this.lastAggregateMs.set(msg.scope, msg.elapsed_ms);
145+
return true;
146+
}
147+
148+
/** Cancel all pending aggregate timers. Call on widget unmount. */
149+
dispose(): void {
150+
for (const t of this.aggregateTimers.values()) clearTimeout(t);
151+
this.aggregateTimers.clear();
152+
}
153+
}

0 commit comments

Comments
 (0)