Skip to content

Commit 399b9ba

Browse files
committed
feat: add backfill script
Signed-off-by: Umberto Sgueglia <usgueglia@contractor.linuxfoundation.org>
1 parent b7f1724 commit 399b9ba

6 files changed

Lines changed: 168 additions & 0 deletions

File tree

services/apps/packages_worker/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
"dev:pagerank": "tsx --expose-gc src/criticality/run-pagerank.ts",
1414
"start:pom-fetcher": "SERVICE=pom-fetcher tsx src/bin/pom-fetcher.ts",
1515
"backfill:maven": "SERVICE=maven tsx src/bin/maven-backfill.ts",
16+
"backfill:stewardship": "SERVICE=stewardship-backfill tsx src/bin/stewardship-backfill.ts",
17+
"backfill:stewardship:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=stewardship-backfill LOG_LEVEL=info tsx src/bin/stewardship-backfill.ts",
1618
"dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts",
1719
"dev:criticality-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=criticality-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9237 src/bin/criticality-worker.ts",
1820
"start:maven-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=maven-worker tsx src/bin/maven-worker.ts",
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { getServiceLogger } from '@crowd/logging'
2+
3+
import { getPackagesDb } from '../db'
4+
import { runStewardshipBackfill } from '../stewardship/runStewardshipBackfill'
5+
6+
const log = getServiceLogger()
7+
8+
let shuttingDown = false
9+
10+
// Graceful stop: finish the in-flight batch, then exit. Safe to interrupt — every
11+
// write is ON CONFLICT DO NOTHING so re-running resumes where it left off.
12+
const shutdown = () => {
13+
if (shuttingDown) return
14+
shuttingDown = true
15+
log.info('Shutting down stewardship backfill (stopping after the current batch)...')
16+
}
17+
18+
process.on('SIGINT', shutdown)
19+
process.on('SIGTERM', shutdown)
20+
21+
const main = async () => {
22+
log.info('stewardship backfill starting...')
23+
24+
const qx = await getPackagesDb()
25+
await qx.selectOne('SELECT 1')
26+
log.info('Connected to packages-db.')
27+
28+
const rawBatchSize = parseInt(process.env.STEWARDSHIP_BACKFILL_BATCH_SIZE ?? '10000', 10)
29+
if (!Number.isFinite(rawBatchSize) || rawBatchSize <= 0) {
30+
throw new Error(
31+
`STEWARDSHIP_BACKFILL_BATCH_SIZE must be a positive integer, got: ${process.env.STEWARDSHIP_BACKFILL_BATCH_SIZE}`,
32+
)
33+
}
34+
const batchSize = rawBatchSize
35+
36+
const totals = await runStewardshipBackfill(qx, { batchSize }, () => shuttingDown)
37+
38+
log.info({ ...totals }, 'stewardship backfill complete')
39+
process.exit(0)
40+
}
41+
42+
main().catch((err) => {
43+
log.error({ err }, 'stewardship backfill fatal error')
44+
process.exit(1)
45+
})
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import {
2+
QueryExecutor,
3+
insertUnassignedStewardships,
4+
listCriticalPackagesWithoutStewardship,
5+
} from '@crowd/data-access-layer'
6+
import { getServiceChildLogger } from '@crowd/logging'
7+
8+
const log = getServiceChildLogger('stewardship-backfill')
9+
10+
export interface BackfillResult {
11+
inserted: number
12+
skipped: number
13+
batches: number
14+
}
15+
16+
interface BackfillOptions {
17+
batchSize: number
18+
}
19+
20+
/**
21+
* Seeds one `stewardships` row (status=unassigned, origin=auto_imported) for
22+
* every critical package that doesn't already have one. Idempotent: ON CONFLICT
23+
* DO NOTHING means re-running is safe and will just report 0 inserts.
24+
*
25+
* Designed to be called from a Temporal activity or directly from the bin script.
26+
* The `isStopping` callback lets the caller signal a graceful shutdown between
27+
* batches — the function returns the totals collected so far.
28+
*/
29+
export async function runStewardshipBackfill(
30+
qx: QueryExecutor,
31+
options: BackfillOptions,
32+
isStopping: () => boolean = () => false,
33+
): Promise<BackfillResult> {
34+
const { batchSize } = options
35+
let lastId = 0
36+
let inserted = 0
37+
let skipped = 0
38+
let batches = 0
39+
40+
while (!isStopping()) {
41+
const ids = await listCriticalPackagesWithoutStewardship(qx, {
42+
afterId: lastId,
43+
limit: batchSize,
44+
})
45+
46+
if (ids.length === 0) break
47+
48+
const batchInserted = await insertUnassignedStewardships(qx, ids)
49+
const batchSkipped = ids.length - batchInserted
50+
51+
inserted += batchInserted
52+
skipped += batchSkipped
53+
batches++
54+
lastId = ids[ids.length - 1]
55+
56+
log.info({ batches, inserted, skipped, lastId, batchInserted, batchSkipped }, 'Batch complete.')
57+
}
58+
59+
return { inserted, skipped, batches }
60+
}

