Skip to content
163 changes: 163 additions & 0 deletions event-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { SDK_VERSION } from './utils/version';
import { ensureTrailingSlash } from './utils/ensureTrailingSlash';
import { LikeFetch } from './flagsmith-core';
import { IFlagsmithValue } from './types';

export const FLAG_EXPOSURE_EVENT = '$flag_exposure';
export const DEFAULT_EVENTS_API_URL = 'https://events.api.flagsmith.com/';
const DEFAULT_MAX_BUFFER = 1000;
const DEFAULT_FLUSH_INTERVAL = 10000;
const DEFAULT_RETRY_BACKOFF_MS = 1000;

export interface IEvent {
event: string;
feature_name: string | null;
identifier: string | null;
value: string | null;
traits: Record<string, IFlagsmithValue> | null;
metadata: Record<string, unknown> | null;
timestamp: number;
}

export interface EventProcessorOptions {
environmentKey: string;
fetch: LikeFetch;
eventsApiUrl?: string;
maxBuffer?: number;
flushInterval?: number;
log?: (...args: any[]) => void;
retryBackoffMs?: number;
}

interface EventArgs {
identifier: string | null;
value: IFlagsmithValue;
traits: Record<string, IFlagsmithValue> | null;
metadata: Record<string, unknown> | null;
}

interface TrackEventArgs extends EventArgs {
event: string;
}

interface TrackExposureArgs extends EventArgs {
featureName: string;
}

export class EventProcessor {
private endpoint: string;
private environmentKey: string;
private fetch: LikeFetch;
private log: (...args: any[]) => void;
private maxBuffer: number;
private flushInterval: number;
private retryBackoffMs: number;
private buffer: IEvent[] = [];
private dedupeKeys: Set<string> = new Set();
private timer: ReturnType<typeof setInterval> | null = null;

constructor(opts: EventProcessorOptions) {
const url = ensureTrailingSlash(opts.eventsApiUrl || DEFAULT_EVENTS_API_URL);
this.endpoint = `${url}v1/events`;
this.environmentKey = opts.environmentKey;
this.fetch = opts.fetch;
this.log = opts.log || (() => {});
this.maxBuffer = opts.maxBuffer ?? DEFAULT_MAX_BUFFER;
this.flushInterval = opts.flushInterval ?? DEFAULT_FLUSH_INTERVAL;
this.retryBackoffMs = opts.retryBackoffMs ?? DEFAULT_RETRY_BACKOFF_MS;
}

trackEvent(args: TrackEventArgs) {
this.bufferEvent(args.event, null, args.identifier, args.value, args.traits, args.metadata, false);
}

trackExposureEvent(args: TrackExposureArgs) {
this.bufferEvent(FLAG_EXPOSURE_EVENT, args.featureName, args.identifier, args.value, args.traits, args.metadata, true);
}

private bufferEvent(
event: string,
feature_name: string | null,
identifier: string | null,
value: IFlagsmithValue,
traits: Record<string, IFlagsmithValue> | null,
metadata: Record<string, unknown> | null,
dedupe: boolean,
) {
const stringValue = value != null ? String(value) : null;
if (dedupe) {
const key = JSON.stringify([event, feature_name, identifier, stringValue]);
if (this.dedupeKeys.has(key)) return;
this.dedupeKeys.add(key);
}
this.buffer.push({
event,
feature_name,
identifier,
value: stringValue,
traits,
metadata: { ...(metadata || {}), sdk_version: SDK_VERSION },
timestamp: Date.now(),
});
if (this.buffer.length >= this.maxBuffer) {
this.flush();
}
}

flush = async (): Promise<void> => {
if (!this.buffer.length) return;
const events = this.buffer;
this.buffer = [];
this.dedupeKeys.clear();
await this.postBatch(events, 0);
};

start() {
// stop() clears any existing timer and flushes; the flush is a no-op
// when the buffer is empty, which is the usual case on (re)start.
this.stop();
if (this.flushInterval > 0) {
this.timer = setInterval(this.flush, this.flushInterval);
this.timer?.unref?.();
}
}

stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
// Best-effort, fire-and-forget: this flush is not awaited, so on rapid
// re-init buffered events may be lost. Callers needing a guarantee
// (SSR/teardown) should await flushEvents() instead.
this.flush();
}

