Skip to content

Commit 0cf9010

Browse files
committed
zen: tps rate limit
1 parent 812668a commit 0cf9010

6 files changed

Lines changed: 2871 additions & 4 deletions

File tree

packages/console/app/src/routes/zen/util/handler.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import { Resource } from "@opencode-ai/console-resource"
4747
import { i18n, type Key } from "~/i18n"
4848
import { localeFromRequest } from "~/lib/language"
4949
import { createModelTpmLimiter } from "./modelTpmLimiter"
50+
import { createModelTpsLimiter } from "./modelTpsLimiter"
5051

5152
type ZenData = Awaited<ReturnType<typeof ZenData.list>>
5253
type RetryOptions = {
@@ -129,6 +130,8 @@ export async function handler(
129130
logger.metric({ source: billingSource })
130131
const modelTpmLimiter = createModelTpmLimiter(modelInfo.providers)
131132
const modelTpmLimits = await modelTpmLimiter?.check()
133+
const modelTpsLimiter = createModelTpsLimiter(modelInfo.providers)
134+
const modelTpsLimits = await modelTpsLimiter?.check()
132135

133136
const retriableRequest = async (retry: RetryOptions = { excludeProviders: [], retryCount: 0 }) => {
134137
const providerInfo = selectProvider(
@@ -142,6 +145,7 @@ export async function handler(
142145
retry,
143146
stickyProvider,
144147
modelTpmLimits,
148+
modelTpsLimits,
145149
)
146150
validateModelSettings(billingSource, authInfo)
147151
updateProviderKey(authInfo, providerInfo)
@@ -294,14 +298,17 @@ export async function handler(
294298

295299
let buffer = ""
296300
let responseLength = 0
301+
let timestampFirstByte = 0
302+
let timestampLastByte = 0
297303

298304
function pump(): Promise<void> {
299305
return (
300306
reader?.read().then(async ({ done, value: rawValue }) => {
301307
if (done) {
308+
const timestampLastByte = Date.now()
302309
logger.metric({
303310
response_length: responseLength,
304-
"timestamp.last_byte": Date.now(),
311+
"timestamp.last_byte": timestampLastByte,
305312
})
306313
dataDumper?.flush()
307314
await rateLimiter?.track()
@@ -311,6 +318,13 @@ export async function handler(
311318
const costInfo = calculateCost(modelInfo, usageInfo)
312319
await trialLimiter?.track(usageInfo)
313320
await modelTpmLimiter?.track(providerInfo.id, providerInfo.model, usageInfo)
321+
await modelTpsLimiter?.track(
322+
providerInfo.id,
323+
providerInfo.model,
324+
timestampFirstByte,
325+
timestampLastByte,
326+
usageInfo,
327+
)
314328
await trackUsage(sessionId, billingSource, authInfo, modelInfo, providerInfo, usageInfo, costInfo)
315329
await reload(billingSource, authInfo, costInfo)
316330
const cost = calculateOccurredCost(billingSource, costInfo)
@@ -321,10 +335,10 @@ export async function handler(
321335
}
322336

323337
if (responseLength === 0) {
324-
const now = Date.now()
338+
timestampFirstByte = Date.now()
325339
logger.metric({
326-
time_to_first_byte: now - startTimestamp,
327-
"timestamp.first_byte": now,
340+
time_to_first_byte: timestampFirstByte - startTimestamp,
341+
"timestamp.first_byte": timestampFirstByte,
328342
})
329343
}
330344

@@ -478,6 +492,7 @@ export async function handler(
478492
retry: RetryOptions,
479493
stickyProvider: string | undefined,
480494
modelTpmLimits: Record<string, number> | undefined,
495+
modelTpsLimits: Record<string, boolean> | undefined,
481496
) {
482497
const modelProvider = (() => {
483498
// Byok is top priority b/c if user set their own API key, we should use it
@@ -509,6 +524,11 @@ export async function handler(
509524
const usage = modelTpmLimits?.[`${provider.id}/${provider.model}`] ?? 0
510525
return usage < provider.tpmLimit * 1_000_000
511526
})
527+
.filter((provider) => {
528+
if (!provider.tpsGoal) return true
529+
const isLowTps = modelTpsLimits?.[`${provider.id}/${provider.model}`] ?? false
530+
return !isLowTps
531+
})
512532
.map((provider) => {
513533
topPriority = Math.min(topPriority, provider.priority)
514534
return provider
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { and, Database, inArray, sql } from "@opencode-ai/console-core/drizzle/index.js"
2+
import { ModelTpsRateLimitTable } from "@opencode-ai/console-core/schema/ip.sql.js"
3+
import { UsageInfo } from "./provider/provider"
4+
5+
export function createModelTpsLimiter(providers: { id: string; model: string; tpsGoal?: number }[]) {
6+
const tpsGoals = Object.fromEntries(
7+
providers.flatMap((p) => {
8+
return p.tpsGoal ? [[`${p.id}/${p.model}`, p.tpsGoal]] : []
9+
}),
10+
)
11+
const ids = Object.keys(tpsGoals)
12+
if (ids.length === 0) return
13+
14+
const toInterval = (date: Date) =>
15+
parseInt(
16+
date
17+
.toISOString()
18+
.replace(/[^0-9]/g, "")
19+
.substring(0, 12),
20+
)
21+
const now = Date.now()
22+
const currInterval = toInterval(new Date(now))
23+
const prevInterval = toInterval(new Date(now - 60 * 1000))
24+
25+
return {
26+
check: async () => {
27+
const data = await Database.use((tx) =>
28+
tx
29+
.select()
30+
.from(ModelTpsRateLimitTable)
31+
.where(
32+
and(
33+
inArray(ModelTpsRateLimitTable.id, ids),
34+
inArray(ModelTpsRateLimitTable.interval, [currInterval, prevInterval]),
35+
),
36+
),
37+
)
38+
39+
// convert to map of model to summed count across current and previous intervals
40+
const result = data.reduce(
41+
(acc, curr) => {
42+
const existing = acc[curr.id] ?? { qualify: 0, unqualify: 0 }
43+
acc[curr.id] = {
44+
qualify: existing.qualify + curr.qualify,
45+
unqualify: existing.unqualify + curr.unqualify,
46+
}
47+
return acc
48+
},
49+
{} as Record<string, { qualify: number; unqualify: number }>,
50+
)
51+
52+
return Object.fromEntries(
53+
Object.entries(result).map(([id, { qualify, unqualify }]) => {
54+
const isLowTps = qualify + unqualify > 10 && qualify < unqualify
55+
return [id, isLowTps]
56+
}),
57+
)
58+
},
59+
track: async (provider: string, model: string, tsFirstByte: number, tsLastByte: number, usageInfo: UsageInfo) => {
60+
const id = `${provider}/${model}`
61+
if (!ids.includes(id)) return
62+
const tpsGoal = tpsGoals[id]
63+
if (!tpsGoal) return
64+
if (tsFirstByte <= 0 || tsLastByte <= 0) return
65+
const tokens = usageInfo.outputTokens
66+
if (tokens <= 10) return
67+
68+
const tps = (tokens / (tsLastByte - tsFirstByte)) * 1000
69+
const qualify = tps >= tpsGoal ? 1 : 0
70+
const unqualify = tps < tpsGoal ? 1 : 0
71+
await Database.use((tx) =>
72+
tx
73+
.insert(ModelTpsRateLimitTable)
74+
.values({
75+
id,
76+
interval: currInterval,
77+
qualify,
78+
unqualify,
79+
})
80+
.onDuplicateKeyUpdate({
81+
set: {
82+
qualify: sql`${ModelTpsRateLimitTable.qualify} + ${qualify}`,
83+
unqualify: sql`${ModelTpsRateLimitTable.unqualify} + ${unqualify}`,
84+
},
85+
}),
86+
)
87+
},
88+
}
89+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE `model_tps_rate_limit` (
2+
`id` varchar(255) NOT NULL,
3+
`interval` bigint NOT NULL,
4+
`qualify` int NOT NULL,
5+
`unqualify` int NOT NULL,
6+
CONSTRAINT PRIMARY KEY(`id`,`interval`)
7+
);

0 commit comments

Comments
 (0)