services/libs/data-access-layer/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ export * from './osspckgs/ingestJobs'
2020
export * from './osspckgs/maintainers'
2121
export * from './osspckgs/packages'
2222
export * from './osspckgs/repos'
23+
export * from './osspckgs/stewardships'
2324
export * from './osspckgs/versions'

services/libs/data-access-layer/src/osspckgs/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from './packages'
33
export * from './maintainers'
44
export * from './versions'
55
export * from './repos'
6+
export * from './stewardships'
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { QueryExecutor } from '../queryExecutor'
2+
3+
/**
4+
* Returns a page of critical package ids that do not yet have a stewardship row,
5+
* ordered by id ascending. Used as the cursor-based pagination source for the
6+
* stewardship backfill.
7+
*/
8+
export async function listCriticalPackagesWithoutStewardship(
9+
qx: QueryExecutor,
10+
options: { afterId: number; limit: number },
11+
): Promise<number[]> {
12+
// pg returns BIGINT columns as strings; Number() is safe here because
13+
// package ids are well within JS safe-integer range.
14+
const rows: Array<{ id: string | number }> = await qx.select(
15+
`
16+
SELECT p.id
17+
FROM packages p
18+
LEFT JOIN stewardships s ON s.package_id = p.id
19+
WHERE p.is_critical = true
20+
AND p.id > $(afterId)
21+
AND s.package_id IS NULL
22+
ORDER BY p.id ASC
23+
LIMIT $(limit)
24+
`,
25+
options,
26+
)
27+
return rows.map((r) => Number(r.id))
28+
}
29+
30+
/**
31+
* Inserts one unassigned stewardship row per package id. Idempotent:
32+
* ON CONFLICT DO NOTHING skips ids that already have a row.
33+
* Returns the number of rows actually inserted.
34+
*
35+
* Re-checks is_critical at insert time to guard against concurrent criticality
36+
* changes between the SELECT and INSERT phases.
37+
*/
38+
export async function insertUnassignedStewardships(
39+
qx: QueryExecutor,
40+
packageIds: number[],
41+
): Promise<number> {
42+
if (packageIds.length === 0) return 0
43+
const result: { count: string } = await qx.selectOne(
44+
`
45+
WITH ins AS (
46+
INSERT INTO stewardships (package_id, status, origin, opened_at, last_status_at)
47+
SELECT p.id, 'unassigned', 'auto_imported', NOW(), NOW()
48+
FROM packages p
49+
WHERE p.id = ANY($(packageIds)::bigint[])
50+
AND p.is_critical = true
51+
ON CONFLICT (package_id) DO NOTHING
52+
RETURNING 1
53+
)
54+
SELECT COUNT(*) AS count FROM ins
55+
`,
56+
{ packageIds },
57+
)
58+
return parseInt(result.count, 10)
59+
}

0 commit comments

Comments
 (0)