Skip to content

Commit 212e353

Browse files
committed
Change: Deliver events to worker in batches during restoring phase
1 parent c120991 commit 212e353

6 files changed

Lines changed: 74 additions & 14 deletions

File tree

src/workers/AbstractWorkerProjection.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export abstract class AbstractWorkerProjection<TView>
2828

2929
const projectionMethodsToWire = [
3030
'project',
31-
'_project',
31+
'_projectBatch',
3232
'ping',
3333
'getLastProjectedEvent'
3434
] as Extract<keyof T, string>[];
@@ -62,9 +62,10 @@ export abstract class AbstractWorkerProjection<TView>
6262
return true;
6363
}
6464

65-
/** @internal Expose protected projection path for worker RPC wiring */
66-
public override _project(event: IEvent, meta?: Record<string, any>): Promise<void> {
67-
return super._project(event, meta);
65+
/** @internal Project restore events in batches to avoid one Comlink roundtrip per event */
66+
async _projectBatch(events: IEvent[]): Promise<void> {
67+
for (const event of events)
68+
await this._project(event);
6869
}
6970

7071
/**

src/workers/WorkerProxyProjection.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { Worker, MessageChannel } from 'node:worker_threads';
22
import type {
3-
IEvent, IEventStorageReader, IEventStore, IExtendableLogger, ILogger, IViewLocker
3+
IEvent,
4+
IEventSet,
5+
IEventStorageReader,
6+
IEventStore,
7+
IExtendableLogger,
8+
ILogger,
9+
IViewLocker
410
} from '../interfaces/index.ts';
511
import type { IProxyProjection, IWorkerProjection, ProxyProjectionParams } from './interfaces/index.ts';
612
import * as Comlink from 'comlink';
@@ -18,6 +24,8 @@ export class WorkerProxyProjection<
1824
TProjection extends IWorkerProjection<TView> = IWorkerProjection<TView>
1925
> implements IProxyProjection<TView> {
2026

27+
static RESTORE_BATCH_SIZE = 5_000;
28+
2129
#worker?: Worker;
2230
readonly #workerInit: Promise<Worker>;
2331
readonly #remoteProjection: Comlink.Remote<TProjection>;
@@ -83,7 +91,12 @@ export class WorkerProxyProjection<
8391
this.viewLocker.unlock();
8492
}
8593

86-
/** Restore view state from not-yet-projected events */
94+
/**
95+
* Restore view state from not-yet-projected events.
96+
*
97+
* Events are projected in batches to reduce worker RPC overhead.
98+
* The batch size can be configured through {@link WorkerProxyProjection.RESTORE_BATCH_SIZE}.
99+
*/
87100
protected async _restore(eventStore: IEventStorageReader): Promise<void> {
88101
if (!this.#worker)
89102
await this.#workerInit;
@@ -92,17 +105,25 @@ export class WorkerProxyProjection<
92105
const lastEvent = await this.#remoteProjection.getLastProjectedEvent();
93106

94107
this.#logger?.debug(`retrieving ${lastEvent ? `events after ${describe(lastEvent)}` : 'all events'}...`);
95-
96108
const eventsIterable = eventStore.getEventsByTypes(this.#messageTypes, { afterEvent: lastEvent });
97109

98110
let eventsCount = 0;
99111
const startTs = Date.now();
112+
const batch: IEvent[] = [];
100113

101114
for await (const event of eventsIterable) {
102-
await this._project(event);
115+
batch.push(event);
103116
eventsCount += 1;
117+
118+
if (batch.length >= WorkerProxyProjection.RESTORE_BATCH_SIZE) {
119+
await this._projectBatch(batch);
120+
batch.length = 0;
121+
}
104122
}
105123

124+
if (batch.length)
125+
await this._projectBatch(batch);
126+
106127
this.#logger?.info(`view restored from ${eventsCount} event(s) in ${Date.now() - startTs} ms`);
107128
}
108129

@@ -134,8 +155,8 @@ export class WorkerProxyProjection<
134155
return this.#remoteProjection.project(event);
135156
}
136157

137-
protected async _project(event: IEvent): Promise<void> {
138-
await this.#remoteProjection._project(event);
158+
protected _projectBatch(batch: IEventSet): Promise<void> {
159+
return this.remoteProjection._projectBatch(batch);
139160
}
140161

141162
dispose() {

src/workers/interfaces/IWorkerProjection.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { IEvent, IProjection } from '../../interfaces/index.js';
1+
import type { IEvent, IEventSet, IProjection } from '../../interfaces/index.js';
22

33
export interface IWorkerProjection<TView> extends IProjection<TView> {
44

@@ -8,11 +8,11 @@ export interface IWorkerProjection<TView> extends IProjection<TView> {
88
getLastProjectedEvent(): Promise<IEvent | undefined>;
99

1010
/**
11-
* Projects an event without waiting for view lock readiness.
11+
* Project restore events in batches to avoid one Comlink roundtrip per event
1212
*
13-
* @internal Expose protected projection path for worker RPC wiring
13+
* @internal
1414
*/
15-
_project(event: IEvent, meta?: Record<string, any>): Promise<void>;
15+
_projectBatch(events: IEventSet): Promise<void>;
1616
}
1717

1818
export interface IWorkerProjectionType<

tests/integration/workers/AbstractWorkerProjection.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,30 @@ describe('AbstractWorkerProjection', () => {
211211
expect(await projectionProxy.view.getCounter()).toBe(2);
212212
});
213213

214+
it('restores event store events through worker batches', async () => {
215+
216+
const restoreBatchSize = WorkerProxyProjection.RESTORE_BATCH_SIZE;
217+
WorkerProxyProjection.RESTORE_BATCH_SIZE = 2;
218+
219+
try {
220+
await projectionProxy.restore(createEventStore([
221+
{ id: '1', type: 'somethingHappened' },
222+
{ id: '2', type: 'somethingHappened' },
223+
{ id: '3', type: 'somethingElseHappened' },
224+
{ id: '4', type: 'somethingHappened' },
225+
{ id: '5', type: 'somethingHappened' },
226+
{ id: '6', type: 'somethingHappened' }
227+
]));
228+
}
229+
finally {
230+
WorkerProxyProjection.RESTORE_BATCH_SIZE = restoreBatchSize;
231+
}
232+
233+
expect(await projectionProxy.view.getBatchSizes()).toEqual([2, 2, 1]);
234+
expect((await projectionProxy.view.getLastEvent())?.id).toBe('6');
235+
expect(await projectionProxy.view.getCounter()).toBe(5);
236+
});
237+
214238
it('restores only events after getLastEvent', async () => {
215239

216240
await projectionProxy.restore(createEventStore([

tests/integration/workers/fixtures/ProjectionFixture.cjs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ class ProjectionFixture extends AbstractWorkerProjection {
3333
this.view.increment();
3434
}
3535

36+
async _projectBatch(events) {
37+
this.view.recordBatchSize(events.length);
38+
await super._projectBatch(events);
39+
}
40+
3641
async slowHappened() {
3742
await new Promise(resolve => setTimeout(resolve, 50));
3843
this.view.increment();

tests/integration/workers/fixtures/ViewFixture.cjs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class ViewFixture {
66
lock: 0,
77
unlock: 0
88
};
9+
#batchSizes = [];
910
#lastEvent = null;
1011
#skipIds = new Set();
1112
#readyPromise = Promise.resolve();
@@ -33,6 +34,14 @@ class ViewFixture {
3334
return { ...this.#calls };
3435
}
3536

37+
recordBatchSize(size) {
38+
this.#batchSizes.push(size);
39+
}
40+
41+
getBatchSizes() {
42+
return [...this.#batchSizes];
43+
}
44+
3645
isReady() {
3746
return this.ready;
3847
}

0 commit comments

Comments
 (0)