diff --git a/packages/buckaroo-js-core/src/server/StateOrchestrator.test.ts b/packages/buckaroo-js-core/src/server/StateOrchestrator.test.ts new file mode 100644 index 000000000..94c2ccef1 --- /dev/null +++ b/packages/buckaroo-js-core/src/server/StateOrchestrator.test.ts @@ -0,0 +1,188 @@ +import { StateOrchestrator, WsLike } from "./StateOrchestrator"; + +class FakeWs implements WsLike { + sent: string[] = []; + send(message: string): void { + this.sent.push(message); + } + last(): Record { + return JSON.parse(this.sent[this.sent.length - 1]); + } + byType(type: string): Record[] { + return this.sent + .map((s) => JSON.parse(s)) + .filter((m) => m.type === type); + } +} + +describe("StateOrchestrator", () => { + let ws: FakeWs; + let orch: StateOrchestrator; + + beforeEach(() => { + jest.useFakeTimers(); + ws = new FakeWs(); + orch = new StateOrchestrator({ ws, minDebounceMs: 10, maxDebounceMs: 5000 }); + }); + + afterEach(() => { + orch.dispose(); + jest.useRealTimers(); + }); + + it("starts with token 0", () => { + expect(orch.currentToken).toBe(0); + }); + + it("onStateChange bumps token and ships state_change", () => { + orch.onStateChange({ search_string: "x" }); + expect(orch.currentToken).toBe(1); + const msg = ws.last(); + expect(msg.type).toBe("state_change"); + expect(msg.state_token).toBe(1); + expect((msg.new_state as Record).search_string).toBe("x"); + }); + + it("schedules compute_stat_group after the debounce", () => { + orch.onStateChange({ search_string: "PIZZA" }); + // Only the immediate state_change has been sent so far. + expect(ws.byType("compute_stat_group")).toHaveLength(0); + + // Default baseline is 500ms × 2× = 1000ms; clamped to maxDebounceMs=5000. + jest.advanceTimersByTime(999); + expect(ws.byType("compute_stat_group")).toHaveLength(0); + + jest.advanceTimersByTime(1); + const reqs = ws.byType("compute_stat_group"); + expect(reqs).toHaveLength(1); + expect(reqs[0].scope).toBe("filt"); + expect(reqs[0].group).toBe("aggregate"); + expect(reqs[0].state_token).toBe(1); + }); + + it("back-to-back state_changes cancel the previous debounce timer", () => { + orch.onStateChange({ search_string: "P" }); + jest.advanceTimersByTime(500); + orch.onStateChange({ search_string: "PI" }); + // The first timer would have fired at t=1000ms; the second + // resets it so at t=999ms from the SECOND call (= 1499 overall) + // no compute_stat_group has fired yet. + jest.advanceTimersByTime(998); + expect(ws.byType("compute_stat_group")).toHaveLength(0); + + // The second timer fires. + jest.advanceTimersByTime(2); + const reqs = ws.byType("compute_stat_group"); + expect(reqs).toHaveLength(1); + expect(reqs[0].state_token).toBe(2); // second state_change's token + }); + + it("rapid typing produces just one aggregate request, with the latest token", () => { + // Simulate 5 keystrokes at 100ms intervals — typical typing cadence. + for (let i = 0; i < 5; i++) { + orch.onStateChange({ search_string: "P".repeat(i + 1) }); + jest.advanceTimersByTime(100); + } + // Default debounce = 1000ms after the last keystroke. No aggregate yet. + expect(ws.byType("compute_stat_group")).toHaveLength(0); + + jest.advanceTimersByTime(1000); + const reqs = ws.byType("compute_stat_group"); + // Exactly one aggregate request, with the 5th (final) token. + expect(reqs).toHaveLength(1); + expect(reqs[0].state_token).toBe(5); + }); + + it("onStatGroupResult with matching token updates the baseline", () => { + orch.onStateChange({ search_string: "x" }); + const applied = orch.onStatGroupResult({ + type: "stat_group_result", + state_token: 1, + scope: "filt", + group: "aggregate", + elapsed_ms: 7500, + }); + expect(applied).toBe(true); + + // Next debounce is 2× 7500 = 15000ms, clamped to maxDebounceMs=5000. + expect(orch.computeDebounce("filt")).toBe(5000); + }); + + it("onStatGroupResult with stale token is silently dropped", () => { + orch.onStateChange({ search_string: "x" }); + orch.onStateChange({ search_string: "xy" }); + // Token is now 2. + const applied = orch.onStatGroupResult({ + type: "stat_group_result", + state_token: 1, // stale + scope: "filt", + group: "aggregate", + elapsed_ms: 9999, + }); + expect(applied).toBe(false); + // Baseline unchanged — debounce stays at the default. + expect(orch.computeDebounce("filt")).toBe(1000); + }); + + it("computeDebounce respects minDebounceMs floor", () => { + const o = new StateOrchestrator({ + ws: new FakeWs(), + minDebounceMs: 500, + maxDebounceMs: 3000, + multiplier: 2, + }); + o.onStatGroupResult({ + type: "stat_group_result", + state_token: 0, + scope: "filt", + group: "aggregate", + elapsed_ms: 10, // 2× 10 = 20, well under floor + }); + expect(o.computeDebounce("filt")).toBe(500); + }); + + it("computeDebounce respects maxDebounceMs ceiling", () => { + const o = new StateOrchestrator({ + ws: new FakeWs(), + minDebounceMs: 200, + maxDebounceMs: 3000, + multiplier: 2, + }); + o.onStatGroupResult({ + type: "stat_group_result", + state_token: 0, + scope: "filt", + group: "aggregate", + elapsed_ms: 6000, // 2× = 12000, hits ceiling + }); + expect(o.computeDebounce("filt")).toBe(3000); + }); + + it("initialAggregateMs seeds the baseline before any observed compute", () => { + const o = new StateOrchestrator({ + ws: new FakeWs(), + initialAggregateMs: { filt: 250 }, + minDebounceMs: 10, + maxDebounceMs: 5000, + multiplier: 2, + }); + expect(o.computeDebounce("filt")).toBe(500); // 2× 250 + // Unrelated scope still uses fallback. + expect(o.computeDebounce("clean")).toBe(1000); // 2× 500 (default) + }); + + it("dispose cancels all pending aggregate timers", () => { + orch.onStateChange({ search_string: "x" }); + orch.dispose(); + jest.advanceTimersByTime(10_000); + expect(ws.byType("compute_stat_group")).toHaveLength(0); + }); + + it("scopesForAggregate parameter overrides default ['filt']", () => { + orch.onStateChange({ search_string: "x" }, { scopesForAggregate: ["filt", "clean"] }); + jest.advanceTimersByTime(10_000); + const reqs = ws.byType("compute_stat_group"); + const scopes = reqs.map((r) => r.scope).sort(); + expect(scopes).toEqual(["clean", "filt"]); + }); +}); diff --git a/packages/buckaroo-js-core/src/server/StateOrchestrator.ts b/packages/buckaroo-js-core/src/server/StateOrchestrator.ts new file mode 100644 index 000000000..e096ac606 --- /dev/null +++ b/packages/buckaroo-js-core/src/server/StateOrchestrator.ts @@ -0,0 +1,153 @@ +/** + * Client-side scheduler for the JS-driven progressive-stats protocol. + * + * On a state_change, ships the cheap (scalar) request immediately so + * df_meta + scalar pinned rows update fast. Schedules the expensive + * (aggregate) compute per scope behind an adaptive debounce — by + * default 2× the last observed aggregate compute time for that scope, + * clamped to ``[minDebounceMs, maxDebounceMs]``. + * + * Token-based cancellation: every state_change bumps an internal + * token; results carrying a stale token are dropped on arrival. + * + * See plans/js-driven-stat-debounce.md for the full protocol design. + * + * This module is transport-agnostic — pass any object with a + * ``send(s: string)`` method. Existing buckaroo WS connections fit. + */ + +export type ScopeName = "raw" | "clean" | "filt"; +export type CostGroup = "scalar" | "aggregate"; + +export interface WsLike { + send(message: string): void; +} + +export interface OrchestratorOptions { + ws: WsLike; + /** Lower bound on the per-scope debounce. Default 200 ms. */ + minDebounceMs?: number; + /** Upper bound on the per-scope debounce. Default 3000 ms. */ + maxDebounceMs?: number; + /** Multiplier on the last observed aggregate compute time. Default 2. */ + multiplier?: number; + /** Initial aggregate baseline per scope (used until we observe a real one). */ + initialAggregateMs?: Partial>; +} + +export interface StatGroupResult { + type: "stat_group_result"; + state_token: number; + scope: ScopeName; + group: CostGroup; + elapsed_ms: number; + stats?: unknown; +} + +export class StateOrchestrator { + private token = 0; + private aggregateTimers = new Map>(); + private lastAggregateMs = new Map(); + private readonly ws: WsLike; + private readonly minDebounceMs: number; + private readonly maxDebounceMs: number; + private readonly multiplier: number; + /** Fallback baseline before any real aggregate compute has been observed. */ + private readonly defaultBaselineMs = 500; + + constructor(opts: OrchestratorOptions) { + this.ws = opts.ws; + this.minDebounceMs = opts.minDebounceMs ?? 200; + this.maxDebounceMs = opts.maxDebounceMs ?? 3000; + this.multiplier = opts.multiplier ?? 2; + if (opts.initialAggregateMs) { + for (const [scope, ms] of Object.entries(opts.initialAggregateMs)) { + if (ms != null) this.lastAggregateMs.set(scope as ScopeName, ms); + } + } + } + + /** + * Current state-change token. Tests inspect this; production + * code doesn't usually need it. + */ + get currentToken(): number { + return this.token; + } + + /** + * Compute the debounce delay (ms) for a given scope based on + * the last observed aggregate compute time, clamped to + * ``[minDebounceMs, maxDebounceMs]``. + */ + computeDebounce(scope: ScopeName): number { + const last = this.lastAggregateMs.get(scope) ?? this.defaultBaselineMs; + const raw = last * this.multiplier; + return Math.max(this.minDebounceMs, Math.min(this.maxDebounceMs, raw)); + } + + /** + * Drive a single user-initiated state change. + * + * 1. Bump the state token. + * 2. Cancel any pending aggregate timers from the previous change. + * 3. Ship the ``state_change`` message (server will reply with + * the scalar stats fast). + * 4. For each scope expected to have aggregate work, schedule a + * debounced ``compute_stat_group`` request. Timer fires the + * request only if no further state_change has bumped the + * token meanwhile. + */ + onStateChange( + newState: Record, + opts?: { scopesForAggregate?: ScopeName[] }, + ): void { + const token = ++this.token; + for (const t of this.aggregateTimers.values()) clearTimeout(t); + this.aggregateTimers.clear(); + + this.ws.send(JSON.stringify({ + type: "state_change", + state_token: token, + new_state: newState, + })); + + const scopes = opts?.scopesForAggregate ?? ["filt"]; + for (const scope of scopes) { + const delay = this.computeDebounce(scope); + const tid = setTimeout(() => { + // If a newer state_change bumped the token while we + // were waiting, drop the request — sending it would + // produce a stat_group_aborted from the server anyway. + if (token !== this.token) return; + this.ws.send(JSON.stringify({ + type: "compute_stat_group", + state_token: token, + scope, + group: "aggregate", + })); + }, delay); + this.aggregateTimers.set(scope, tid); + } + } + + /** + * Process a ``stat_group_result`` from the server. Stale results + * (mismatched token) are silently ignored. Successful results + * update the per-scope aggregate baseline used by the next + * debounce. + * + * Returns true if the result was applied, false if it was stale. + */ + onStatGroupResult(msg: StatGroupResult): boolean { + if (msg.state_token !== this.token) return false; + this.lastAggregateMs.set(msg.scope, msg.elapsed_ms); + return true; + } + + /** Cancel all pending aggregate timers. Call on widget unmount. */ + dispose(): void { + for (const t of this.aggregateTimers.values()) clearTimeout(t); + this.aggregateTimers.clear(); + } +}