Skip to content

Commit fd52094

Browse files
authored
Reactivity hardening (sequenced bench bisect) (#207)
1 parent 428e17e commit fd52094

7 files changed

Lines changed: 205 additions & 96 deletions

File tree

packages/reactivity/src/dependency.js

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,7 @@ export class Dependency {
4242
}
4343
}
4444

45-
// called after flush
46-
cleanUp(reaction) {
47-
this.subscribers.delete(reaction);
48-
}
49-
50-
// identical for now but called from stop()
51-
unsubscribe(reaction) {
45+
remove(reaction) {
5246
this.subscribers.delete(reaction);
5347
}
5448
}

packages/reactivity/src/reaction.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,27 +85,25 @@ export class Reaction {
8585
Scheduler.current = this;
8686
try {
8787
for (const dep of this.dependencies) {
88-
dep.cleanUp(this);
88+
dep.remove(this);
8989
}
9090
this.dependencies.clear();
9191
this.callback(this);
92-
this.firstRun = false;
9392
}
9493
finally {
94+
// firstRun advances even on throw so a re-invalidation re-tracks from a known baseline
95+
this.firstRun = false;
9596
Scheduler.current = previousReaction;
9697
}
9798
}
9899

99100
invalidate(context) {
100-
// Set this reaction as active and about to be run
101-
this.active = true;
102-
103-
// Pass through trace for debugging
101+
if (!this.active) {
102+
return;
103+
}
104104
if (context) {
105105
this.addContext(context);
106106
}
107-
108-
// Schedule this reaction to occur in the next flush
109107
Scheduler.scheduleReaction(this);
110108
}
111109

@@ -114,7 +112,9 @@ export class Reaction {
114112
return;
115113
}
116114
this.active = false;
117-
this.dependencies.forEach(dep => dep.unsubscribe(this));
115+
Scheduler.pendingReactions.delete(this);
116+
this.dependencies.forEach(dep => dep.remove(this));
117+
this.dependencies.clear();
118118
this.fireCleanups();
119119
}
120120

