-
Notifications
You must be signed in to change notification settings - Fork 203
Expand file tree
/
Copy pathSegmentDestination.ts
More file actions
153 lines (134 loc) · 4.85 KB
/
SegmentDestination.ts
File metadata and controls
153 lines (134 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import { DestinationPlugin } from '../plugin';
import {
PluginType,
SegmentAPIIntegration,
SegmentAPISettings,
SegmentEvent,
UpdateType,
} from '../types';
import { chunk, createPromise, getURL } from '../util';
import { uploadEvents } from '../api';
import type { SegmentClient } from '../analytics';
import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment';
import { QueueFlushingPlugin } from './QueueFlushingPlugin';
import { defaultApiHost } from '../constants';
import { checkResponseForErrors, translateHTTPError } from '../errors';
import { defaultConfig } from '../constants';
const MAX_EVENTS_PER_BATCH = 100;
const MAX_PAYLOAD_SIZE_IN_KB = 500;
export const SEGMENT_DESTINATION_KEY = 'Segment.io';
export class SegmentDestination extends DestinationPlugin {
type = PluginType.destination;
key = SEGMENT_DESTINATION_KEY;
private apiHost?: string;
private settingsResolve: () => void;
private settingsPromise: Promise<void>;
constructor() {
super();
// We don't timeout this promise. We strictly need the response from Segment before sending things
const { promise, resolve } = createPromise<void>();
this.settingsPromise = promise;
this.settingsResolve = resolve;
}
private sendEvents = async (events: SegmentEvent[]): Promise<void> => {
if (events.length === 0) {
return Promise.resolve();
}
// We're not sending events until Segment has loaded all settings
await this.settingsPromise;
const config = this.analytics?.getConfig() ?? defaultConfig;
const chunkedEvents: SegmentEvent[][] = chunk(
events,
config.maxBatchSize ?? MAX_EVENTS_PER_BATCH,
MAX_PAYLOAD_SIZE_IN_KB
);
let sentEvents: SegmentEvent[] = [];
let numFailedEvents = 0;
await Promise.all(
chunkedEvents.map(async (batch: SegmentEvent[]) => {
try {
const res = await uploadEvents({
writeKey: config.writeKey,
url: this.getEndpoint(),
events: batch,
});
checkResponseForErrors(res);
sentEvents = sentEvents.concat(batch);
} catch (e) {
this.analytics?.reportInternalError(translateHTTPError(e));
this.analytics?.logger.warn(e);
numFailedEvents += batch.length;
} finally {
await this.queuePlugin.dequeue(sentEvents);
}
})
);
if (sentEvents.length) {
if (config.debug === true) {
this.analytics?.logger.info(`Sent ${sentEvents.length} events`);
}
}
if (numFailedEvents) {
this.analytics?.logger.error(`Failed to send ${numFailedEvents} events.`);
}
return Promise.resolve();
};
private readonly queuePlugin = new QueueFlushingPlugin(this.sendEvents);
private getEndpoint(): string {
const config = this.analytics?.getConfig();
const hasProxy = !!(config?.proxy ?? '');
const useSegmentEndpoints = Boolean(config?.useSegmentEndpoints);
let baseURL = '';
let endpoint = '';
if (hasProxy) {
//baseURL is always config?.proxy if hasProxy
baseURL = config?.proxy ?? '';
if (useSegmentEndpoints) {
const isProxyEndsWithSlash = baseURL.endsWith('/');
endpoint = isProxyEndsWithSlash ? 'b' : '/b';
}
} else {
baseURL = this.apiHost ?? defaultApiHost;
}
try {
return getURL(baseURL, endpoint);
} catch (error) {
console.error('Error in getEndpoint:', `fallback to ${defaultApiHost}`);
return defaultApiHost;
}
}
configure(analytics: SegmentClient): void {
super.configure(analytics);
// If the client has a proxy we don't need to await for settings apiHost, we can send events directly
// Important! If new settings are required in the future you probably want to change this!
if (analytics.getConfig().proxy !== undefined) {
this.settingsResolve();
}
// Enrich events with the Destination metadata
this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY));
this.add(this.queuePlugin);
}
// We block sending stuff to segment until we get the settings
update(settings: SegmentAPISettings, _type: UpdateType): void {
const segmentSettings = settings.integrations[
this.key
] as SegmentAPIIntegration;
if (
segmentSettings?.apiHost !== undefined &&
segmentSettings?.apiHost !== null
) {
//assign the api host from segment settings (domain/v1)
this.apiHost = `https://${segmentSettings.apiHost}/b`;
}
this.settingsResolve();
}
execute(event: SegmentEvent): Promise<SegmentEvent | undefined> {
// Execute the internal timeline here, the queue plugin will pick up the event and add it to the queue automatically
const enrichedEvent = super.execute(event);
return enrichedEvent;
}
async flush() {
// Wait until the queue is done restoring before flushing
return this.queuePlugin.flush();
}
}