Skip to content

Commit 9a4c9d2

Browse files
fix(db-part-3): bound cross-request shared promises against pool wedge (#5021)
* fix(db-part-3): bound cross-request shared promises against pool wedge * address comments
1 parent 005fa10 commit 9a4c9d2

9 files changed

Lines changed: 292 additions & 19 deletions

File tree

apps/sim/app/api/auth/oauth/utils.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ async function performCoalescedRefresh({
358358

359359
const lockKey = `oauth:refresh:${accountId}`
360360

361-
return coalesceLocally(lockKey, () =>
361+
const refreshPromise = coalesceLocally(lockKey, () =>
362362
withLeaderLock<string>({
363363
key: lockKey,
364364
onLeader: async () => {
@@ -429,6 +429,16 @@ async function performCoalescedRefresh({
429429
},
430430
})
431431
)
432+
433+
try {
434+
return await refreshPromise
435+
} catch (error) {
436+
logger.error('Coalesced refresh did not settle', {
437+
...logContext,
438+
error: toError(error).message,
439+
})
440+
return null
441+
}
432442
}
433443

434444
export async function getOAuthToken(userId: string, providerId: string): Promise<string | null> {

apps/sim/app/api/function/execute/route.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,13 @@ let typescriptModulePromise: Promise<TypeScriptModule> | null = null
115115

116116
async function loadTypeScriptModule(): Promise<TypeScriptModule> {
117117
if (!typescriptModulePromise) {
118-
typescriptModulePromise = import('typescript').then((mod) => {
119-
const tsModule = (mod?.default ?? mod) as TypeScriptModule
120-
return tsModule
121-
})
118+
typescriptModulePromise = import('typescript').then(
119+
(mod) => (mod?.default ?? mod) as TypeScriptModule,
120+
(error) => {
121+
typescriptModulePromise = null
122+
throw error
123+
}
124+
)
122125
}
123126

124127
return typescriptModulePromise

apps/sim/lib/concurrency/__tests__/singleflight.test.ts

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
*/
44
import { sleep } from '@sim/utils/helpers'
55
import { afterEach, describe, expect, it, vi } from 'vitest'
6-
import { __resetCoalesceLocallyForTests, coalesceLocally } from '@/lib/concurrency/singleflight'
6+
import {
7+
__resetCoalesceLocallyForTests,
8+
CoalesceSettleTimeoutError,
9+
coalesceLocally,
10+
} from '@/lib/concurrency/singleflight'
711

812
afterEach(() => {
913
__resetCoalesceLocallyForTests()
@@ -57,9 +61,103 @@ describe('coalesceLocally', () => {
5761
await expect(coalesceLocally('rejection', fn)).rejects.toThrow('fail 2')
5862
})
5963

64+
it('surfaces a synchronously-thrown fn error and evicts the entry', async () => {
65+
const fn = vi.fn((): Promise<string> => {
66+
throw new Error('sync boom')
67+
})
68+
69+
// The real error must surface (not a TDZ ReferenceError from the evict
70+
// closure) and the entry must be evicted so the next call retries.
71+
await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom')
72+
await expect(coalesceLocally('sync-throw', fn)).rejects.toThrow('sync boom')
73+
expect(fn).toHaveBeenCalledTimes(2)
74+
})
75+
6076
it('does not coalesce across distinct keys', async () => {
6177
const fn = vi.fn(async () => 'value')
6278
await Promise.all([coalesceLocally('a', fn), coalesceLocally('b', fn)])
6379
expect(fn).toHaveBeenCalledTimes(2)
6480
})
81+
82+
it('rejects all awaiters and evicts the entry when the producer misses the settle deadline', async () => {
83+
vi.useFakeTimers()
84+
try {
85+
let resolveHung: (value: string) => void
86+
const hung = vi.fn(
87+
() =>
88+
new Promise<string>((resolve) => {
89+
resolveHung = resolve
90+
})
91+
)
92+
93+
const a = coalesceLocally('wedged', hung)
94+
const b = coalesceLocally('wedged', hung)
95+
const aAssertion = expect(a).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)
96+
const bAssertion = expect(b).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)
97+
98+
await vi.advanceTimersByTimeAsync(30_000)
99+
await aAssertion
100+
await bAssertion
101+
expect(hung).toHaveBeenCalledTimes(1)
102+
103+
const fresh = vi.fn(async () => 'recovered')
104+
await expect(coalesceLocally('wedged', fresh)).resolves.toBe('recovered')
105+
expect(fresh).toHaveBeenCalledTimes(1)
106+
107+
resolveHung!('late')
108+
} finally {
109+
vi.useRealTimers()
110+
}
111+
})
112+
113+
it('a timed-out producer settling late does not evict its successor', async () => {
114+
vi.useFakeTimers()
115+
try {
116+
let resolveOld: (value: string) => void
117+
const old = coalesceLocally(
118+
'late-settle',
119+
() =>
120+
new Promise<string>((resolve) => {
121+
resolveOld = resolve
122+
}),
123+
1_000
124+
)
125+
const oldAssertion = expect(old).rejects.toBeInstanceOf(CoalesceSettleTimeoutError)
126+
await vi.advanceTimersByTimeAsync(1_000)
127+
await oldAssertion
128+
129+
let resolveNew: (value: string) => void
130+
const successor = coalesceLocally(
131+
'late-settle',
132+
() =>
133+
new Promise<string>((resolve) => {
134+
resolveNew = resolve
135+
})
136+
)
137+
138+
resolveOld!('late')
139+
await vi.advanceTimersByTimeAsync(0)
140+
141+
const joined = coalesceLocally('late-settle', async () => 'should-not-run')
142+
expect(joined).toBe(successor)
143+
144+
resolveNew!('new-value')
145+
await expect(successor).resolves.toBe('new-value')
146+
} finally {
147+
vi.useRealTimers()
148+
}
149+
})
150+
151+
it('does not fire the deadline for producers that settle in time', async () => {
152+
vi.useFakeTimers()
153+
try {
154+
const value = await coalesceLocally('prompt', async () => 'ok', 1_000)
155+
expect(value).toBe('ok')
156+
157+
await vi.advanceTimersByTimeAsync(2_000)
158+
await expect(coalesceLocally('prompt', async () => 'again', 1_000)).resolves.toBe('again')
159+
} finally {
160+
vi.useRealTimers()
161+
}
162+
})
65163
})

apps/sim/lib/concurrency/singleflight.ts

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,69 @@
11
const inflight = new Map<string, Promise<unknown>>()
22

3-
export function coalesceLocally<T>(key: string, fn: () => Promise<T>): Promise<T> {
3+
/**
4+
* Default deadline for a coalesced producer to settle. Joiners share the
5+
* producer's promise, so without a deadline a single hung producer wedges
6+
* every future caller for that key until process restart.
7+
*/
8+
const DEFAULT_SETTLE_TIMEOUT_MS = 30_000
9+
10+
/**
11+
* Thrown to all awaiters when a coalesced producer fails to settle within
12+
* its deadline. The entry is evicted first, so the next caller mints a
13+
* fresh producer instead of joining the wedged one.
14+
*/
15+
export class CoalesceSettleTimeoutError extends Error {
16+
constructor(key: string, timeoutMs: number) {
17+
super(`Coalesced producer for "${key}" did not settle within ${timeoutMs}ms`)
18+
this.name = 'CoalesceSettleTimeoutError'
19+
}
20+
}
21+
22+
/**
23+
* Deduplicates concurrent async work by key within this process: the first
24+
* caller runs `fn`, every concurrent caller for the same key shares its
25+
* promise. The entry is evicted when the producer settles (either way) or
26+
* when the settle deadline fires, whichever comes first. The underlying
27+
* `fn` is not cancelled on timeout — it keeps running detached, but no new
28+
* caller will join it.
29+
*/
30+
export function coalesceLocally<T>(
31+
key: string,
32+
fn: () => Promise<T>,
33+
settleTimeoutMs: number = DEFAULT_SETTLE_TIMEOUT_MS
34+
): Promise<T> {
435
const existing = inflight.get(key) as Promise<T> | undefined
536
if (existing) return existing
637

7-
const promise = (async () => {
8-
try {
9-
return await fn()
10-
} finally {
11-
inflight.delete(key)
12-
}
13-
})()
38+
let timer: ReturnType<typeof setTimeout> | undefined
39+
const evict = () => {
40+
if (inflight.get(key) === guarded) inflight.delete(key)
41+
}
42+
43+
const guarded: Promise<T> = Promise.race([
44+
(async () => {
45+
try {
46+
// Defer fn() to a microtask so a synchronous throw surfaces as a
47+
// rejection after `guarded` and the timer are initialized. Calling it
48+
// inline would run the finally below during construction, touching
49+
// `guarded` in its temporal dead zone and masking fn's real error.
50+
return await Promise.resolve().then(fn)
51+
} finally {
52+
clearTimeout(timer)
53+
evict()
54+
}
55+
})(),
56+
new Promise<never>((_, reject) => {
57+
timer = setTimeout(() => {
58+
evict()
59+
reject(new CoalesceSettleTimeoutError(key, settleTimeoutMs))
60+
}, settleTimeoutMs)
61+
timer.unref?.()
62+
}),
63+
])
1464

15-
inflight.set(key, promise)
16-
return promise
65+
inflight.set(key, guarded)
66+
return guarded
1767
}
1868

1969
export function __resetCoalesceLocallyForTests(): void {

apps/sim/lib/mcp/oauth/storage.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,42 @@ describe('withMcpOauthRefreshLock', () => {
221221
}
222222
})
223223

224+
it('bounds the queue wait: callers stalled behind a wedged link reject without running fn', async () => {
225+
vi.useFakeTimers()
226+
try {
227+
mockAcquireLock.mockResolvedValue(true)
228+
let resolveFirst: (value: string) => void
229+
const hungFn = vi.fn(
230+
() =>
231+
new Promise<string>((resolve) => {
232+
resolveFirst = resolve
233+
})
234+
)
235+
const queuedFn = vi.fn(async () => 'second')
236+
237+
const first = withMcpOauthRefreshLock('row-stall', hungFn)
238+
const second = withMcpOauthRefreshLock('row-stall', queuedFn)
239+
const secondAssertion = expect(second).rejects.toThrow(/stalled for/)
240+
241+
await vi.advanceTimersByTimeAsync(90_000)
242+
await secondAssertion
243+
expect(queuedFn).not.toHaveBeenCalled()
244+
245+
// The wedged link is untouched by the queue deadline (its own fn keeps
246+
// the lock, protecting a possibly mid-rotation refresh) and the row
247+
// heals once it settles — including skipping the abandoned link's fn.
248+
resolveFirst!('first')
249+
await expect(first).resolves.toBe('first')
250+
251+
await expect(withMcpOauthRefreshLock('row-stall', async () => 'healed')).resolves.toBe(
252+
'healed'
253+
)
254+
expect(queuedFn).not.toHaveBeenCalled()
255+
} finally {
256+
vi.useRealTimers()
257+
}
258+
})
259+
224260
it('extends the lock TTL while fn() is running so long refreshes do not lose the lock', async () => {
225261
vi.useFakeTimers()
226262
try {

apps/sim/lib/mcp/oauth/storage.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,10 @@ export async function clearState(rowId: string): Promise<void> {
237237
* Two-tier serialization (each caller runs its OWN `fn()` — callers consume
238238
* `McpClient` instances that can't be shared, unlike a scalar access token):
239239
* 1) In-process: per-row Promise chain. Concurrent callers queue; each
240-
* runs `fn()` after the previous settles.
240+
* runs `fn()` after the previous settles. The queue wait is bounded —
241+
* a caller whose turn does not arrive within
242+
* {@link REFRESH_QUEUE_WAIT_TIMEOUT_MS} rejects without ever running
243+
* its `fn()`, so a wedged link cannot accumulate callers indefinitely.
241244
* 2) Cross-process: Redis mutex (`acquireLock` / `releaseLock`) with a TTL
242245
* watchdog that periodically extends the lock while `fn()` runs, so
243246
* long-running refreshes don't drop the lock and let another process
@@ -251,18 +254,59 @@ const REFRESH_LOCK_EXTEND_INTERVAL_MS = 5_000
251254
const REFRESH_POLL_INTERVAL_MS = 100
252255
const REFRESH_MAX_WAIT_MS = 30_000
253256

257+
/**
258+
* Deadline on the in-process QUEUE WAIT only — the time a caller spends
259+
* waiting for its turn behind queued predecessors. Without it, one hung
260+
* link wedges every subsequent caller for that row until process restart.
261+
* Sized to survive one legitimately slow predecessor: up to
262+
* REFRESH_MAX_WAIT_MS of cross-process lock contention plus the MCP SDK's
263+
* 60s initialize timeout. Deliberately NOT applied to the caller's own
264+
* `fn()` run — aborting a running `fn()` would orphan a connected
265+
* `McpClient` and abandon a possibly mid-rotation refresh; `fn()` is
266+
* bounded by its own SDK/HTTP/Redis timeouts instead.
267+
*/
268+
const REFRESH_QUEUE_WAIT_TIMEOUT_MS = 90_000
269+
254270
const inflightChains = new Map<string, Promise<unknown>>()
255271

256272
export async function withMcpOauthRefreshLock<T>(rowId: string, fn: () => Promise<T>): Promise<T> {
257273
const lockKey = `mcp:oauth:refresh:${rowId}`
258274
const prev = inflightChains.get(lockKey) ?? Promise.resolve()
259-
const next = prev.catch(() => undefined).then(() => runWithRedisMutex(lockKey, rowId, fn))
275+
const prevSettled = prev.catch(() => undefined)
276+
277+
let queueTimedOut = false
278+
const next = prevSettled.then(() => {
279+
if (queueTimedOut) {
280+
throw new Error(`MCP OAuth refresh queue for ${rowId} abandoned after timeout`)
281+
}
282+
return runWithRedisMutex(lockKey, rowId, fn)
283+
})
260284
inflightChains.set(lockKey, next)
261285
const cleanup = () => {
262286
if (inflightChains.get(lockKey) === next) inflightChains.delete(lockKey)
263287
}
264288
next.then(cleanup, cleanup)
265-
return next as Promise<T>
289+
290+
let queueTimer: ReturnType<typeof setTimeout> | undefined
291+
const queueDeadline = new Promise<never>((_, reject) => {
292+
queueTimer = setTimeout(() => {
293+
queueTimedOut = true
294+
reject(
295+
new Error(
296+
`MCP OAuth refresh queue for ${rowId} stalled for ${REFRESH_QUEUE_WAIT_TIMEOUT_MS}ms`
297+
)
298+
)
299+
}, REFRESH_QUEUE_WAIT_TIMEOUT_MS)
300+
queueTimer.unref?.()
301+
})
302+
303+
try {
304+
await Promise.race([prevSettled, queueDeadline])
305+
} finally {
306+
clearTimeout(queueTimer)
307+
}
308+
309+
return next
266310
}
267311

268312
async function runWithRedisMutex<T>(

apps/sim/lib/oauth/oauth.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,15 @@ function extractErrorCode(value: unknown): string | undefined {
14961496
return undefined
14971497
}
14981498

1499+
/**
1500+
* Hard deadline on the token-endpoint exchange. This function does not coalesce
1501+
* on its own; its sole production caller (`performCoalescedRefresh` in the OAuth
1502+
* utils) shares one in-flight refresh across concurrent callers for a credential.
1503+
* Without this bound a hung endpoint would wedge every joiner on that key until
1504+
* the undici socket defaults (~5 min) gave up.
1505+
*/
1506+
const TOKEN_REFRESH_TIMEOUT_MS = 15_000
1507+
14991508
export async function refreshOAuthToken(
15001509
providerId: string,
15011510
refreshToken: string
@@ -1511,6 +1520,7 @@ export async function refreshOAuthToken(
15111520
method: 'POST',
15121521
headers,
15131522
body: useJsonBody ? JSON.stringify(bodyParams) : new URLSearchParams(bodyParams).toString(),
1523+
signal: AbortSignal.timeout(TOKEN_REFRESH_TIMEOUT_MS),
15141524
})
15151525

15161526
if (!response.ok) {

0 commit comments

Comments
 (0)