Skip to content

Commit 6bccd33

Browse files
committed
chore: reconfigured cors options and updated sse reconnection in sdk
1 parent de0adf1 commit 6bccd33

4 files changed

Lines changed: 209 additions & 8 deletions

File tree

apps/api/src/app.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { toNodeHandler } from "better-auth/node";
22
import cors from "cors";
33
import express, { type Express } from "express";
4+
import { corsOptions } from "@/config/cors";
45
import { auth } from "@/lib/auth/auth";
56
import { requireAuth } from "@/middleware/auth";
67
import flagRoutes from "@/routes/flag/route";
@@ -12,12 +13,7 @@ import trackRoutes from "@/routes/track/route";
1213
const createApp = (): Express => {
1314
const app: Express = express();
1415

15-
app.use(
16-
cors({
17-
origin: "http://localhost:3000",
18-
credentials: true,
19-
})
20-
);
16+
app.use(cors(corsOptions));
2117

2218
app.all("/api/auth/*splat", toNodeHandler(auth));
2319

apps/api/src/config/cors.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import type { CorsOptionsDelegate } from "cors";
2+
import type { Request } from "express";
3+
import { env } from "@/config/env";
4+
5+
const SDK_PREFIXES = ["/api/flag-config", "/api/track", "/api/sse"];
6+
const ALLOWED_ADMIN_ORIGINS = [env.FRONTEND_URL, "http://localhost:3000"];
7+
8+
export const corsOptions: CorsOptionsDelegate = (req, callback) => {
9+
const expressReq = req as Request;
10+
11+
const origin = expressReq.header("Origin");
12+
const isSdkRoute = SDK_PREFIXES.some((path) =>
13+
expressReq.path.startsWith(path)
14+
);
15+
16+
if (isSdkRoute) {
17+
return callback(null, {
18+
origin: origin || true,
19+
credentials: true,
20+
methods: ["GET", "POST", "OPTIONS"],
21+
allowedHeaders: ["Content-Type", "X-Api-Key"],
22+
maxAge: 86_400,
23+
});
24+
}
25+
26+
if (origin && ALLOWED_ADMIN_ORIGINS.includes(origin)) {
27+
return callback(null, {
28+
origin: true,
29+
credentials: true,
30+
});
31+
}
32+
33+
callback(null, { origin: false });
34+
};

apps/api/src/lib/sse/service.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export function sseHandler(req: Request, res: Response, environmentId: string) {
4141
res.setHeader("Content-Type", "text/event-stream");
4242
res.setHeader("Cache-Control", "no-cache");
4343
res.setHeader("Connection", "keep-alive");
44+
res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering if present
4445
res.flushHeaders();
4546

4647
if (!clients.has(environmentId)) {
@@ -50,9 +51,47 @@ export function sseHandler(req: Request, res: Response, environmentId: string) {
5051

5152
res.write("event: connected\ndata: {}\n\n");
5253

54+
const keepaliveInterval = setInterval(() => {
55+
try {
56+
if (!res.writable || res.destroyed) {
57+
clearInterval(keepaliveInterval);
58+
clients.get(environmentId)?.delete(res);
59+
if (clients.get(environmentId)?.size === 0) {
60+
clients.delete(environmentId);
61+
}
62+
return;
63+
}
64+
65+
res.write(": keepalive\n\n");
66+
} catch (error) {
67+
console.error(
68+
`[SSE] Error sending keepalive to environment ${environmentId}:`,
69+
error
70+
);
71+
clearInterval(keepaliveInterval);
72+
clients.get(environmentId)?.delete(res);
73+
if (clients.get(environmentId)?.size === 0) {
74+
clients.delete(environmentId);
75+
}
76+
}
77+
}, 30_000);
78+
5379
req.on("close", () => {
80+
clearInterval(keepaliveInterval);
5481
console.log(`[SSE] Client disconnected from environment: ${environmentId}`);
82+
5583
clients.get(environmentId)?.delete(res);
84+
85+
if (clients.get(environmentId)?.size === 0) {
86+
clients.delete(environmentId);
87+
}
88+
});
89+
90+
req.on("error", (_error) => {
91+
clearInterval(keepaliveInterval);
92+
93+
clients.get(environmentId)?.delete(res);
94+
5695
if (clients.get(environmentId)?.size === 0) {
5796
clients.delete(environmentId);
5897
}

sdk/javascript/src/client.ts

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ export class FlagixClient {
3030
private isInitialized = false;
3131
private sseConnection: FlagStreamConnection | null = null;
3232
private readonly emitter: FlagixEventEmitter;
33+
private reconnectAttempts = 0;
34+
private reconnectTimeoutId: ReturnType<typeof setTimeout> | null = null;
35+
private isReconnecting = false;
36+
private hasEstablishedConnection = false;
37+
private readonly maxReconnectAttempts = Number.POSITIVE_INFINITY;
38+
private readonly baseReconnectDelay = 1000;
39+
private readonly maxReconnectDelay = 30_000;
3340

3441
constructor(options: FlagixClientOptions) {
3542
this.apiKey = options.apiKey;
@@ -161,23 +168,87 @@ export class FlagixClient {
161168
}
162169

163170
private async setupSSEListener(): Promise<void> {
171+
if (this.sseConnection) {
172+
try {
173+
this.sseConnection.close();
174+
} catch (error) {
175+
log(
176+
"warn",
177+
"[Flagix SDK] Error closing existing SSE connection",
178+
error
179+
);
180+
}
181+
this.sseConnection = null;
182+
}
183+
164184
const url = `${this.apiBaseUrl}/api/sse/stream`;
165185

166186
const source = await createEventSource(url, this.apiKey);
167187
if (!source) {
188+
log("warn", "[Flagix SDK] Failed to create EventSource. Retrying...");
189+
this.scheduleReconnect();
168190
return;
169191
}
170192

171193
this.sseConnection = source;
172194

173195
source.onopen = () => {
196+
this.reconnectAttempts = 0;
197+
this.isReconnecting = false;
198+
if (this.reconnectTimeoutId) {
199+
clearTimeout(this.reconnectTimeoutId);
200+
this.reconnectTimeoutId = null;
201+
}
202+
203+
// If this is a reconnection and not the first connection, refresh the cache
204+
// this ensures we have the latest flag values that may have changed while disconnected
205+
if (this.hasEstablishedConnection && this.isInitialized) {
206+
log(
207+
"info",
208+
"[Flagix SDK] SSE reconnected. Refreshing cache to sync with server..."
209+
);
210+
this.fetchInitialConfig().catch((error) => {
211+
log(
212+
"error",
213+
"[Flagix SDK] Failed to refresh cache after reconnection",
214+
error
215+
);
216+
});
217+
} else {
218+
this.hasEstablishedConnection = true;
219+
}
220+
174221
log("info", "[Flagix SDK] SSE connection established.");
175222
};
176223

177224
source.onerror = (error) => {
178-
log("error", "[Flagix SDK] SSE error", error);
225+
const eventSource = error.target as EventSource;
226+
const readyState = eventSource?.readyState;
227+
228+
// EventSource.readyState: 0 = CONNECTING, 1 = OPEN, 2 = CLOSED
229+
if (readyState === 2) {
230+
log(
231+
"warn",
232+
"[Flagix SDK] SSE connection closed. Attempting to reconnect..."
233+
);
234+
this.handleReconnect();
235+
} else if (readyState === 0) {
236+
log(
237+
"warn",
238+
"[Flagix SDK] SSE connection error (connecting state)",
239+
error
240+
);
241+
} else {
242+
log("error", "[Flagix SDK] SSE error", error);
243+
this.handleReconnect();
244+
}
179245
};
180246

247+
// Listen for the "connected" event from the server
248+
source.addEventListener("connected", () => {
249+
log("info", "[Flagix SDK] SSE connection confirmed by server.");
250+
});
251+
181252
source.addEventListener(EVENT_TO_LISTEN, (event) => {
182253
try {
183254
const data = JSON.parse(event.data);
@@ -195,6 +266,54 @@ export class FlagixClient {
195266
});
196267
}
197268

269+
private handleReconnect(): void {
270+
if (this.isReconnecting || !this.isInitialized) {
271+
return;
272+
}
273+
274+
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
275+
log(
276+
"error",
277+
"[Flagix SDK] Max reconnection attempts reached. Stopping reconnection."
278+
);
279+
return;
280+
}
281+
282+
this.isReconnecting = true;
283+
this.scheduleReconnect();
284+
}
285+
286+
private scheduleReconnect(): void {
287+
if (this.reconnectTimeoutId) {
288+
clearTimeout(this.reconnectTimeoutId);
289+
}
290+
291+
// Calculate exponential backoff delay with jitter
292+
const delay = Math.min(
293+
this.baseReconnectDelay * 2 ** this.reconnectAttempts,
294+
this.maxReconnectDelay
295+
);
296+
// Add ±25% jitter to prevent thundering herd
297+
const jitter = delay * 0.25 * (Math.random() * 2 - 1);
298+
const finalDelay = Math.max(100, delay + jitter);
299+
300+
this.reconnectAttempts++;
301+
302+
log(
303+
"info",
304+
`[Flagix SDK] Scheduling SSE reconnection attempt ${this.reconnectAttempts} in ${Math.round(finalDelay)}ms...`
305+
);
306+
307+
this.reconnectTimeoutId = setTimeout(() => {
308+
this.isReconnecting = false;
309+
this.reconnectTimeoutId = null;
310+
this.setupSSEListener().catch((error) => {
311+
log("error", "[Flagix SDK] Failed to reconnect SSE", error);
312+
this.handleReconnect();
313+
});
314+
}, finalDelay);
315+
}
316+
198317
private async fetchSingleFlagConfig(
199318
flagKey: string,
200319
type: FlagUpdateType
@@ -281,8 +400,21 @@ export class FlagixClient {
281400
* Closes the Server-Sent Events (SSE) connection and cleans up resources.
282401
*/
283402
close(): void {
403+
if (this.reconnectTimeoutId) {
404+
clearTimeout(this.reconnectTimeoutId);
405+
this.reconnectTimeoutId = null;
406+
}
407+
408+
this.isReconnecting = false;
409+
this.reconnectAttempts = 0;
410+
this.hasEstablishedConnection = false;
411+
284412
if (this.sseConnection) {
285-
this.sseConnection.close();
413+
try {
414+
this.sseConnection.close();
415+
} catch (error) {
416+
log("warn", "[Flagix SDK] Error closing SSE connection", error);
417+
}
286418
this.sseConnection = null;
287419
log("info", "[Flagix SDK] SSE connection closed.");
288420
}

0 commit comments

Comments
 (0)