Skip to content

Commit 17db5bf

Browse files
committed
perf(execution): parallelize preflight gates, cache deployed state, memoize Anthropic client
- Memoize Anthropic + Azure-Anthropic SDK clients (new client-cache.ts) keyed by apiKey (+beta header; +baseURL/version/pinnedIP for Azure) so HTTP keep-alive connections are reused instead of a fresh TLS handshake per call. apiKey is the tenant boundary.
- Parallelize the read-only preflight gates in preprocessing.ts (ban + subscription, then usage + org-member + rate-limit) while preserving exact error precedence (ban 403 -> usage 402 -> rate 429) and keeping the sole write (admission reservation) last.
- Parallelize the independent workflow-state and env-var loads in execution-core.
- Cache deployed workflow state by immutable deploymentVersionId with deep-clone-on-read, oldest-first eviction, and a 5-min TTL bounding the credential-mapping edge across ECS tasks.
- Parallelize the independent personal-subscription + membership queries in getHighestPrioritySubscription.
- BYOK: drop the redundant getWorkspaceById existence check (auth already validates the workspace); read the key list fresh every call for zero cross-instance staleness.

Billing/usage/ban/permission reads stay fresh on the primary (no cache, no replica). Adds tests for every new mechanism and fixes a pre-existing vitest class-mock incompatibility that had execution-core.test.ts fully red on staging.
1 parent 2c1392e commit 17db5bf

15 files changed

Lines changed: 1157 additions & 284 deletions

File tree

apps/sim/lib/api-key/byok.test.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
*/
44
import { beforeEach, describe, expect, it, vi } from 'vitest'
55

