Skip to content

Commit 9867df3

Browse files
authored
chore: consolidate npm package state (#4184)
Signed-off-by: anilb <epipav@gmail.com>
1 parent 48593db commit 9867df3

5 files changed

Lines changed: 57 additions & 23 deletions

File tree

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- ── 1. Add the 30d watermark columns to npm_package_state ──────────────────────
2+
ALTER TABLE npm_package_state
3+
ADD COLUMN IF NOT EXISTS downloads_30d_last_run_at timestamptz, -- breadth watermark: latest 30d window refreshed
4+
ADD COLUMN IF NOT EXISTS downloads_30d_history_backfilled_at timestamptz, -- depth watermark: NULL until full older history filled
5+
ADD COLUMN IF NOT EXISTS downloads_30d_run_result jsonb; -- { status, httpStatus?, errorKind?, message? }
6+
7+
-- Recreate the two indexes the old table had — both due-selection queries
8+
-- filter/order on these columns.
9+
CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_last_run_at_idx
10+
ON npm_package_state (downloads_30d_last_run_at);
11+
CREATE INDEX IF NOT EXISTS npm_package_state_downloads_30d_history_backfilled_at_idx
12+
ON npm_package_state (downloads_30d_history_backfilled_at);
13+
14+
-- ── 2. Migrate existing rows ───────────────────────────────────────────────────
15+
INSERT INTO npm_package_state
16+
(purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result)
17+
SELECT purl, downloads_30d_last_run_at, downloads_30d_history_backfilled_at, downloads_30d_run_result
18+
FROM npm_package_universe_state
19+
ON CONFLICT (purl) DO UPDATE SET
20+
downloads_30d_last_run_at = EXCLUDED.downloads_30d_last_run_at,
21+
downloads_30d_history_backfilled_at = EXCLUDED.downloads_30d_history_backfilled_at,
22+
downloads_30d_run_result = EXCLUDED.downloads_30d_run_result;
23+
24+
-- ── 3. Drop the retired table ──────────────────────────────────────────────────
25+
DROP TABLE npm_package_universe_state;

services/apps/packages_worker/src/npm/activities.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import {
88
getMissingDownloadDates,
99
getNpmChangesLastSeq,
1010
getNpmPackagesNeedingDailyBackfill,
11+
getNpmPurlsDueForLast30dHistory,
12+
getNpmPurlsDueForLatest30d,
1113
getNpmPurlsForChangedNames,
12-
getNpmUniversePurlsDueForLast30dHistory,
13-
getNpmUniversePurlsDueForLatest30d,
1414
getUnscannedNpmPurls,
1515
insertDailyDownloads,
1616
logAuditFieldChanges,
@@ -507,7 +507,7 @@ export async function refreshLatestLast30dLane(
507507
lanes: number,
508508
): Promise<{ fetched: number }> {
509509
const qx = await getPackagesDb()
510-
const due = await getNpmUniversePurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes)
510+
const due = await getNpmPurlsDueForLatest30d(qx, cutoff, batchSize, laneIndex, lanes)
511511
if (due.length === 0) return { fetched: 0 }
512512

513513
const window = latestLast30dWindow(utcFirstOfCurrentMonth())
@@ -554,7 +554,7 @@ export async function backfillLast30dHistoryLane(
554554
lanes: number,
555555
): Promise<{ fetched: number }> {
556556
const qx = await getPackagesDb()
557-
const due = await getNpmUniversePurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes)
557+
const due = await getNpmPurlsDueForLast30dHistory(qx, batchSize, laneIndex, lanes)
558558
if (due.length === 0) return { fetched: 0 }
559559

560560
const latestEnd = utcFirstOfCurrentMonth()

services/libs/data-access-layer/src/packages/downloadsDaily.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ export interface DailyBackfillCandidate {
3030

3131
// `laneIndex`/`laneCount` shard the due set across concurrent lanes by a stable hash
3232
// of the purl, so each lane drains a disjoint slice (laneCount=1 ⇒ no sharding).
33+
// Restricted to is_critical packages — daily downloads are deep per-package history,
34+
// scoped to the critical set (matching the metadata pass). The 30d download passes run
35+
// over all npm packages, since their counts feed the criticality ranking.
3336
export async function getNpmPackagesNeedingDailyBackfill(
3437
qx: QueryExecutor,
3538
cutoff: string,
@@ -49,6 +52,7 @@ export async function getNpmPackagesNeedingDailyBackfill(
4952
FROM packages p
5053
LEFT JOIN npm_package_state s ON s.purl = p.purl
5154
WHERE p.ecosystem = 'npm'
55+
AND p.is_critical = TRUE
5256
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
5357
AND (s.daily_downloads_last_processed_at IS NULL
5458
OR s.daily_downloads_last_processed_at < $(cutoff)::timestamptz)

services/libs/data-access-layer/src/packages/downloadsLast30d.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ export interface Last30dCandidate {
2525
// BREADTH selection. A purl is "due for the latest window" while its breadth watermark
2626
// (`downloads_30d_last_run_at`) is older than this run's cutoff (or absent). The watermark
2727
// is bumped once its current 30-day window is refreshed, so the monthly run touches every
28-
// package's latest window exactly once and the denormalized number lands across the whole
29-
// universe before any deep history is filled. Older history is a separate pass keyed on
30-
// `downloads_30d_history_backfilled_at` — see getNpmUniversePurlsDueForLast30dHistory.
31-
export async function getNpmUniversePurlsDueForLatest30d(
28+
// package's latest window exactly once and the denormalized number lands across all npm
29+
// packages before any deep history is filled. Older history is a separate pass keyed on
30+
// `downloads_30d_history_backfilled_at` — see getNpmPurlsDueForLast30dHistory.
31+
export async function getNpmPurlsDueForLatest30d(
3232
qx: QueryExecutor,
3333
cutoff: string,
3434
batchSize: number,
@@ -39,7 +39,7 @@ export async function getNpmUniversePurlsDueForLatest30d(
3939
`WITH due AS (
4040
SELECT p.purl AS purl, p.first_release_at
4141
FROM packages p
42-
LEFT JOIN npm_package_universe_state s ON s.purl = p.purl
42+
LEFT JOIN npm_package_state s ON s.purl = p.purl
4343
WHERE p.ecosystem = 'npm'
4444
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
4545
AND (s.downloads_30d_last_run_at IS NULL
@@ -61,7 +61,7 @@ export async function getNpmUniversePurlsDueForLatest30d(
6161
// not yet been filled (`downloads_30d_history_backfilled_at IS NULL`). This keeps the work
6262
// strictly breadth-first per package: history is never fetched before the latest window.
6363
// Sharded the same way; oldest-breadth-first so the longest-waiting packages drain first.
64-
export async function getNpmUniversePurlsDueForLast30dHistory(
64+
export async function getNpmPurlsDueForLast30dHistory(
6565
qx: QueryExecutor,
6666
batchSize: number,
6767
laneIndex: number,
@@ -71,9 +71,8 @@ export async function getNpmUniversePurlsDueForLast30dHistory(
7171
`WITH due AS (
7272
SELECT p.purl AS purl, p.first_release_at, s.downloads_30d_last_run_at AS last_run_at
7373
FROM packages p
74-
JOIN npm_package_universe_state s ON s.purl = p.purl
74+
JOIN npm_package_state s ON s.purl = p.purl
7575
WHERE p.ecosystem = 'npm'
76-
AND p.is_critical = TRUE
7776
AND (((hashtext(p.purl) % $(laneCount)) + $(laneCount)) % $(laneCount)) = $(laneIndex)
7877
AND s.downloads_30d_last_run_at IS NOT NULL
7978
AND s.downloads_30d_history_backfilled_at IS NULL
@@ -90,7 +89,7 @@ export async function getNpmUniversePurlsDueForLast30dHistory(
9089
}
9190

9291
// Structured outcome of a last-30d run, stored as JSONB in
93-
// npm_package_universe_state.downloads_30d_run_result.
92+
// npm_package_state.downloads_30d_run_result.
9493
export interface Last30dRunResult {
9594
status: 'success' | 'error'
9695
httpStatus?: number
@@ -107,7 +106,7 @@ export async function markLast30dProcessed(
107106
result: Last30dRunResult,
108107
): Promise<void> {
109108
await qx.result(
110-
`INSERT INTO npm_package_universe_state (purl, downloads_30d_last_run_at, downloads_30d_run_result)
109+
`INSERT INTO npm_package_state (purl, downloads_30d_last_run_at, downloads_30d_run_result)
111110
VALUES ($(purl), NOW(), $(result)::jsonb)
112111
ON CONFLICT (purl) DO UPDATE SET
113112
downloads_30d_last_run_at = NOW(),
@@ -126,7 +125,7 @@ export async function markLast30dHistoryBackfilled(
126125
result: Last30dRunResult,
127126
): Promise<void> {
128127
await qx.result(
129-
`UPDATE npm_package_universe_state
128+
`UPDATE npm_package_state
130129
SET downloads_30d_history_backfilled_at = NOW(),
131130
downloads_30d_run_result = $(result)::jsonb
132131
WHERE purl = $(purl)`,

services/libs/data-access-layer/src/packages/npmPackageState.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ export interface NpmMetadataRunResult {
1111
}
1212

1313
// Mark a package as metadata-scanned and record the run outcome (+ timestamp). Keyed
14-
// by purl (from the packages row). metadata_first_scanned_at is kept from the first
15-
// insert; metadata_run_result/metadata_last_run_at are refreshed on every run.
14+
// by purl (from the packages row). metadata_run_result/metadata_last_run_at are refreshed
15+
// on every run — metadata_last_run_at is the authoritative "metadata has run" signal.
1616
export async function markNpmPackageScanned(
1717
qx: QueryExecutor,
1818
purl: string,
@@ -28,7 +28,9 @@ export async function markNpmPackageScanned(
2828
)
2929
}
3030

31-
// npm packages in the `packages` table that have never been metadata-scanned.
31+
// Critical npm packages in the `packages` table whose metadata has never been scanned.
32+
// Only is_critical packages are enriched — metadata is deep, per-package work, so it is
33+
// scoped to the critical set (matching the daily-downloads pass).
3234
// Keyset-paginated on purl so the workflow can drain a large first run across
3335
// many continueAsNew runs.
3436
export async function getUnscannedNpmPurls(
@@ -41,8 +43,9 @@ export async function getUnscannedNpmPurls(
4143
FROM packages p
4244
LEFT JOIN npm_package_state s ON s.purl = p.purl
4345
WHERE p.ecosystem = 'npm'
46+
AND p.is_critical = TRUE
4447
AND p.purl > $(afterPurl)
45-
AND s.purl IS NULL
48+
AND s.metadata_last_run_at IS NULL
4649
ORDER BY p.purl
4750
LIMIT $(batchSize)`,
4851
{ afterPurl, batchSize },
@@ -51,9 +54,11 @@ export async function getUnscannedNpmPurls(
5154
}
5255

5356
// Given a list of changed npm registry names (from the _changes feed), return the
54-
// purls of those that exist as npm rows in `packages`. The purl is read straight
55-
// from the row; feed names are matched against the decoded namespace/name columns
56-
// (the purl is percent-encoded, so substr(purl) would be %40scope/name).
57+
// purls of the critical ones that exist as npm rows in `packages`. The purl is read
58+
// straight from the row; feed names are matched against the decoded namespace/name
59+
// columns (the purl is percent-encoded, so substr(purl) would be %40scope/name).
60+
// Restricted to is_critical packages — non-critical packages are never metadata-scanned,
61+
// so there is nothing to keep fresh when they change.
5762
export async function getNpmPurlsForChangedNames(
5863
qx: QueryExecutor,
5964
names: string[],
@@ -64,7 +69,8 @@ export async function getNpmPurlsForChangedNames(
6469
FROM packages p
6570
JOIN unnest($(names)::text[]) AS u(name)
6671
ON (CASE WHEN p.namespace IS NOT NULL THEN p.namespace || '/' || p.name ELSE p.name END) = u.name
67-
WHERE p.ecosystem = 'npm'`,
72+
WHERE p.ecosystem = 'npm'
73+
AND p.is_critical = TRUE`,
6874
{ names },
6975
)
7076
return rows.map((r) => r.purl)

0 commit comments

Comments
 (0)