Skip to content

Commit 0c3be70

Browse files
committed
zen: batch balance calculation
1 parent 74f7319 commit 0c3be70

2 files changed

Lines changed: 57 additions & 8 deletions

File tree

packages/console/app/src/routes/zen/util/handler.ts

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import { i18n, type Key } from "~/i18n"
4747
import { localeFromRequest } from "~/lib/language"
4848
import { createModelTpmLimiter } from "./modelTpmLimiter"
4949
import { createModelTpsLimiter } from "./modelTpsLimiter"
50+
import { accumulateUsage, HOT_WORKSPACES } from "./usageBatcher"
5051

5152
type ZenData = Awaited<ReturnType<typeof ZenData.list>>
5253
type RetryOptions = {
@@ -981,6 +982,19 @@ export async function handler(
981982
authInfo = authInfo!
982983

983984
const cost = centsToMicroCents(totalCostInCent)
985+
986+
// For hot workspaces, batch balance/usage updates through Redis to avoid
987+
// row-level lock contention on BillingTable/UserTable. Returns the amount
988+
// to flush this request, or null to skip the DB writes entirely.
989+
const balanceFlush = await (async () => {
990+
if (billingSource !== "subscription" && billingSource !== "lite" && HOT_WORKSPACES.has(authInfo.workspaceID)) {
991+
const workspaceCost = billingSource === "free" || billingSource === "byok" ? 0 : cost
992+
const flush = await accumulateUsage(authInfo.workspaceID, authInfo.user.id, workspaceCost, cost)
993+
return { batched: true as const, flush }
994+
}
995+
return { batched: false as const, flush: null }
996+
})()
997+
984998
await Database.use((db) =>
985999
Promise.all([
9861000
db.insert(UsageTable).values({
@@ -1082,18 +1096,22 @@ export async function handler(
10821096
]
10831097
}
10841098

1099+
// Batched hot workspace: skip DB writes unless this request is the flush.
1100+
if (balanceFlush.batched && !balanceFlush.flush) return []
1101+
1102+
const workspaceDelta = balanceFlush.flush?.workspaceCost ?? cost
1103+
const userDelta = balanceFlush.flush?.userCost ?? cost
1104+
const balanceDelta = billingSource === "free" || billingSource === "byok" ? 0 : workspaceDelta
1105+
10851106
return [
10861107
db
10871108
.update(BillingTable)
10881109
.set({
1089-
balance:
1090-
billingSource === "free" || billingSource === "byok"
1091-
? sql`${BillingTable.balance} - ${0}`
1092-
: sql`${BillingTable.balance} - ${cost}`,
1110+
balance: sql`${BillingTable.balance} - ${balanceDelta}`,
10931111
monthlyUsage: sql`
10941112
CASE
1095-
WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${cost}
1096-
ELSE ${cost}
1113+
WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${workspaceDelta}
1114+
ELSE ${workspaceDelta}
10971115
END
10981116
`,
10991117
timeMonthlyUsageUpdated: sql`now()`,
@@ -1104,8 +1122,8 @@ export async function handler(
11041122
.set({
11051123
monthlyUsage: sql`
11061124
CASE
1107-
WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${cost}
1108-
ELSE ${cost}
1125+
WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${userDelta}
1126+
ELSE ${userDelta}
11091127
END
11101128
`,
11111129
timeMonthlyUsageUpdated: sql`now()`,
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { Resource } from "@opencode-ai/console-resource"
2+
import { getRedis } from "./redis"
3+
4+
// Workspaces whose balance/usage updates should be batched in Redis to avoid
5+
// row-level lock contention on BillingTable / UserTable.
6+
export const HOT_WORKSPACES = new Set<string>([
7+
"wrk_01KJ8PX5CH50Y4YNGNS9ZR8YDC", // invoice
8+
])
9+
10+
// Probability that a given request flushes the accumulated totals to the DB.
11+
// Lower = fewer DB writes, more staleness. ~1 in 100 -> ~1% of requests write.
12+
const FLUSH_PROBABILITY = 1 / 100
13+
14+
export async function accumulateUsage(workspaceID: string, userID: string, workspaceCost: number, userCost: number) {
15+
const redis = getRedis()
16+
const wKey = `${Resource.App.stage}:usage:wrk:${workspaceID}`
17+
const uKey = `${Resource.App.stage}:usage:usr:${workspaceID}:${userID}`
18+
19+
await Promise.all([redis.incrby(wKey, workspaceCost), redis.incrby(uKey, userCost)])
20+
21+
if (Math.random() > FLUSH_PROBABILITY) return null
22+
23+
// Atomically take the current totals and reset to 0
24+
const [workspaceTotal, userTotal] = await Promise.all([redis.getdel<number>(wKey), redis.getdel<number>(uKey)])
25+
26+
const workspaceFlush = Number(workspaceTotal ?? 0)
27+
const userFlush = Number(userTotal ?? 0)
28+
if (workspaceFlush === 0 && userFlush === 0) return null
29+
30+
return { workspaceCost: workspaceFlush, userCost: userFlush }
31+
}

0 commit comments

Comments
 (0)