-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathAbstractWorkerProjection.ts
More file actions
77 lines (67 loc) · 2.53 KB
/
Copy pathAbstractWorkerProjection.ts
File metadata and controls
77 lines (67 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import { isMainThread } from 'node:worker_threads';
import { AbstractProjection } from '../AbstractProjection.ts';
import {
workerProxyFactory as createWorkerProxyFactory,
createWorkerInstance as createWorkerProjectionInstance,
type ProjectionView
} from './utils/index.ts';
import type { IWorkerProjection, IWorkerProjectionType } from './interfaces/index.ts';
import type { IContainer, IEvent } from '../interfaces/index.ts';
/**
* Projection base class that can run projection handlers and the associated view in a worker thread
* to isolate CPU-heavy work and keep the main thread responsive
*/
export abstract class AbstractWorkerProjection<TView>
extends AbstractProjection<TView>
implements IWorkerProjection<TView> {
/**
* In a worker thread, creates and exposes the projection singleton.
*/
static createInstanceInWorkerThread<V, T extends AbstractWorkerProjection<V>>(
this: new () => T,
factory?: () => T
): T | undefined {
if (isMainThread)
return undefined;
const projectionMethodsToWire = [
'project',
'_projectBatch',
'ping',
'getLastProjectedEvent'
] as Extract<keyof T, string>[];
if (factory)
return createWorkerProjectionInstance(factory, projectionMethodsToWire);
return createWorkerProjectionInstance(this, projectionMethodsToWire);
}
/**
* Creates a factory that returns a `WorkerProxyProjection` for this projection type.
* Use it in the main thread (for example, `builder.registerProjection(MyProjection.workerProxyFactory)`),
* so events are proxied to the worker instance while exposing the remote view API.
*/
static workerProxyFactory<
TProjection extends IWorkerProjection<any>,
TContainer extends IContainer = IContainer,
TView = ProjectionView<TProjection>
>(this: IWorkerProjectionType<TView, TProjection>, container?: TContainer) {
return createWorkerProxyFactory(this)(container);
}
static get workerModulePath(): string {
throw new Error('not implemented');
}
/** @internal Responds to a ping from the main thread to confirm the worker is alive */
// eslint-disable-next-line class-methods-use-this
public ping(): true {
return true;
}
/** @internal Project restore events in batches to avoid one Comlink roundtrip per event */
async _projectBatch(events: IEvent[]): Promise<void> {
for (const event of events)
await this._project(event);
}
/**
* Returns the last projected event if the view implements IEventLocker, otherwise undefined.
*/
public async getLastProjectedEvent(): Promise<IEvent | undefined> {
return this._eventLocker?.getLastEvent();
}
}