packages/reactivity/src/scheduler.js

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export class Scheduler {
77
static pendingReactions = new Set();
88
static afterFlushCallbacks = [];
99
static isFlushScheduled = false;
10+
static isFlushing = false;
1011

1112
static scheduleReaction(reaction) {
1213
Scheduler.pendingReactions.add(reaction);
@@ -24,43 +25,66 @@ export class Scheduler {
2425

2526
static flush() {
2627
Scheduler.isFlushScheduled = false;
28+
Scheduler.isFlushing = true;
2729
// capture first error but finish draining so one faulty reaction or afterFlush callback can't jam the queue
2830
let firstError;
2931
let iterations = 0;
30-
while (Scheduler.pendingReactions.size > 0) {
31-
if (++iterations > Scheduler.maxFlushIterations) {
32-
console.error('Reactive cycle detected: flush exceeded maximum iterations');
33-
Scheduler.pendingReactions.clear();
34-
break;
35-
}
36-
const reactions = [...Scheduler.pendingReactions];
37-
Scheduler.pendingReactions.clear();
38-
for (let i = 0; i < reactions.length; i++) {
39-
try {
40-
reactions[i].run();
32+
try {
33+
// alternate: drain all pending reactions, then run one snapshot of afterFlush callbacks.
34+
// afterFlushes registered during the batch land in the next alternation, which drains
35+
// any reactions they queued before the next batch runs.
36+
while (Scheduler.pendingReactions.size > 0 || Scheduler.afterFlushCallbacks.length > 0) {
37+
while (Scheduler.pendingReactions.size > 0) {
38+
if (++iterations > Scheduler.maxFlushIterations) {
39+
console.error('Reactive cycle detected: flush exceeded maximum iterations');
40+
Scheduler.pendingReactions.clear();
41+
Scheduler.afterFlushCallbacks.length = 0;
42+
break;
43+
}
44+
// set-swap: avoid the per-pass array spread. new invalidations land in the next pass.
45+
const toRun = Scheduler.pendingReactions;
46+
Scheduler.pendingReactions = new Set();
47+
for (const r of toRun) {
48+
if (r.stopped) { continue; }
49+
try {
50+
r.run();
51+
}
52+
catch (e) {
53+
if (!firstError) { firstError = e; }
54+
}
55+
}
4156
}
42-
catch (e) {
43-
if (!firstError) { firstError = e; }
57+
if (Scheduler.afterFlushCallbacks.length > 0) {
58+
if (++iterations > Scheduler.maxFlushIterations) {
59+
console.error('Reactive cycle detected: flush exceeded maximum iterations');
60+
Scheduler.afterFlushCallbacks.length = 0;
61+
break;
62+
}
63+
const callbacks = Scheduler.afterFlushCallbacks;
64+
Scheduler.afterFlushCallbacks = [];
65+
for (let i = 0; i < callbacks.length; i++) {
66+
try {
67+
callbacks[i]();
68+
}
69+
catch (e) {
70+
if (!firstError) { firstError = e; }
71+
}
72+
}
4473
}
4574
}
4675
}
47-
48-
const callbacks = Scheduler.afterFlushCallbacks;
49-
Scheduler.afterFlushCallbacks = [];
50-
for (let i = 0; i < callbacks.length; i++) {
51-
try {
52-
callbacks[i]();
53-
}
54-
catch (e) {
55-
if (!firstError) { firstError = e; }
56-
}
76+
finally {
77+
Scheduler.isFlushing = false;
5778
}
5879

5980
if (firstError) { throw firstError; }
6081
}
6182

6283
static afterFlush(callback) {
6384
Scheduler.afterFlushCallbacks.push(callback);
85+
if (!Scheduler.isFlushing) {
86+
Scheduler.scheduleFlush();
87+
}
6488
}
6589

6690
static getSource() {

packages/reactivity/src/signal.js

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -145,52 +145,44 @@ export class Signal {
145145
// derive a new signal from this signal's value
146146
derive(computeFn, options = {}) {
147147
const derivedSignal = new Signal(undefined, options);
148+
// weak so the reaction's closure doesn't pin derived through source.dep.subscribers
149+
const derivedRef = new WeakRef(derivedSignal);
150+
const source = this;
148151

149-
// check if signal has been garbage collected
150-
// if it has we need to clean up reaction
151-
const sourceRef = new WeakRef(this);
152-
153-
// Create reaction that updates the derived signal
154152
const reaction = Reaction.create(() => {
155-
const source = sourceRef.deref();
156-
if (!source) {
157-
reaction.stop(); // Auto-cleanup if source is gone
153+
const d = derivedRef.deref();
154+
if (!d) {
155+
reaction.stop();
158156
return;
159157
}
160-
const result = computeFn(source.get());
161-
derivedSignal.set(result);
158+
d.set(computeFn(source.get()));
162159
});
163160

164-
// scope to parent reaction when called from inside one
165161
if (Reaction.current) {
166162
Reaction.current.onCleanup(() => reaction.stop());
167163
}
168164

169-
// Store reaction reference for potential cleanup
170-
derivedSignal._derivedReaction = reaction;
171-
172165
return derivedSignal;
173166
}
174167

175168
// static method for computing from multiple signals
176169
static computed(computeFn, options = {}) {
177170
const computedSignal = new Signal(undefined, options);
171+
const computedRef = new WeakRef(computedSignal);
178172

179-
// Create reaction that updates the computed signal
180-
// No WeakRef needed - computed signal and reaction have same lifecycle
181173
const reaction = Reaction.create(() => {
182-
const result = computeFn();
183-
computedSignal.set(result);
174+
const c = computedRef.deref();
175+
if (!c) {
176+
reaction.stop();
177+
return;
178+
}
179+
c.set(computeFn());
184180
});
185181

186-
// scope to parent reaction when called from inside one
187182
if (Reaction.current) {
188183
Reaction.current.onCleanup(() => reaction.stop());
189184
}
190185

191-
// Store reaction reference for potential cleanup
192-
computedSignal._computedReaction = reaction;
193-
194186
return computedSignal;
195187
}
196188

packages/reactivity/test/unit/internals.test.js

Lines changed: 112 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,27 @@ describe('Scheduler — flush', () => {
119119
errorSpy.mockRestore();
120120
});
121121

122+
// the cycle cap spans both queues with a unified iteration counter — a reaction
123+
// that schedules an afterFlush that re-invalidates the reaction must also hit it
124+
it('breaks a reaction↔afterFlush cycle and logs an error', () => {
125+
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
126+
const trigger = new Signal(0);
127+
128+
Reaction.create(() => {
129+
trigger.get();
130+
Reaction.afterFlush(() => trigger.set(trigger.peek() + 1));
131+
});
132+
133+
trigger.set(1);
134+
Reaction.flush();
135+
136+
expect(errorSpy).toHaveBeenCalled();
137+
expect(errorSpy.mock.calls.some(c => /cycle detected/i.test(c[0]))).toBe(true);
138+
expect(Scheduler.pendingReactions.size).toBe(0);
139+
expect(Scheduler.afterFlushCallbacks.length).toBe(0);
140+
errorSpy.mockRestore();
141+
});
142+
122143
// exception in one reaction must not silently swallow others in the same batch
123144
// either it propagates or framework isolates each, this test pins which
124145
it('continues processing remaining reactions when one throws', () => {
@@ -210,8 +231,7 @@ describe('Scheduler — flush', () => {
210231
expect(settled).toHaveBeenCalledTimes(1);
211232
});
212233

213-
// late-registered afterFlush queues for the next flush, otherwise self-registering callbacks would infinite-loop
214-
it('does not run afterFlush callbacks registered DURING afterFlush in the same pass', () => {
234+
it('drains afterFlush callbacks registered during afterFlush in the same flush', () => {
215235
let runCount = 0;
216236
const recursive = () => {
217237
runCount++;
@@ -222,10 +242,45 @@ describe('Scheduler — flush', () => {
222242
Reaction.afterFlush(recursive);
223243

224244
Reaction.flush();
225-
expect(runCount).toBe(1);
245+
expect(runCount).toBe(5);
246+
});
247+
248+
it('schedules a flush when afterFlush registers with no pending work', async () => {
249+
const cb = vi.fn();
250+
Reaction.afterFlush(cb);
251+
await Promise.resolve();
252+
expect(cb).toHaveBeenCalledTimes(1);
253+
});
254+
255+
it('keeps draining when an afterFlush callback throws', () => {
256+
const survivor = vi.fn();
257+
Reaction.afterFlush(() => {
258+
throw new Error('boom');
259+
});
260+
Reaction.afterFlush(survivor);
261+
262+
expect(() => Reaction.flush()).toThrow('boom');
263+
expect(survivor).toHaveBeenCalledTimes(1);
264+
});
265+
266+
it('drains reactions queued by an afterFlush callback before the next callback runs', () => {
267+
const source = new Signal('initial');
268+
const derived = new Signal('initial');
269+
270+
Reaction.create(() => {
271+
derived.set(`reaction-saw-${source.get()}`);
272+
});
273+
274+
let observedInCb2;
275+
Reaction.afterFlush(() => {
276+
source.set('updated-by-cb1');
277+
Reaction.afterFlush(() => {
278+
observedInCb2 = derived.peek();
279+
});
280+
});
226281

227282
Reaction.flush();
228-
expect(runCount).toBe(2);
283+
expect(observedInCb2).toBe('reaction-saw-updated-by-cb1');
229284
});
230285
});
231286

@@ -262,6 +317,32 @@ describe('Scheduler — current reaction context', () => {
262317
expect(s.hasDependents()).toBe(false);
263318
});
264319

320+
it('advances firstRun even when the callback throws, so re-invalidation tracks fresh deps', () => {
321+
const trigger = new Signal(0);
322+
let throwOnce = true;
323+
const callback = vi.fn();
324+
let reaction;
325+
326+
// Reaction.create throws because the first run does, so capture the instance via the callback arg
327+
expect(() => {
328+
Reaction.create((r) => {
329+
reaction = r;
330+
trigger.get();
331+
callback(r.firstRun);
332+
if (throwOnce) {
333+
throwOnce = false;
334+
throw new Error('first run throws');
335+
}
336+
});
337+
}).toThrow('first run throws');
338+
339+
expect(reaction.firstRun).toBe(false);
340+
341+
trigger.set(1);
342+
Reaction.flush();
343+
expect(callback).toHaveBeenLastCalledWith(false);
344+
});
345+
265346
it('nonreactive nests correctly — restores outer reaction when inner returns', () => {
266347
const outer = new Signal('outer');
267348
const inner = new Signal('inner');
@@ -467,6 +548,33 @@ describe('Reaction.guard', () => {
467548
outer.stop();
468549
expect(counter.dependency.subscribers.size).toBe(0);
469550
});
551+
552+
it('propagates value changes after the first f() throws', () => {
553+
const source = new Signal('first');
554+
let throwOnce = true;
555+
const downstream = vi.fn();
556+
557+
expect(() => {
558+
Reaction.create(() => {
559+
const v = Reaction.guard(() => {
560+
const x = source.get();
561+
if (throwOnce) {
562+
throwOnce = false;
563+
throw new Error('first run throws');
564+
}
565+
return x;
566+
});
567+
downstream(v);
568+
});
569+
}).toThrow('first run throws');
570+
571+
expect(downstream).not.toHaveBeenCalled();
572+
573+
// a signal change re-fires the inner guard, which now succeeds and propagates upward
574+
source.set('second');
575+
Reaction.flush();
576+
expect(downstream).toHaveBeenCalledWith('second');
577+
});
470578
});
471579

472580
/*******************************

0 commit comments

Comments
 (0)