Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions packages/buckaroo-js-core/src/server/StateOrchestrator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { StateOrchestrator, WsLike } from "./StateOrchestrator";

class FakeWs implements WsLike {
sent: string[] = [];
send(message: string): void {
this.sent.push(message);
}
last(): Record<string, unknown> {
return JSON.parse(this.sent[this.sent.length - 1]);
}
byType(type: string): Record<string, unknown>[] {
return this.sent
.map((s) => JSON.parse(s))
.filter((m) => m.type === type);
}
}

describe("StateOrchestrator", () => {
let ws: FakeWs;
let orch: StateOrchestrator;

beforeEach(() => {
jest.useFakeTimers();
ws = new FakeWs();
orch = new StateOrchestrator({ ws, minDebounceMs: 10, maxDebounceMs: 5000 });
});

afterEach(() => {
orch.dispose();
jest.useRealTimers();
});

it("starts with token 0", () => {
expect(orch.currentToken).toBe(0);
});

it("onStateChange bumps token and ships state_change", () => {
orch.onStateChange({ search_string: "x" });
expect(orch.currentToken).toBe(1);
const msg = ws.last();
expect(msg.type).toBe("state_change");
expect(msg.state_token).toBe(1);
expect((msg.new_state as Record<string, unknown>).search_string).toBe("x");
});

it("schedules compute_stat_group after the debounce", () => {
orch.onStateChange({ search_string: "PIZZA" });
// Only the immediate state_change has been sent so far.
expect(ws.byType("compute_stat_group")).toHaveLength(0);

// Default baseline is 500ms × 2× = 1000ms; clamped to maxDebounceMs=5000.
jest.advanceTimersByTime(999);
expect(ws.byType("compute_stat_group")).toHaveLength(0);

jest.advanceTimersByTime(1);
const reqs = ws.byType("compute_stat_group");
expect(reqs).toHaveLength(1);
expect(reqs[0].scope).toBe("filt");
expect(reqs[0].group).toBe("aggregate");
expect(reqs[0].state_token).toBe(1);
});

it("back-to-back state_changes cancel the previous debounce timer", () => {
orch.onStateChange({ search_string: "P" });
jest.advanceTimersByTime(500);
orch.onStateChange({ search_string: "PI" });
// The first timer would have fired at t=1000ms; the second
// resets it so at t=999ms from the SECOND call (= 1499 overall)
// no compute_stat_group has fired yet.
jest.advanceTimersByTime(998);
expect(ws.byType("compute_stat_group")).toHaveLength(0);

// The second timer fires.
jest.advanceTimersByTime(2);
const reqs = ws.byType("compute_stat_group");
expect(reqs).toHaveLength(1);
expect(reqs[0].state_token).toBe(2); // second state_change's token
});

it("rapid typing produces just one aggregate request, with the latest token", () => {
// Simulate 5 keystrokes at 100ms intervals — typical typing cadence.
for (let i = 0; i < 5; i++) {
orch.onStateChange({ search_string: "P".repeat(i + 1) });
jest.advanceTimersByTime(100);
}
// Default debounce = 1000ms after the last keystroke. No aggregate yet.
expect(ws.byType("compute_stat_group")).toHaveLength(0);

jest.advanceTimersByTime(1000);
const reqs = ws.byType("compute_stat_group");
// Exactly one aggregate request, with the 5th (final) token.
expect(reqs).toHaveLength(1);
expect(reqs[0].state_token).toBe(5);
});

it("onStatGroupResult with matching token updates the baseline", () => {
orch.onStateChange({ search_string: "x" });
const applied = orch.onStatGroupResult({
type: "stat_group_result",
state_token: 1,
scope: "filt",
group: "aggregate",
elapsed_ms: 7500,
});
expect(applied).toBe(true);

// Next debounce is 2× 7500 = 15000ms, clamped to maxDebounceMs=5000.
expect(orch.computeDebounce("filt")).toBe(5000);
});

it("onStatGroupResult with stale token is silently dropped", () => {
orch.onStateChange({ search_string: "x" });
orch.onStateChange({ search_string: "xy" });
// Token is now 2.
const applied = orch.onStatGroupResult({
type: "stat_group_result",
state_token: 1, // stale
scope: "filt",
group: "aggregate",
elapsed_ms: 9999,
});
expect(applied).toBe(false);
// Baseline unchanged — debounce stays at the default.
expect(orch.computeDebounce("filt")).toBe(1000);
});

it("computeDebounce respects minDebounceMs floor", () => {
const o = new StateOrchestrator({
ws: new FakeWs(),
minDebounceMs: 500,
maxDebounceMs: 3000,
multiplier: 2,
});
o.onStatGroupResult({
type: "stat_group_result",
state_token: 0,
scope: "filt",
group: "aggregate",
elapsed_ms: 10, // 2× 10 = 20, well under floor
});
expect(o.computeDebounce("filt")).toBe(500);
});

it("computeDebounce respects maxDebounceMs ceiling", () => {
const o = new StateOrchestrator({
ws: new FakeWs(),
minDebounceMs: 200,
maxDebounceMs: 3000,
multiplier: 2,
});
o.onStatGroupResult({
type: "stat_group_result",
state_token: 0,
scope: "filt",
group: "aggregate",
elapsed_ms: 6000, // 2× = 12000, hits ceiling
});
expect(o.computeDebounce("filt")).toBe(3000);
});

it("initialAggregateMs seeds the baseline before any observed compute", () => {
const o = new StateOrchestrator({
ws: new FakeWs(),
initialAggregateMs: { filt: 250 },
minDebounceMs: 10,
maxDebounceMs: 5000,
multiplier: 2,
});
expect(o.computeDebounce("filt")).toBe(500); // 2× 250
// Unrelated scope still uses fallback.
expect(o.computeDebounce("clean")).toBe(1000); // 2× 500 (default)
});

it("dispose cancels all pending aggregate timers", () => {
orch.onStateChange({ search_string: "x" });
orch.dispose();
jest.advanceTimersByTime(10_000);
expect(ws.byType("compute_stat_group")).toHaveLength(0);
});

it("scopesForAggregate parameter overrides default ['filt']", () => {
orch.onStateChange({ search_string: "x" }, { scopesForAggregate: ["filt", "clean"] });
jest.advanceTimersByTime(10_000);
const reqs = ws.byType("compute_stat_group");
const scopes = reqs.map((r) => r.scope).sort();
expect(scopes).toEqual(["clean", "filt"]);
});
});
153 changes: 153 additions & 0 deletions packages/buckaroo-js-core/src/server/StateOrchestrator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Client-side scheduler for the JS-driven progressive-stats protocol.
*
* On a state_change, ships the cheap (scalar) request immediately so
* df_meta + scalar pinned rows update fast. Schedules the expensive
* (aggregate) compute per scope behind an adaptive debounce — by
* default 2× the last observed aggregate compute time for that scope,
* clamped to ``[minDebounceMs, maxDebounceMs]``.
*
* Token-based cancellation: every state_change bumps an internal
* token; results carrying a stale token are dropped on arrival.
*
* See plans/js-driven-stat-debounce.md for the full protocol design.
*
* This module is transport-agnostic — pass any object with a
* ``send(s: string)`` method. Existing buckaroo WS connections fit.
*/

