Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/apps/packages_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"dev:pagerank": "tsx --expose-gc src/criticality/run-pagerank.ts",
"start:pom-fetcher": "SERVICE=pom-fetcher tsx src/bin/pom-fetcher.ts",
"backfill:maven": "SERVICE=maven tsx src/bin/maven-backfill.ts",
"backfill:stewardship": "SERVICE=stewardship-backfill tsx src/bin/stewardship-backfill.ts",
"backfill:stewardship:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=stewardship-backfill LOG_LEVEL=info tsx src/bin/stewardship-backfill.ts",
"dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts",
"dev:criticality-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=criticality-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9237 src/bin/criticality-worker.ts",
"start:maven-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=maven-worker tsx src/bin/maven-worker.ts",
Expand Down
45 changes: 45 additions & 0 deletions services/apps/packages_worker/src/bin/stewardship-backfill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { getServiceLogger } from '@crowd/logging'

import { getPackagesDb } from '../db'
import { runStewardshipBackfill } from '../stewardship/runStewardshipBackfill'

const log = getServiceLogger()

// Set once by the first SIGINT/SIGTERM. main() exposes it to the backfill loop
// through a callback so the loop can stop between batches.
let shuttingDown = false

/**
 * Signal handler that requests a graceful stop: the batch currently in flight
 * is allowed to finish, after which the run ends. Interrupting is safe because
 * the inserts use ON CONFLICT DO NOTHING, so a later run simply picks up the
 * packages that are still missing a row. Repeated signals are ignored.
 */
function shutdown(): void {
  if (!shuttingDown) {
    shuttingDown = true
    log.info('Shutting down stewardship backfill (stopping after the current batch)...')
  }
}

for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, shutdown)
}

/**
 * Entry point of the stewardship backfill script.
 *
 * Reads the batch size from STEWARDSHIP_BACKFILL_BATCH_SIZE (default 10000),
 * verifies the packages-db connection with a trivial query, runs the backfill
 * and exits with code 0. Any thrown error is handled by the `.catch` below,
 * which logs it and exits with code 1.
 */
const main = async () => {
  log.info('stewardship backfill starting...')

  // Validate configuration before opening a DB connection so a bad value fails
  // fast. parseInt() would silently accept partially numeric input such as
  // "10abc" (-> 10) or "12.7" (-> 12), so require a plain run of digits.
  const rawBatchSize = (process.env.STEWARDSHIP_BACKFILL_BATCH_SIZE ?? '10000').trim()
  const batchSize = /^\d+$/.test(rawBatchSize) ? Number(rawBatchSize) : Number.NaN
  if (!Number.isSafeInteger(batchSize) || batchSize <= 0) {
    throw new Error(
      `STEWARDSHIP_BACKFILL_BATCH_SIZE must be a positive integer, got: ${process.env.STEWARDSHIP_BACKFILL_BATCH_SIZE}`,
    )
  }

  const qx = await getPackagesDb()
  await qx.selectOne('SELECT 1')
  log.info('Connected to packages-db.')

  const totals = await runStewardshipBackfill(qx, { batchSize }, () => shuttingDown)

  // stopRequested tells a run that was asked to stop by a signal apart from one
  // that ran undisturbed; the totals then cover only the batches processed.
  log.info({ ...totals, stopRequested: shuttingDown }, 'stewardship backfill complete')
  process.exit(0)
}

