Skip to content

Commit fcc9afb

Browse files
committed
feat: refactotr
1 parent ae160f1 commit fcc9afb

10 files changed

Lines changed: 772 additions & 536 deletions
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import {
2+
CIRCUIT_COOLDOWN_MS,
3+
CIRCUIT_FAILURE_THRESHOLD,
4+
CIRCUIT_FAILURE_WINDOW_MS,
5+
DEFAULT_FALLBACK_CHAINS,
6+
} from "./traffic-constants";
7+
import type {
8+
CircuitState,
9+
CircuitStateStatus,
10+
DispatchDecision,
11+
QueuedRequest,
12+
} from "./traffic-controller-internal";
13+
import { extractStatusCode } from "./traffic-error-utils";
14+
import { CircuitBreakerOpenError } from "./traffic-errors";
15+
import type { TrafficRequestMetadata } from "./traffic-types";
16+
17+
export class TrafficCircuitBreaker {
18+
private readonly circuitBreakers = new Map<string, CircuitState>();
19+
private readonly fallbackChains: Map<string, string[]>;
20+
private readonly buildRateLimitKey: (metadata?: TrafficRequestMetadata) => string;
21+
22+
constructor(options: {
23+
fallbackChains?: Record<string, string[]>;
24+
buildRateLimitKey: (metadata?: TrafficRequestMetadata) => string;
25+
}) {
26+
this.buildRateLimitKey = options.buildRateLimitKey;
27+
const chains = options.fallbackChains ?? DEFAULT_FALLBACK_CHAINS;
28+
this.fallbackChains = new Map(Object.entries(chains));
29+
}
30+
31+
resolve(next: QueuedRequest): DispatchDecision | null {
32+
const visited = new Set<string>();
33+
34+
while (true) {
35+
const key = this.buildRateLimitKey(next.request.metadata);
36+
next.circuitKey = key;
37+
38+
const model = next.request.metadata?.model;
39+
if (model) visited.add(model);
40+
41+
const evaluation = this.evaluateCircuitState(key);
42+
next.circuitStatus = evaluation.state;
43+
44+
if (evaluation.allowRequest) return null;
45+
46+
const fallback = this.findFallbackModel(next.request.metadata, visited);
47+
if (!fallback || !next.request.createFallbackRequest) {
48+
next.reject(
49+
new CircuitBreakerOpenError(
50+
`Circuit open for ${key}`,
51+
next.request.metadata,
52+
evaluation.retryAfterMs,
53+
),
54+
);
55+
return { kind: "skip" };
56+
}
57+
58+
const fallbackRequest = next.request.createFallbackRequest(fallback);
59+
if (!fallbackRequest) return { kind: "skip" };
60+
61+
next.request = fallbackRequest;
62+
next.attempt = 1;
63+
next.rateLimitKey = undefined;
64+
next.etaMs = undefined;
65+
next.circuitKey = undefined;
66+
next.circuitStatus = undefined;
67+
}
68+
}
69+
70+
markTrial(item: QueuedRequest): void {
71+
const key = item.circuitKey;
72+
if (!key) return;
73+
const state = this.circuitBreakers.get(key);
74+
if (state && state.status === "half-open" && !state.trialInFlight) {
75+
state.trialInFlight = true;
76+
}
77+
}
78+
79+
recordSuccess(metadata?: TrafficRequestMetadata): void {
80+
const key = this.buildRateLimitKey(metadata);
81+
this.circuitBreakers.delete(key);
82+
}
83+
84+
recordFailure(metadata: TrafficRequestMetadata | undefined, error: unknown): void {
85+
const status = extractStatusCode(error);
86+
if (!this.isCircuitBreakerStatus(status)) {
87+
this.circuitBreakers.delete(this.buildRateLimitKey(metadata));
88+
return;
89+
}
90+
91+
const key = this.buildRateLimitKey(metadata);
92+
const now = Date.now();
93+
const state =
94+
this.circuitBreakers.get(key) ??
95+
({ status: "closed", failureTimestamps: [] } as CircuitState);
96+
97+
state.failureTimestamps = state.failureTimestamps.filter(
98+
(t) => now - t <= CIRCUIT_FAILURE_WINDOW_MS,
99+
);
100+
state.failureTimestamps.push(now);
101+
102+
if (
103+
state.status === "half-open" ||
104+
state.failureTimestamps.length >= CIRCUIT_FAILURE_THRESHOLD
105+
) {
106+
state.status = "open";
107+
state.openedAt = now;
108+
state.trialInFlight = false;
109+
}
110+
111+
this.circuitBreakers.set(key, state);
112+
}
113+
114+
private evaluateCircuitState(key: string): {
115+
allowRequest: boolean;
116+
state: CircuitStateStatus;
117+
retryAfterMs?: number;
118+
} {
119+
const state = this.circuitBreakers.get(key);
120+
if (!state) return { allowRequest: true, state: "closed" };
121+
122+
const now = Date.now();
123+
124+
if (state.status === "open") {
125+
const elapsed = state.openedAt ? now - state.openedAt : 0;
126+
if (elapsed >= CIRCUIT_COOLDOWN_MS) {
127+
state.status = "half-open";
128+
state.trialInFlight = false;
129+
state.failureTimestamps = [];
130+
return { allowRequest: true, state: "half-open" };
131+
}
132+
return {
133+
allowRequest: false,
134+
state: "open",
135+
retryAfterMs: CIRCUIT_COOLDOWN_MS - elapsed,
136+
};
137+
}
138+
139+
if (state.status === "half-open" && state.trialInFlight) {
140+
return { allowRequest: false, state: "half-open" };
141+
}
142+
143+
return { allowRequest: true, state: state.status };
144+
}
145+
146+
private findFallbackModel(
147+
metadata: TrafficRequestMetadata | undefined,
148+
visitedModels: Set<string>,
149+
): string | undefined {
150+
const currentModel = metadata?.model;
151+
if (!currentModel) {
152+
return undefined;
153+
}
154+
155+
const chain = this.fallbackChains.get(currentModel);
156+
if (!chain) {
157+
return undefined;
158+
}
159+
160+
const provider = metadata?.provider;
161+
for (const candidate of chain) {
162+
if (visitedModels.has(candidate)) {
163+
continue;
164+
}
165+
166+
const candidateKey = this.buildRateLimitKey({
167+
provider,
168+
model: candidate,
169+
});
170+
171+
const evaluation = this.evaluateCircuitState(candidateKey);
172+
if (evaluation.allowRequest) {
173+
visitedModels.add(candidate);
174+
return candidate;
175+
}
176+
}
177+
178+
return undefined;
179+
}
180+
181+
private isCircuitBreakerStatus(status?: number): boolean {
182+
return status === 429 || (status !== undefined && status >= 500);
183+
}
184+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
export const MAX_RETRY_ATTEMPTS = 3;
2+
export const TIMEOUT_RETRY_ATTEMPTS = 2;
3+
4+
export const RATE_LIMIT_BASE_BACKOFF_MS = 500;
5+
export const SERVER_ERROR_BASE_BACKOFF_MS = 1000;
6+
export const TIMEOUT_BASE_BACKOFF_MS = 750;
7+
8+
export const RATE_LIMIT_JITTER_FACTOR = 0.35;
9+
export const SERVER_ERROR_JITTER_FACTOR = 0.8;
10+
export const TIMEOUT_JITTER_FACTOR = 0.5;
11+
12+
export const CIRCUIT_FAILURE_THRESHOLD = 5;
13+
export const CIRCUIT_FAILURE_WINDOW_MS = 10_000;
14+
export const CIRCUIT_COOLDOWN_MS = 30_000;
15+
16+
export const RATE_LIMIT_EXHAUSTION_BUFFER = 1;
17+
export const RATE_LIMIT_PROBE_DELAY_MS = 50;
18+
export const RATE_LIMIT_MIN_PACE_INTERVAL_MS = 10;
19+
export const RATE_LIMIT_NEXT_ALLOWED_UPDATE_THRESHOLD_MS = 10;
20+
21+
export const DEFAULT_FALLBACK_CHAINS: Record<string, string[]> = {
22+
"gpt-4o": ["gpt-4o-mini", "gpt-3.5"],
23+
};
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import type {
2+
TrafficPriority,
3+
TrafficRequest,
4+
TrafficRequestMetadata,
5+
TrafficRequestType,
6+
} from "./traffic-types";
7+
8+
export type Scheduler = (callback: () => void) => void;
9+
10+
export type DispatchDecision =
11+
| { kind: "dispatch" }
12+
| { kind: "skip" }
13+
| { kind: "wait"; wakeUpAt?: number };
14+
15+
export type CircuitStateStatus = "closed" | "open" | "half-open";
16+
17+
export interface CircuitState {
18+
status: CircuitStateStatus;
19+
failureTimestamps: number[];
20+
openedAt?: number;
21+
trialInFlight?: boolean;
22+
}
23+
24+
export interface RateLimitWindowState {
25+
limit: number;
26+
remaining: number;
27+
resetAt: number;
28+
reserved: number;
29+
nextAllowedAt: number;
30+
}
31+
32+
type BivariantHandler<TArgs extends unknown[]> = {
33+
bivarianceHack(...args: TArgs): void;
34+
}["bivarianceHack"];
35+
36+
export interface QueuedRequest<TResponse = unknown> {
37+
type: TrafficRequestType;
38+
request: TrafficRequest<TResponse>;
39+
resolve: BivariantHandler<[TResponse | PromiseLike<TResponse>]>;
40+
reject: BivariantHandler<[reason?: unknown]>;
41+
attempt: number;
42+
priority: TrafficPriority;
43+
tenantId: string;
44+
45+
rateLimitKey?: string;
46+
etaMs?: number;
47+
48+
circuitKey?: string;
49+
circuitStatus?: CircuitStateStatus;
50+
51+
extractUsage?: TrafficRequest<TResponse>["extractUsage"];
52+
}

0 commit comments

Comments
 (0)