-
Notifications
You must be signed in to change notification settings - Fork 203
Expand file tree
/
Copy pathQueueFlushingPlugin.ts
More file actions
149 lines (133 loc) · 4.66 KB
/
QueueFlushingPlugin.ts
File metadata and controls
149 lines (133 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { createStore, Store } from '@segment/sovran-react-native';
import type { SegmentClient } from '../analytics';
import { defaultConfig } from '../constants';
import { UtilityPlugin } from '../plugin';
import { PluginType, SegmentEvent } from '../types';
import { createPromise } from '../util';
import { ErrorType, SegmentError } from '../errors';
/**
 * This plugin manages a queue that every event is appended to after timeline processing.
 * It takes an `onFlush` callback that performs whatever destination-specific action sends the events.
 * The class itself never decides when to flush: `flush` has to be invoked by its owner
 * (e.g. when the queue reaches the configured flushAt limit).
 */
export class QueueFlushingPlugin extends UtilityPlugin {
  // Gets executed last to keep the queue after all timeline processing is done
  type = PluginType.after;

  // Suffix of the persisted store id; combined with the write key in `configure`
  private readonly storeKey: string;
  // True while an `onFlush` call is in flight; used to skip overlapping flushes
  private isPendingUpload = false;
  // Created in `configure`; stays undefined until then, in which case every queue operation is a no-op
  private queueStore: Store<{ events: SegmentEvent[] }> | undefined;
  private readonly onFlush: (events: SegmentEvent[]) => Promise<void>;
  // Resolves `isRestored`; called from the store's `onInitialized` hook
  private readonly isRestoredResolve: () => void;
  // Awaited by `flush` before reading the queue; created with `restoreTimeout`
  private readonly isRestored: Promise<void>;
  // Ensures the restoration-timeout error is reported only once
  private timeoutWarned = false;

  /**
   * @param onFlush callback to execute when the queue is flushed (either by reaching the limit or manually) e.g. code to upload events to your destination
   * @param storeKey key to store the queue in the store. Must be unique per destination instance
   * @param restoreTimeout time in ms to wait for the queue to be restored from the store before uploading events (default: 1000ms)
   */
  constructor(
    onFlush: (events: SegmentEvent[]) => Promise<void>,
    storeKey = 'events',
    restoreTimeout = 1000
  ) {
    super();
    this.onFlush = onFlush;
    this.storeKey = storeKey;
    const { promise, resolve } = createPromise<void>(restoreTimeout);
    this.isRestored = promise;
    this.isRestoredResolve = resolve;
  }

  /**
   * Creates the persisted queue store for this plugin instance.
   * Falls back to `defaultConfig` when the client yields no config.
   * @param analytics client this plugin is attached to
   */
  configure(analytics: SegmentClient): void {
    super.configure(analytics);

    const config = analytics?.getConfig() ?? defaultConfig;

    // Create its own storage per SegmentDestination instance to support multiple instances
    this.queueStore = createStore(
      { events: [] as SegmentEvent[] },
      {
        persist: {
          storeId: `${config.writeKey}-${this.storeKey}`,
          persistor: config.storePersistor,
          saveDelay: config.storePersistorSaveDelay ?? 0,
          onInitialized: () => {
            // Unblocks any `flush` waiting on `isRestored`
            this.isRestoredResolve();
          },
        },
      }
    );
  }

  /**
   * Appends the event to the end of the queue and passes it through unchanged.
   * @param event event that finished timeline processing
   * @returns the same event
   */
  async execute(event: SegmentEvent): Promise<SegmentEvent | undefined> {
    await this.queueStore?.dispatch((state) => {
      const events = [...state.events, event];
      return { events };
    });
    return event;
  }

  /**
   * Calls the onFlush callback with the events in the queue.
   * Does nothing if a previous `onFlush` call is still in flight.
   * Errors thrown by `onFlush` propagate to the caller.
   */
  async flush(): Promise<void> {
    // Wait for the queue to be restored
    try {
      await this.isRestored;
      // NOTE(review): if `createPromise` returns a standard Promise that rejects on
      // timeout, it stays rejected and this branch cannot run after a warning — confirm.
      if (this.timeoutWarned) {
        this.analytics?.logger.info('Flush triggered successfully.');
        this.timeoutWarned = false;
      }
    } catch (e) {
      // If the queue is not restored before the timeout, we will notify but not block flushing events
      if (!this.timeoutWarned) {
        this.analytics?.reportInternalError(
          new SegmentError(
            ErrorType.InitializationError,
            'Queue restoration timeout',
            e
          )
        );
        this.analytics?.logger.warn(
          'Flush triggered but queue restoration and settings loading not complete. Flush will be retried.',
          e
        );
        this.timeoutWarned = true;
      }
    }

    if (this.isPendingUpload) {
      return;
    }

    // Claim the upload slot BEFORE reading the queue. Reading first would let a flush
    // that started during another upload send a snapshot taken before that upload
    // finished, once the flag is released.
    this.isPendingUpload = true;
    try {
      const events = (await this.queueStore?.getState(true))?.events ?? [];
      await this.onFlush(events);
    } finally {
      this.isPendingUpload = false;
    }
  }

  /**
   * Removes one or multiple events from the queue.
   * Events are matched by object identity, so they must be the same instances
   * that are held in the queue.
   * @param events events to remove
   */
  async dequeue(events: SegmentEvent | SegmentEvent[]): Promise<void> {
    await this.queueStore?.dispatch((state) => {
      const eventsToRemove = Array.isArray(events) ? events : [events];

      // Nothing to do: hand back the current state untouched
      if (eventsToRemove.length === 0 || state.events.length === 0) {
        return state;
      }

      // Set lookup keeps the filter linear in the queue size
      const setToRemove = new Set(eventsToRemove);
      const filteredEvents = state.events.filter((e) => !setToRemove.has(e));
      return { events: filteredEvents };
    });
  }

  /**
   * Clear all events from the queue
   */
  async dequeueEvents(): Promise<void> {
    await this.queueStore?.dispatch(() => {
      return { events: [] };
    });
  }

  /**
   * Returns the count of items in the queue (0 when the store has not been created yet)
   */
  async pendingEvents(): Promise<number> {
    const events = (await this.queueStore?.getState(true))?.events ?? [];
    return events.length;
  }
}