6-
const { mockOrderBy, mockGetWorkspaceById, mockDecryptSecret } = vi.hoisted(() => ({
6+
const { mockOrderBy, mockDecryptSecret } = vi.hoisted(() => ({
77
mockOrderBy: vi.fn(),
8-
mockGetWorkspaceById: vi.fn(),
98
mockDecryptSecret: vi.fn(),
109
}))
1110

@@ -19,10 +18,6 @@ vi.mock('@sim/db', () => ({
1918
},
2019
}))
2120

22-
vi.mock('@/lib/workspaces/permissions/utils', () => ({
23-
getWorkspaceById: mockGetWorkspaceById,
24-
}))
25-
2621
vi.mock('@/lib/core/security/encryption', () => ({
2722
decryptSecret: mockDecryptSecret,
2823
}))
@@ -70,7 +65,6 @@ const storedKey = (id: string) => ({ id, encryptedApiKey: `encrypted-${id}` })
7065
describe('getBYOKKey', () => {
7166
beforeEach(() => {
7267
vi.clearAllMocks()
73-
mockGetWorkspaceById.mockResolvedValue({ id: 'workspace' })
7468
mockOrderBy.mockResolvedValue([])
7569
mockDecryptSecret.mockImplementation(async (encrypted: string) => ({
7670
decrypted: encrypted.replace('encrypted-', 'decrypted-'),
@@ -80,13 +74,6 @@ describe('getBYOKKey', () => {
8074
it('returns null when no workspaceId is provided', async () => {
8175
expect(await getBYOKKey(undefined, 'openai')).toBeNull()
8276
expect(await getBYOKKey(null, 'openai')).toBeNull()
83-
expect(mockGetWorkspaceById).not.toHaveBeenCalled()
84-
})
85-
86-
it('returns null when the workspace does not exist', async () => {
87-
mockGetWorkspaceById.mockResolvedValue(null)
88-
89-
expect(await getBYOKKey(uniqueWorkspaceId(), 'openai')).toBeNull()
9077
})
9178

9279
it('returns null when the workspace has no keys for the provider', async () => {
@@ -123,6 +110,17 @@ describe('getBYOKKey', () => {
123110
])
124111
})
125112

113+
it('reads the key list fresh from the database on every call', async () => {
114+
const workspaceId = uniqueWorkspaceId()
115+
mockOrderBy.mockResolvedValue([storedKey('key-1')])
116+
117+
await getBYOKKey(workspaceId, 'openai')
118+
await getBYOKKey(workspaceId, 'openai')
119+
await getBYOKKey(workspaceId, 'openai')
120+
121+
expect(mockOrderBy).toHaveBeenCalledTimes(3)
122+
})
123+
126124
it('tracks rotation independently per provider within a workspace', async () => {
127125
const workspaceId = uniqueWorkspaceId()
128126
mockOrderBy.mockResolvedValue([storedKey('key-1'), storedKey('key-2')])

apps/sim/lib/api-key/byok.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { getRotatingApiKey } from '@/lib/core/config/api-keys'
66
import { env } from '@/lib/core/config/env'
77
import { isHosted } from '@/lib/core/config/env-flags'
88
import { decryptSecret } from '@/lib/core/security/encryption'
9-
import { getWorkspaceById } from '@/lib/workspaces/permissions/utils'
109
import { getHostedModels } from '@/providers/models'
1110
import { PROVIDER_PLACEHOLDER_KEY } from '@/providers/utils'
1211
import { useProvidersStore } from '@/stores/providers/store'
@@ -37,6 +36,11 @@ function nextRotationIndex(poolKey: string, poolSize: number): number {
3736
* multiple keys stored for the provider, requests round-robin across them in
3837
* creation order. A key that fails to decrypt is skipped in favor of the next
3938
* one in the pool.
39+
*
40+
* The key list is read fresh from the database on every call rather than
41+
* cached: BYOK lookups are not a hot database query, and reading fresh keeps
42+
* key revocation/rotation effective immediately across every ECS task with no
43+
* cross-instance cache-coherence concern.
4044
*/
4145
export async function getBYOKKey(
4246
workspaceId: string | undefined | null,
@@ -47,11 +51,6 @@ export async function getBYOKKey(
4751
}
4852

4953
try {
50-
const activeWorkspace = await getWorkspaceById(workspaceId)
51-
if (!activeWorkspace) {
52-
return null
53-
}
54-
5554
const keys = await db
5655
.select({ id: workspaceBYOKKeys.id, encryptedApiKey: workspaceBYOKKeys.encryptedApiKey })
5756
.from(workspaceBYOKKeys)

apps/sim/lib/billing/calculations/usage-monitor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,10 @@ export async function checkOrgMemberUsageLimit(
467467
return { isExceeded: false, currentUsage: 0, limit: null }
468468
}
469469

470+
// Resolve the member cap first and short-circuit when none is set — the
471+
// common case. Computing usage is only worthwhile once a cap exists, so the
472+
// two queries stay sequential rather than racing (parallelizing would add a
473+
// usage query on every uncapped member's execution).
470474
const limit = await getOrgMemberUsageLimit(organizationId, userId)
471475
if (limit === null) {
472476
return { isExceeded: false, currentUsage: 0, limit: null }
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { beforeEach, describe, expect, it, vi } from 'vitest'
5+
6+
/**
7+
* Drizzle mock for `getHighestPrioritySubscription`, which issues up to four
8+
* queries keyed by table:
9+
* - `subscription` for the user's personal subs (parallelized with members)
10+
* - `member` for the user's org memberships (parallelized with subs)
11+
* - `organization` for the org-existence follow-up
12+
* - `subscription` again for the org-scoped subs follow-up
13+
*
14+
* The mock routes results by the table object passed to `.from()`, serving the
15+
* (twice-read) `subscription` table from a FIFO queue (first read = personal,
16+
* second = org). It records which tables were queried so we can assert the
17+
* parallelized pair both run and that follow-ups are skipped when appropriate.
18+
*
19+
* Table sentinels and shared mock state live inside `vi.hoisted` so the
20+
* `vi.mock` factories (hoisted to the top of the file) can reference them.
21+
*/
22+
const { SUBSCRIPTION_TABLE, MEMBER_TABLE, ORGANIZATION_TABLE, resultsByTable, fromCalls, select } =
23+
vi.hoisted(() => {
24+
const SUBSCRIPTION_TABLE = { __table: 'subscription' }
25+
const MEMBER_TABLE = { __table: 'member' }
26+
const ORGANIZATION_TABLE = { __table: 'organization' }
27+
28+
const resultsByTable: Record<string, unknown[][]> = {
29+
subscription: [],
30+
member: [],
31+
organization: [],
32+
}
33+
const fromCalls: string[] = []
34+
35+
const select = vi.fn(() => ({
36+
from: (table: { __table: string }) => {
37+
fromCalls.push(table.__table)
38+
const where = () => {
39+
const queue = resultsByTable[table.__table]
40+
const next = queue.length > 0 ? queue.shift() : []
41+
return Promise.resolve(next ?? [])
42+
}
43+
return { where }
44+
},
45+
}))
46+
47+
return {
48+
SUBSCRIPTION_TABLE,
49+
MEMBER_TABLE,
50+
ORGANIZATION_TABLE,
51+
resultsByTable,
52+
fromCalls,
53+
select,
54+
}
55+
})
56+
57+
vi.mock('@sim/db', () => ({
58+
db: { select },
59+
}))
60+
61+
vi.mock('@sim/db/schema', () => ({
62+
subscription: SUBSCRIPTION_TABLE,
63+
member: MEMBER_TABLE,
64+
organization: ORGANIZATION_TABLE,
65+
}))
66+
67+
/**
68+
* Realistic plan-check predicates so `pickHighestPrioritySubscription` exercises
69+
* the real Enterprise > Team > Pro priority ordering over the rows we feed it.
70+
*/
71+
vi.mock('@/lib/billing/subscriptions/utils', () => ({
72+
ENTITLED_SUBSCRIPTION_STATUSES: ['active', 'past_due'],
73+
checkEnterprisePlan: (s: any) =>
74+
s?.plan === 'enterprise' && ['active', 'past_due'].includes(s?.status),
75+
checkTeamPlan: (s: any) => s?.plan === 'team' && ['active', 'past_due'].includes(s?.status),
76+
checkProPlan: (s: any) => s?.plan === 'pro' && ['active', 'past_due'].includes(s?.status),
77+
}))
78+
79+
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
80+
81+
interface SubRow {
82+
id: string
83+
referenceId: string
84+
plan: string
85+
status: string
86+
}
87+
88+
function personalPro(userId: string): SubRow {
89+
return { id: 'sub-personal-pro', referenceId: userId, plan: 'pro', status: 'active' }
90+
}
91+
92+
function orgEnterprise(orgId: string): SubRow {
93+
return { id: 'sub-org-enterprise', referenceId: orgId, plan: 'enterprise', status: 'active' }
94+
}
95+
96+
function queue(table: 'subscription' | 'member' | 'organization', rows: unknown[]) {
97+
resultsByTable[table].push(rows)
98+
}
99+
100+
describe('getHighestPrioritySubscription', () => {
101+
beforeEach(() => {
102+
vi.clearAllMocks()
103+
resultsByTable.subscription = []
104+
resultsByTable.member = []
105+
resultsByTable.organization = []
106+
fromCalls.length = 0
107+
})
108+
109+
it('picks the org Enterprise sub over a personal Pro sub (priority order)', async () => {
110+
queue('subscription', [personalPro('user-1')]) // personalSubs query
111+
queue('member', [{ organizationId: 'org-1' }]) // memberships query
112+
queue('organization', [{ id: 'org-1' }]) // org-existence query
113+
queue('subscription', [orgEnterprise('org-1')]) // org-subscriptions query
114+
115+
const result = await getHighestPrioritySubscription('user-1')
116+
117+
expect(result).not.toBeNull()
118+
expect(result?.id).toBe('sub-org-enterprise')
119+
expect(result?.plan).toBe('enterprise')
120+
})
121+
122+
it('selection is deterministic regardless of which parallelized query resolves first', async () => {
123+
queue('subscription', [personalPro('user-1')])
124+
queue('member', [{ organizationId: 'org-1' }])
125+
queue('organization', [{ id: 'org-1' }])
126+
queue('subscription', [orgEnterprise('org-1')])
127+
128+
const result = await getHighestPrioritySubscription('user-1')
129+
130+
// Enterprise (org) always wins over Pro (personal).
131+
expect(result?.id).toBe('sub-org-enterprise')
132+
})
133+
134+
it('issues BOTH the personal-subscriptions and memberships queries (parallelized pair)', async () => {
135+
queue('subscription', [personalPro('user-1')])
136+
queue('member', [{ organizationId: 'org-1' }])
137+
queue('organization', [{ id: 'org-1' }])
138+
queue('subscription', [orgEnterprise('org-1')])
139+
140+
await getHighestPrioritySubscription('user-1')
141+
142+
expect(fromCalls).toContain('subscription')
143+
expect(fromCalls).toContain('member')
144+
// First two queries are exactly the parallelized pair (in either order).
145+
expect(fromCalls.slice(0, 2).sort()).toEqual(['member', 'subscription'])
146+
})
147+
148+
it('returns the personal sub and skips org follow-ups when there are no memberships', async () => {
149+
queue('subscription', [personalPro('user-1')])
150+
queue('member', []) // no org memberships
151+
152+
const result = await getHighestPrioritySubscription('user-1')
153+
154+
expect(result?.id).toBe('sub-personal-pro')
155+
expect(result?.plan).toBe('pro')
156+
// org-existence + org-subscription follow-ups are NOT issued.
157+
expect(fromCalls).not.toContain('organization')
158+
expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
159+
})
160+
161+
it('returns null when neither personal nor org subscriptions exist', async () => {
162+
queue('subscription', []) // no personal subs
163+
queue('member', []) // no memberships
164+
165+
const result = await getHighestPrioritySubscription('user-1')
166+
167+
expect(result).toBeNull()
168+
})
169+
170+
it('excludes orphaned org memberships whose organization row no longer exists', async () => {
171+
queue('subscription', []) // no personal subs
172+
queue('member', [{ organizationId: 'ghost-org' }]) // membership points at a deleted org
173+
queue('organization', []) // org-existence returns nothing -> orphaned
174+
175+
const result = await getHighestPrioritySubscription('user-1')
176+
177+
// Org subs are never fetched (no valid org ids) -> falls back to null.
178+
expect(result).toBeNull()
179+
expect(fromCalls).toContain('organization')
180+
// Only the initial personal-subs read on `subscription`; org-subs query skipped.
181+
expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
182+
})
183+
184+
it('falls back to the personal sub when the only org is orphaned', async () => {
185+
queue('subscription', [personalPro('user-1')])
186+
queue('member', [{ organizationId: 'ghost-org' }])
187+
queue('organization', []) // orphaned org
188+
189+
const result = await getHighestPrioritySubscription('user-1')
190+
191+
expect(result?.id).toBe('sub-personal-pro')
192+
expect(fromCalls.filter((t) => t === 'subscription')).toHaveLength(1)
193+
})
194+
})

apps/sim/lib/billing/core/plan.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,21 @@ export async function getHighestPrioritySubscription(
8282
) {
8383
const { onError = 'return-null', executor = db } = options
8484
try {
85-
const personalSubs = await executor
86-
.select()
87-
.from(subscription)
88-
.where(
89-
and(
90-
eq(subscription.referenceId, userId),
91-
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES)
92-
)
93-
)
94-
95-
const memberships = await executor
96-
.select({ organizationId: member.organizationId })
97-
.from(member)
98-
.where(eq(member.userId, userId))
85+
const [personalSubs, memberships] = await Promise.all([
86+
executor
87+
.select()
88+
.from(subscription)
89+
.where(
90+
and(
91+
eq(subscription.referenceId, userId),
92+
inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES)
93+
)
94+
),
95+
executor
96+
.select({ organizationId: member.organizationId })
97+
.from(member)
98+
.where(eq(member.userId, userId)),
99+
])
99100

100101
const orgIds = memberships.map((m: { organizationId: string }) => m.organizationId)
101102

apps/sim/lib/execution/preprocessing.test.ts

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ describe('preprocessExecution ban gate', () => {
176176
} as any)
177177
})
178178

179-
it('blocks execution with 403 when the actor is banned, before any billing queries', async () => {
179+
it('blocks execution with 403 when the actor is banned (ban wins over the parallel gates)', async () => {
180180
mockGetActivelyBannedUserIds.mockResolvedValue(['billed-account-1'])
181181

182182
const loggingSession = {
@@ -194,8 +194,39 @@ describe('preprocessExecution ban gate', () => {
194194
error: { statusCode: 403, logCreated: true, message: 'Account suspended' },
195195
})
196196
expect(loggingSession.safeStart).toHaveBeenCalled()
197-
expect(getHighestPrioritySubscription).not.toHaveBeenCalled()
198-
expect(checkServerSideUsageLimits).not.toHaveBeenCalled()
197+
})
198+
199+
it('returns 403 (ban precedence) when ban, usage, and rate limit all fail simultaneously', async () => {
200+
mockGetActivelyBannedUserIds.mockResolvedValue(['billed-account-1'])
201+
vi.mocked(checkServerSideUsageLimits).mockResolvedValue({
202+
isExceeded: true,
203+
currentUsage: 20,
204+
limit: 10,
205+
message: 'Usage limit exceeded. Please upgrade your plan to continue.',
206+
} as any)
207+
mockCheckRateLimit.mockResolvedValue({
208+
allowed: false,
209+
remaining: 0,
210+
resetAt: new Date(),
211+
})
212+
213+
const loggingSession = {
214+
safeStart: vi.fn().mockResolvedValue(true),
215+
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
216+
}
217+
218+
const result = await preprocessExecution({
219+
...baseOptions,
220+
checkRateLimit: true,
221+
loggingSession: loggingSession as any,
222+
})
223+
224+
// Ban (403) takes precedence over usage (402) and rate limit (429),
225+
// independent of which parallel gate's promise settled first.
226+
expect(result).toMatchObject({
227+
success: false,
228+
error: { statusCode: 403, logCreated: true, message: 'Account suspended' },
229+
})
199230
})
200231

201232
it('checks the billing actor, caller-provided userId, and workflow owner in one call', async () => {
@@ -234,6 +265,5 @@ describe('preprocessExecution ban gate', () => {
234265
success: false,
235266
error: { statusCode: 500, logCreated: true },
236267
})
237-
expect(checkServerSideUsageLimits).not.toHaveBeenCalled()
238268
})
239269
})

0 commit comments

Comments
 (0)