private postBatch = async (events: IEvent[], attempt: number): Promise<void> => {
try {
const res = await this.fetch(this.endpoint, {
method: 'POST',
body: JSON.stringify({ events }),
headers: {
'Content-Type': 'application/json; charset=utf-8',
'X-Environment-Key': this.environmentKey,
...(SDK_VERSION ? { 'Flagsmith-SDK-User-Agent': `flagsmith-js-sdk/${SDK_VERSION}` } : {}),
},
});
if (!res.status || res.status < 200 || res.status >= 300) {
throw new Error(`Events: unexpected status ${res.status}`);
}
this.log('Events: flush successful');
} catch (err) {
if (attempt < 1) {
this.log('Events: flush failed, retrying', err);
await new Promise<void>((resolve) => {
const t = setTimeout(resolve, this.retryBackoffMs);
t?.unref?.();
});
return this.postBatch(events, attempt + 1);
}
this.log('Events: flush failed, dropping batch', err);
}
};
}
97 changes: 95 additions & 2 deletions flagsmith-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import {
IDatadogRum,
IFlags,
IFlagsmith,
IFlagsmithFeature,
IFlagsmithResponse,
IFlagsmithTrait,
IFlagsmithValue,
IInitConfig,
ISentryClient,
IState,
Expand All @@ -23,9 +25,10 @@ import getChanges from './utils/get-changes';
import angularFetch from './utils/angular-fetch';
import setDynatraceValue from './utils/set-dynatrace-value';
import { EvaluationContext } from './evaluation-context';
import { isTraitEvaluationContext, toEvaluationContext, toTraitEvaluationContextObject } from './utils/types';
import { isTraitEvaluationContext, resolveTraitValues, toEvaluationContext, toTraitEvaluationContextObject } from './utils/types';
import { ensureTrailingSlash } from './utils/ensureTrailingSlash';
import { SDK_VERSION } from './utils/version';
import { EventProcessor, FLAG_EXPOSURE_EVENT } from './event-processor';

export enum FlagSource {
"NONE" = "NONE",
Expand Down Expand Up @@ -119,7 +122,8 @@ const Flagsmith = class {
flags[feature.feature.name.toLowerCase().replace(/ /g, '_')] = {
id: feature.feature.id,
enabled: feature.enabled,
value: feature.feature_state_value
value: feature.feature_state_value,
...(feature.variant ? { variant: feature.variant } : {}),
};
});
traits.forEach(trait => {
Expand Down Expand Up @@ -290,7 +294,16 @@ const Flagsmith = class {
sentryClient: ISentryClient | null = null
withTraits?: ITraits|null= null
cacheOptions = {ttl:0, skipAPI: false, loadStale: false, storageKey: undefined as string|undefined}
private eventProcessor: EventProcessor | null = null
/**
* @experimental @internal Whether the events pipeline is enabled
* (enableEvents: true was passed to init).
*/
eventsEnabled = false
async init(config: IInitConfig) {
if (config.eventProcessorConfig && !config.enableEvents) {
throw new Error('Flagsmith: eventProcessorConfig requires enableEvents: true.');
}
const evaluationContext = toEvaluationContext(config.evaluationContext || this.evaluationContext);
try {
const {
Expand All @@ -308,6 +321,8 @@ const Flagsmith = class {
enableDynatrace,
enableLogs,
environmentID,
enableEvents,
eventProcessorConfig,
eventSourceUrl= "https://realtime.flagsmith.com/",
fetch: fetchImplementation,
headers,
Expand Down Expand Up @@ -441,6 +456,19 @@ const Flagsmith = class {
}
}

this.eventProcessor?.stop();
this.eventProcessor = null;
if (enableEvents) {
this.eventProcessor = new EventProcessor({
...(eventProcessorConfig || {}),
environmentKey: this.evaluationContext.environment!.apiKey,
fetch: _fetch,
log: (...args: any[]) => this.log(...args),
});
this.eventProcessor.start();
}
this.eventsEnabled = !!enableEvents;

//If the user specified default flags emit a changed event immediately
if (cacheFlags) {
if (AsyncStorage && this.canUseStorage) {
Expand Down Expand Up @@ -916,9 +944,74 @@ const Flagsmith = class {
}
this.evaluationEvent[this.evaluationContext.environment.apiKey][key] += 1;
}

this.updateEventStorage();
};

trackEvent = (event: string, opts?: {
identifier?: string | null;
value?: IFlagsmithValue;
traits?: ITraits;
metadata?: Record<string, unknown>;
}) => {
// No-op when events are disabled, mirroring enableAnalytics: false.
if (!this.eventProcessor) return;
if (event.startsWith('$')) {
throw new Error(`Flagsmith: event names starting with "$" are reserved; use trackExposureEvent to record "${FLAG_EXPOSURE_EVENT}".`);
}
this.eventProcessor.trackEvent({
event,
identifier: opts?.identifier ?? this.evaluationContext.identity?.identifier ?? null,
value: opts?.value ?? null,
traits: resolveTraitValues(opts?.traits ?? this.evaluationContext.identity?.traits),
metadata: opts?.metadata ?? null,
});
};

trackExposureEvent = (featureName: string, opts?: {
identifier?: string | null;
value?: IFlagsmithValue;
traits?: ITraits;
metadata?: Record<string, unknown>;
}) => {
// No-op when events are disabled, mirroring enableAnalytics: false.
if (!this.eventProcessor) return;
this.eventProcessor.trackExposureEvent({
featureName,
identifier: opts?.identifier ?? this.evaluationContext.identity?.identifier ?? null,
value: opts?.value ?? null,
traits: resolveTraitValues(opts?.traits ?? this.evaluationContext.identity?.traits),
metadata: opts?.metadata ?? null,
});
};

flushEvents = (): Promise<void> => this.eventProcessor ? this.eventProcessor.flush() : Promise.resolve();

getExperimentFlag = (featureName: string): IFlagsmithFeature | null => {
const key = featureName.toLowerCase().replace(/ /g, '_');
const flag = (this.flags && this.flags[key]) || null;
// When events are disabled this degrades to a plain flag read.
if (!this.eventProcessor) return flag;
const identifier = this.evaluationContext.identity?.identifier;
if (!identifier) {
this.log('Flagsmith: getExperimentFlag called without an identity; call identify() (optionally with transient: true) before using experiments to record an exposure. Returning environment flags; no exposure recorded.');
return flag;
}
if (!flag) {
this.log(`Flagsmith: getExperimentFlag called for "${featureName}" which does not exist. No exposure recorded.`);
return null;
}
if (!flag.variant) {
this.log(`Flagsmith: getExperimentFlag called for "${featureName}" which has no variant; experiments require a multivariate flag. No exposure recorded.`);
return flag;
}
if (this.loadingState.source !== FlagSource.SERVER) {
return flag;
}
this.trackExposureEvent(featureName, { value: flag.variant });
return flag;
};

private setLoadingState(loadingState: LoadingState) {
if (!deepEqual(loadingState, this.loadingState)) {
this.loadingState = { ...loadingState };
Expand Down
7 changes: 7 additions & 0 deletions react.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ export declare function useFlags<
F extends string | Record<string, any>,
T extends string = string
>(_flags: readonly (F | keyof F)[], _traits?: readonly T[]): UseFlagsReturn<F, T>;
/**
* Resolve an experiment flag for the identified user and record one
* `$flag_exposure` event. Returns the flag (or null) and never throws when
* events are disabled.
* @experimental @internal
*/
export declare function useExperiment(featureName: string): IFlagsmithFeature | null;
export declare const useFlagsmith: <F extends string | Record<string, any>,
T extends string = string>() => IFlagsmith<F, T>;
export declare const useFlagsmithLoading: () => LoadingState | undefined;
Loading
Loading