export type ScopeName = "raw" | "clean" | "filt";
export type CostGroup = "scalar" | "aggregate";

export interface WsLike {
send(message: string): void;
}

export interface OrchestratorOptions {
ws: WsLike;
/** Lower bound on the per-scope debounce. Default 200 ms. */
minDebounceMs?: number;
/** Upper bound on the per-scope debounce. Default 3000 ms. */
maxDebounceMs?: number;
/** Multiplier on the last observed aggregate compute time. Default 2. */
multiplier?: number;
/** Initial aggregate baseline per scope (used until we observe a real one). */
initialAggregateMs?: Partial<Record<ScopeName, number>>;
}

export interface StatGroupResult {
type: "stat_group_result";
state_token: number;
scope: ScopeName;
group: CostGroup;
elapsed_ms: number;
stats?: unknown;
}

export class StateOrchestrator {
private token = 0;
private aggregateTimers = new Map<ScopeName, ReturnType<typeof setTimeout>>();
private lastAggregateMs = new Map<ScopeName, number>();
private readonly ws: WsLike;
private readonly minDebounceMs: number;
private readonly maxDebounceMs: number;
private readonly multiplier: number;
/** Fallback baseline before any real aggregate compute has been observed. */
private readonly defaultBaselineMs = 500;

constructor(opts: OrchestratorOptions) {
this.ws = opts.ws;
this.minDebounceMs = opts.minDebounceMs ?? 200;
this.maxDebounceMs = opts.maxDebounceMs ?? 3000;
this.multiplier = opts.multiplier ?? 2;
if (opts.initialAggregateMs) {
for (const [scope, ms] of Object.entries(opts.initialAggregateMs)) {
if (ms != null) this.lastAggregateMs.set(scope as ScopeName, ms);
}
}
}

/**
* Current state-change token. Tests inspect this; production
* code doesn't usually need it.
*/
get currentToken(): number {
return this.token;
}

/**
* Compute the debounce delay (ms) for a given scope based on
* the last observed aggregate compute time, clamped to
* ``[minDebounceMs, maxDebounceMs]``.
*/
computeDebounce(scope: ScopeName): number {
const last = this.lastAggregateMs.get(scope) ?? this.defaultBaselineMs;
const raw = last * this.multiplier;
return Math.max(this.minDebounceMs, Math.min(this.maxDebounceMs, raw));
}

/**
* Drive a single user-initiated state change.
*
* 1. Bump the state token.
* 2. Cancel any pending aggregate timers from the previous change.
* 3. Ship the ``state_change`` message (server will reply with
* the scalar stats fast).
* 4. For each scope expected to have aggregate work, schedule a
* debounced ``compute_stat_group`` request. Timer fires the
* request only if no further state_change has bumped the
* token meanwhile.
*/
onStateChange(
newState: Record<string, unknown>,
opts?: { scopesForAggregate?: ScopeName[] },
): void {
const token = ++this.token;
for (const t of this.aggregateTimers.values()) clearTimeout(t);
this.aggregateTimers.clear();

this.ws.send(JSON.stringify({
type: "state_change",
state_token: token,
new_state: newState,
}));

const scopes = opts?.scopesForAggregate ?? ["filt"];
for (const scope of scopes) {
const delay = this.computeDebounce(scope);
const tid = setTimeout(() => {
// If a newer state_change bumped the token while we
// were waiting, drop the request — sending it would
// produce a stat_group_aborted from the server anyway.
if (token !== this.token) return;
this.ws.send(JSON.stringify({
type: "compute_stat_group",
state_token: token,
scope,
group: "aggregate",
}));
}, delay);
this.aggregateTimers.set(scope, tid);
}
}

/**
* Process a ``stat_group_result`` from the server. Stale results
* (mismatched token) are silently ignored. Successful results
* update the per-scope aggregate baseline used by the next
* debounce.
*
* Returns true if the result was applied, false if it was stale.
*/
onStatGroupResult(msg: StatGroupResult): boolean {
if (msg.state_token !== this.token) return false;
this.lastAggregateMs.set(msg.scope, msg.elapsed_ms);
Comment on lines +143 to +144

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Ignore scalar results when updating aggregate debounce baseline

onStatGroupResult updates lastAggregateMs for every matching-token stat_group_result, but this message type includes both scalar and aggregate groups. If callers pass through the cheap scalar response from state_change, its much smaller elapsed_ms will overwrite the aggregate baseline and make later computeDebounce delays far too short, increasing aggregate request frequency and defeating the intended adaptive backoff. Gate the baseline update on msg.group === "aggregate".

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid P1. onStatGroupResult should gate on msg.group === 'aggregate' before updating lastAggregateMs — scalar results have much smaller elapsed_ms and would shrink the next aggregate debounce. Fix: if (msg.group === 'aggregate') this.lastAggregateMs.set(msg.scope, msg.elapsed_ms);. Adding before the Phase-4 wire-up so the bug never ships to BuckarooView.

return true;
}

/** Cancel all pending aggregate timers. Call on widget unmount. */
dispose(): void {
for (const t of this.aggregateTimers.values()) clearTimeout(t);
this.aggregateTimers.clear();
}
}
Loading