Skip to content

Commit 291a993

Browse files
authored
feat: improve enricher rate-limit handling for higher throughput [CM-1220] (#4189)
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent 61896b1 commit 291a993

7 files changed

Lines changed: 414 additions & 183 deletions

File tree

services/apps/packages_worker/src/bin/github-repos-enricher.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,23 @@ const main = async () => {
2424
const enricherConfig = getEnricherConfig()
2525
const appConfig = getGithubAppConfig()
2626

27-
const installationIds = await resolveInstallations(appConfig)
27+
const discoveredIds = await resolveInstallations(appConfig)
2828

29-
if (installationIds.length === 0) {
29+
if (discoveredIds.length === 0) {
3030
log.error('No GitHub App installations found — cannot build token pool')
3131
process.exit(1)
3232
}
3333

34-
await fetchRateLimitDiagnostics(appConfig.appId, appConfig.privateKeyPem, installationIds)
34+
const installationIds = await fetchRateLimitDiagnostics(
35+
appConfig.appId,
36+
appConfig.privateKeyPem,
37+
discoveredIds,
38+
)
39+
40+
if (installationIds.length === 0) {
41+
log.error('All installations failed the rate limit probe — cannot build token pool')
42+
process.exit(1)
43+
}
3544

3645
log.info(
3746
{

services/apps/packages_worker/src/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export function getEnricherConfig() {
4141
return {
4242
updateIntervalHours: requireEnvInt('ENRICHER_REPO_UPDATE_INTERVAL_HOURS'),
4343
idleSleepSec: requireEnvInt('ENRICHER_IDLE_SLEEP_SEC'),
44-
concurrency: parseInt(process.env.ENRICHER_CONCURRENCY ?? '80', 10),
44+
concurrency: parseInt(process.env.ENRICHER_CONCURRENCY ?? '150', 10),
4545
fetchTimeoutMs: parseInt(process.env.ENRICHER_FETCH_TIMEOUT_MS ?? '10000', 10),
4646
}
4747
}

services/apps/packages_worker/src/enricher/fetchActivitySnapshot.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ function buildDateWindows(): {
100100

101101
interface RateLimit {
102102
cost: number
103+
remaining: number
104+
resetAt: string
103105
}
104106

105107
interface IssueCount {
@@ -164,7 +166,7 @@ const SUMMARY_QUERY = `
164166
$searchIssuesOpened: String!, $searchIssuesClosed: String!,
165167
$searchIssuesOpened6m: String!, $searchIssuesOpenNow: String!
166168
) {
167-
rateLimit { cost }
169+
rateLimit { cost remaining resetAt }
168170
repository(owner: $owner, name: $name) {
169171
defaultBranchRef {
170172
target {
@@ -188,7 +190,7 @@ const SUMMARY_QUERY = `
188190

189191
const PR_PAGE_QUERY = `
190192
query($owner: String!, $name: String!, $cursor: String) {
191-
rateLimit { cost }
193+
rateLimit { cost remaining resetAt }
192194
repository(owner: $owner, name: $name) {
193195
pullRequests(first: ${PAGE_SIZE}, orderBy: { field: CREATED_AT, direction: DESC }, after: $cursor) {
194196
pageInfo { hasNextPage endCursor }
@@ -210,10 +212,16 @@ async function pagePrs(
210212
token: string,
211213
timeoutMs: number,
212214
windowStart: Date,
213-
): Promise<{ nodes: PrNode[]; rateLimitCost: number; httpRequestCount: number }> {
215+
): Promise<{
216+
nodes: PrNode[]
217+
rateLimitCost: number
218+
httpRequestCount: number
219+
rateLimit: RateLimit | null
220+
}> {
214221
const nodes: PrNode[] = []
215222
let rateLimitCost = 0
216223
let httpRequestCount = 0
224+
let rateLimit: RateLimit | null = null
217225
let cursor: string | undefined
218226

219227
for (;;) {
@@ -223,6 +231,7 @@ async function pagePrs(
223231
const data = await graphqlRequest<PrPageQueryResult>(PR_PAGE_QUERY, variables, token, timeoutMs)
224232
rateLimitCost += data.rateLimit.cost
225233
httpRequestCount++
234+
rateLimit = data.rateLimit
226235

227236
const connection = data.repository.pullRequests
228237
let reachedWindowBoundary = false
@@ -239,12 +248,12 @@ async function pagePrs(
239248
cursor = connection.pageInfo.endCursor
240249
}
241250

242-
return { nodes, rateLimitCost, httpRequestCount }
251+
return { nodes, rateLimitCost, httpRequestCount, rateLimit }
243252
}
244253

245254
const ISSUE_PAGE_QUERY = `
246255
query($owner: String!, $name: String!, $cursor: String) {
247-
rateLimit { cost }
256+
rateLimit { cost remaining resetAt }
248257
repository(owner: $owner, name: $name) {
249258
issues(first: ${PAGE_SIZE}, orderBy: { field: CREATED_AT, direction: DESC }, after: $cursor) {
250259
pageInfo { hasNextPage endCursor }
@@ -265,10 +274,16 @@ async function pageIssues(
265274
token: string,
266275
timeoutMs: number,
267276
windowStart: Date,
268-
): Promise<{ nodes: IssueNode[]; rateLimitCost: number; httpRequestCount: number }> {
277+
): Promise<{
278+
nodes: IssueNode[]
279+
rateLimitCost: number
280+
httpRequestCount: number
281+
rateLimit: RateLimit | null
282+
}> {
269283
const nodes: IssueNode[] = []
270284
let rateLimitCost = 0
271285
let httpRequestCount = 0
286+
let rateLimit: RateLimit | null = null
272287
let cursor: string | undefined
273288

274289
for (;;) {
@@ -283,6 +298,7 @@ async function pageIssues(
283298
)
284299
rateLimitCost += data.rateLimit.cost
285300
httpRequestCount++
301+
rateLimit = data.rateLimit
286302

287303
const connection = data.repository.issues
288304
let reachedWindowBoundary = false
@@ -299,7 +315,7 @@ async function pageIssues(
299315
cursor = connection.pageInfo.endCursor
300316
}
301317

302-
return { nodes, rateLimitCost, httpRequestCount }
318+
return { nodes, rateLimitCost, httpRequestCount, rateLimit }
303319
}
304320

305321
export async function fetchActivitySnapshot(
@@ -363,6 +379,11 @@ export async function fetchActivitySnapshot(
363379
const prMedians = computePrMedians(prResult.nodes)
364380
const issueMedians = computeIssueMedians(issueResult.nodes)
365381

382+
// Lowest remaining across all requests — remaining only decreases within a reset window
383+
const lastRateLimit = [summaryData.rateLimit, prResult.rateLimit, issueResult.rateLimit]
384+
.filter((rl): rl is RateLimit => rl !== null)
385+
.reduce((min, rl) => (rl.remaining < min.remaining ? rl : min))
386+
366387
return {
367388
repoId,
368389
snapshotAt: new Date().toISOString(),
@@ -385,5 +406,7 @@ export async function fetchActivitySnapshot(
385406
issueMedianTimeToFirstResponseHours: issueMedians.medianTimeToFirstResponseHours,
386407
httpRequestCount: totalHttpRequests,
387408
rateLimitCost: totalRateLimitCost,
409+
rateLimitRemaining: lastRateLimit.remaining,
410+
rateLimitResetAt: lastRateLimit.resetAt,
388411
}
389412
}

services/apps/packages_worker/src/enricher/fetchLightRepo.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,20 @@ async function fetchSecurityFileEnabled(
5252
})
5353
if (response.status === 200) return true
5454
if (response.status === 404) return false
55+
if (response.status === 403) {
56+
const body = await response.text()
57+
if (body.toLowerCase().includes('rate limit')) {
58+
// REST secondary limits send retry-after; primary limits send x-ratelimit-reset
59+
const retryAfterSec = parseInt(response.headers.get('retry-after') ?? '0', 10)
60+
const resetSec = parseInt(response.headers.get('x-ratelimit-reset') ?? '0', 10)
61+
const resetMs = retryAfterSec
62+
? Date.now() + retryAfterSec * 1000
63+
: resetSec
64+
? resetSec * 1000 + 5_000
65+
: Date.now() + 65_000
66+
throw new FetchError('RATE_LIMIT', `Contents API rate limited on ${path}`, resetMs)
67+
}
68+
}
5569
throw new Error(`Unexpected status ${response.status} for ${path}`)
5670
} finally {
5771
clearTimeout(timeoutId)
@@ -65,6 +79,8 @@ async function fetchSecurityFileEnabled(
6579
])
6680
return root || dotGithub
6781
} catch (err) {
82+
// Rate limits propagate so the caller can park the installation and requeue the repo
83+
if (err instanceof FetchError && err.kind === 'RATE_LIMIT') throw err
6884
log.warn(
6985
{
7086
url,

services/apps/packages_worker/src/enricher/githubAppAuth.ts

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import jwt from 'jsonwebtoken'
22

33
import { getServiceChildLogger } from '@crowd/logging'
44

5+
import { FetchError } from './types'
6+
57
const log = getServiceChildLogger('github-app-auth')
68

79
const GITHUB_API = 'https://api.github.com'
@@ -80,7 +82,22 @@ export async function getInstallationToken(
8082

8183
if (!resp.ok) {
8284
const body = await resp.text()
83-
throw new Error(
85+
if (resp.status === 429 || (resp.status === 403 && body.toLowerCase().includes('rate limit'))) {
86+
const retryAfterSec = parseInt(resp.headers.get('retry-after') ?? '0', 10)
87+
const resetSec = parseInt(resp.headers.get('x-ratelimit-reset') ?? '0', 10)
88+
const resetMs = retryAfterSec
89+
? Date.now() + retryAfterSec * 1000
90+
: resetSec
91+
? resetSec * 1000 + 5_000
92+
: Date.now() + 65_000
93+
throw new FetchError(
94+
'RATE_LIMIT',
95+
`Rate limited minting token for installation ${installationId}`,
96+
resetMs,
97+
)
98+
}
99+
throw new FetchError(
100+
'AUTH',
84101
`Failed to mint token for installation ${installationId} (${resp.status}): ${body}`,
85102
)
86103
}
@@ -100,23 +117,35 @@ interface RateLimitEntry {
100117
}
101118

102119
/**
103-
* Fetches GraphQL rate limit info for every installation in parallel (one-time startup diagnostic).
104-
* GET /rate_limit does not consume quota.
120+
* Fetches GraphQL rate limit info for every installation in parallel (one-time startup check).
121+
* GET /rate_limit does not consume quota. Excludes installations that fail permanently
122+
* (IP allowlists, suspended apps) — those would fail every enrichment request too.
123+
* Transient probe failures stay in the pool; runtime parking handles them if truly broken.
105124
*/
106125
export async function fetchRateLimitDiagnostics(
107126
appId: string,
108127
privateKeyPem: string,
109128
installationIds: number[],
110-
): Promise<void> {
129+
): Promise<number[]> {
111130
const results = await Promise.all(
112131
installationIds.map(
113-
async (id): Promise<RateLimitEntry | { installationId: number; error: string }> => {
132+
async (
133+
id,
134+
): Promise<
135+
RateLimitEntry | { installationId: number; error: string; permanent: boolean }
136+
> => {
114137
try {
115138
const token = await getInstallationToken(appId, privateKeyPem, id)
116139
const resp = await fetch(`${GITHUB_API}/rate_limit`, {
117140
headers: { Authorization: `bearer ${token}`, Accept: 'application/vnd.github+json' },
118141
})
119-
if (!resp.ok) return { installationId: id, error: `HTTP ${resp.status}` }
142+
if (!resp.ok) {
143+
return {
144+
installationId: id,
145+
error: `HTTP ${resp.status}`,
146+
permanent: [401, 403, 404].includes(resp.status),
147+
}
148+
}
120149
// eslint-disable-next-line @typescript-eslint/no-explicit-any
121150
const data = (await resp.json()) as any
122151
const g = data.resources?.graphql
@@ -128,34 +157,53 @@ export async function fetchRateLimitDiagnostics(
128157
resetAt: new Date(g.reset * 1000).toISOString(),
129158
}
130159
} catch (err) {
131-
return { installationId: id, error: (err as Error).message }
160+
const permanent = err instanceof FetchError && err.kind === 'AUTH'
161+
return { installationId: id, error: (err as Error).message, permanent }
132162
}
133163
},
134164
),
135165
)
136166

137167
let totalLimit = 0
138168
let totalRemaining = 0
139-
let failures = 0
169+
const healthyIds: number[] = []
170+
const excluded: Array<{ installationId: number; error: string }> = []
171+
const transientFailures: Array<{ installationId: number; error: string }> = []
140172
for (const r of results) {
141173
if ('error' in r) {
142-
failures++
174+
if (r.permanent) {
175+
excluded.push({ installationId: r.installationId, error: r.error })
176+
} else {
177+
transientFailures.push({ installationId: r.installationId, error: r.error })
178+
healthyIds.push(r.installationId)
179+
}
143180
} else {
144181
totalLimit += r.limit
145182
totalRemaining += r.remaining
183+
healthyIds.push(r.installationId)
146184
}
147185
}
148186

149187
log.info(
150188
{
151189
installations: installationIds.length,
152-
failures,
190+
excluded: excluded.length,
191+
transientFailures: transientFailures.length,
153192
totalLimit,
154193
totalRemaining,
155194
totalUsed: totalLimit - totalRemaining,
156195
},
157196
'GitHub App rate limit capacity',
158197
)
198+
199+
if (transientFailures.length > 0) {
200+
log.warn({ transientFailures }, 'Probe failed transiently — keeping installations in the pool')
201+
}
202+
if (excluded.length > 0) {
203+
log.warn({ excluded }, 'Excluding installations that permanently failed the probe')
204+
}
205+
206+
return healthyIds
159207
}
160208

161209
export interface GithubAppConfig {

0 commit comments

Comments
 (0)