main().catch((err) => {
  log.error({ err }, 'stewardship backfill fatal error')
  process.exit(1)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {
QueryExecutor,
insertUnassignedStewardships,
listCriticalPackagesWithoutStewardship,
} from '@crowd/data-access-layer'
import { getServiceChildLogger } from '@crowd/logging'

const log = getServiceChildLogger('stewardship-backfill')

/** Totals accumulated by `runStewardshipBackfill` across all processed batches. */
export interface BackfillResult {
/** Sum of the per-batch counts returned by `insertUnassignedStewardships`. */
inserted: number
/** Ids that were selected for a batch but did not result in an inserted row. */
skipped: number
/** Number of non-empty batches that were processed. */
batches: number
}

/** Tuning knobs for `runStewardshipBackfill`. */
interface BackfillOptions {
/** Maximum number of package ids fetched (and then inserted) per batch. */
batchSize: number
}

/**
 * Seeds one `stewardships` row (status=unassigned, origin=auto_imported) for
 * every critical package that doesn't already have one. Idempotent: ON CONFLICT
 * DO NOTHING means re-running is safe and will just report 0 inserts.
 *
 * Pages through candidate ids with an ascending id cursor. A forward-only
 * cursor never revisits ids it has already passed, so a package whose
 * `is_critical` flag flips to true behind the cursor would be missed for the
 * rest of the run. To cover that, once the cursor is exhausted the function
 * starts another sweep from id 0, and it only finishes when a sweep finds no
 * candidates at all or inserts nothing.
 *
 * Designed to be called from a Temporal activity or directly from the bin script.
 *
 * @param qx executor used for every query issued by the backfill
 * @param options `batchSize` is the maximum number of ids handled per batch
 * @param isStopping polled before each batch; once it returns true the function
 *   stops and returns the totals collected so far
 * @returns counts of inserted rows, skipped ids and processed batches
 */
export async function runStewardshipBackfill(
  qx: QueryExecutor,
  options: BackfillOptions,
  isStopping: () => boolean = () => false,
): Promise<BackfillResult> {
  const { batchSize } = options
  let lastId = 0
  let inserted = 0
  let skipped = 0
  let batches = 0
  // Rows inserted since the cursor was last (re)set to 0.
  let insertedThisSweep = 0

  while (!isStopping()) {
    const ids = await listCriticalPackagesWithoutStewardship(qx, {
      afterId: lastId,
      limit: batchSize,
    })

    if (ids.length === 0) {
      // Finished when a sweep that started at 0 found nothing, or when the
      // sweep that just ended inserted nothing (guards against spinning on
      // candidates that can never be inserted).
      if (lastId === 0 || insertedThisSweep === 0) break

      log.info(
        { batches, inserted, skipped, lastId },
        'Cursor exhausted; re-sweeping from the start for packages that became critical mid-run.',
      )
      lastId = 0
      insertedThisSweep = 0
      continue
    }

    const batchInserted = await insertUnassignedStewardships(qx, ids)
    const batchSkipped = ids.length - batchInserted

    inserted += batchInserted
    skipped += batchSkipped
    insertedThisSweep += batchInserted
    batches++
    // ids are returned in ascending order, so the last one is the new cursor.
    lastId = ids[ids.length - 1]

    log.info({ batches, inserted, skipped, lastId, batchInserted, batchSkipped }, 'Batch complete.')
  }

  return { inserted, skipped, batches }
}
1 change: 1 addition & 0 deletions services/libs/data-access-layer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ export * from './osspckgs/ingestJobs'
export * from './osspckgs/maintainers'
export * from './osspckgs/packages'
export * from './osspckgs/repos'
export * from './osspckgs/stewardships'
export * from './osspckgs/versions'
1 change: 1 addition & 0 deletions services/libs/data-access-layer/src/osspckgs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from './packages'
export * from './maintainers'
export * from './versions'
export * from './repos'
export * from './stewardships'
59 changes: 59 additions & 0 deletions services/libs/data-access-layer/src/osspckgs/stewardships.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { QueryExecutor } from '../queryExecutor'

/**
 * Fetches the next page of critical package ids that have no stewardship row
 * yet. Only ids strictly greater than `options.afterId` are considered, results
 * come back in ascending id order, and at most `options.limit` ids are returned.
 * Serves as the cursor-paginated source for the stewardship backfill.
 */
export async function listCriticalPackagesWithoutStewardship(
  qx: QueryExecutor,
  options: { afterId: number; limit: number },
): Promise<number[]> {
  const pageQuery = `
SELECT p.id
FROM packages p
LEFT JOIN stewardships s ON s.package_id = p.id
WHERE p.is_critical = true
AND p.id > $(afterId)
AND s.package_id IS NULL
ORDER BY p.id ASC
LIMIT $(limit)
`
  const records: Array<{ id: string | number }> = await qx.select(pageQuery, options)

  // BIGINT columns arrive from pg as strings. Converting with Number() is fine
  // because package ids stay well inside the JS safe-integer range.
  const ids: number[] = []
  for (const { id } of records) {
    ids.push(Number(id))
  }
  return ids
}

/**
 * Creates an unassigned, auto-imported stewardship row for each given package
 * id and reports how many rows were actually written.
 *
 * Idempotent: ON CONFLICT DO NOTHING leaves existing rows untouched. The
 * statement filters on is_critical again at insert time, so a package whose
 * criticality changed between the caller's SELECT and this INSERT is not seeded.
 * An empty id list returns 0 without issuing a query.
 */
export async function insertUnassignedStewardships(
  qx: QueryExecutor,
  packageIds: number[],
): Promise<number> {
  if (packageIds.length > 0) {
    const insertAndCountSql = `
WITH ins AS (
INSERT INTO stewardships (package_id, status, origin, opened_at, last_status_at)
SELECT p.id, 'unassigned', 'auto_imported', NOW(), NOW()
FROM packages p
WHERE p.id = ANY($(packageIds)::bigint[])
AND p.is_critical = true
ON CONFLICT (package_id) DO NOTHING
RETURNING 1
)
SELECT COUNT(*) AS count FROM ins
`
    const row: { count: string } = await qx.selectOne(insertAndCountSql, { packageIds })
    // COUNT(*) is read as a string here, so parse it into a number.
    return Number.parseInt(row.count, 10)
  }

  return 0
}
Loading