Skip to content

Commit bba3a14

Browse files
authored
feat: criticality worker init [CM-1214] (#4161)
Signed-off-by: Mouad BANI <mouad-mb@outlook.com> Signed-off-by: Mouad BANI <mbani@contractor.linuxfoundation.org>
1 parent 92c0b79 commit bba3a14

11 files changed

Lines changed: 643 additions & 21 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-- Manual override table for criticality scoring (ADR-0001 §Spotlight overrides).
2+
-- Packages listed here are forced is_critical = TRUE regardless of computed score.
3+
-- Applied after ranking inside rank_packages_universe() so overrides survive
4+
-- every automated re-rank pass.
5+
--
6+
-- rationale, added_by, added_at are required — the table must stay auditable.
7+
-- namespace is nullable: cargo crates have no namespace, Maven artifacts do.
8+
-- The UNIQUE key uses COALESCE so (ecosystem, NULL namespace, name) is enforced correctly.
9+
10+
CREATE TABLE package_criticality_spotlight (
11+
id bigserial PRIMARY KEY,
12+
ecosystem text NOT NULL,
13+
namespace text,
14+
name text NOT NULL,
15+
rationale text NOT NULL,
16+
added_by text NOT NULL,
17+
added_at timestamptz NOT NULL DEFAULT NOW()
18+
);
19+
20+
-- Functional unique index: COALESCE treats NULL namespace as '' so that
21+
-- (cargo, NULL, tokio) and (cargo, NULL, serde) are unique but a duplicate
22+
-- (cargo, NULL, tokio) entry is rejected.
23+
CREATE UNIQUE INDEX ON package_criticality_spotlight
24+
(ecosystem, COALESCE(namespace, ''), name);
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
-- Renames criticality_score → impact on both packages_universe and packages,
2+
-- and installs rank_packages_universe() with the updated formula.
3+
--
4+
-- Formula (ADR-0001 §Criticality scoring methodology):
5+
-- impact = w_downloads * pct_rank( LOG(1 + downloads_last_30d) ) within ecosystem
6+
-- + w_dep_pkgs * pct_rank( LOG(1 + dependent_count) ) within ecosystem
7+
-- + w_transitive * pct_rank( LOG(1 + transitive_dependent_count) ) within ecosystem
8+
--
9+
-- Default weights: 0.25 / 0.25 / 0.50 (sum to 1.0).
10+
-- All weights and the top-N budget are call-time parameters — tunable without
11+
-- schema or code changes.
12+
--
13+
-- Steps inside the function:
14+
-- 1. Score — compute impact via weighted PERCENT_RANK()
15+
-- 2. Rank — ROW_NUMBER() per ecosystem, flag top-N as is_critical
16+
-- 2.5 Spotlight — force is_critical = TRUE for rows in package_criticality_spotlight
17+
-- 3. Propagate — copy impact + is_critical onto the packages table
18+
19+
ALTER TABLE packages_universe
20+
RENAME COLUMN criticality_score TO impact;
21+
22+
ALTER TABLE packages
23+
RENAME COLUMN criticality_score TO impact;
24+
25+
CREATE OR REPLACE FUNCTION rank_packages_universe(
26+
weight_downloads numeric DEFAULT 0.25,
27+
weight_dependent_packages numeric DEFAULT 0.25,
28+
weight_transitive numeric DEFAULT 0.50,
29+
critical_top_n_by_ecosystem jsonb DEFAULT '{
30+
"npm": 210000,
31+
"pypi": 140000,
32+
"maven": 120000,
33+
"nuget": 70000,
34+
"packagist": 56000,
35+
"go": 42000,
36+
"cargo": 28000,
37+
"rubygems": 21000,
38+
"docker": 13000
39+
}'::jsonb
40+
)
41+
RETURNS TABLE(scored_rows int, ranked_rows int, propagated_rows int)
42+
LANGUAGE plpgsql AS $$
43+
DECLARE
44+
n_scored int;
45+
n_ranked int;
46+
n_propagated int;
47+
BEGIN
48+
-- ── Step 1: score ──────────────────────────────────────────────────────────
49+
-- last_rank_pass_at updated unconditionally on every pass (schema requirement).
50+
WITH percentile_scores AS (
51+
SELECT
52+
id,
53+
(
54+
weight_downloads * PERCENT_RANK() OVER (
55+
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(downloads_last_30d, 0)))
56+
57+
+ weight_dependent_packages * PERCENT_RANK() OVER (
58+
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(dependent_count, 0)))
59+
60+
+ weight_transitive * PERCENT_RANK() OVER (
61+
PARTITION BY ecosystem ORDER BY LOG(1 + COALESCE(transitive_dependent_count, 0)))
62+
)::numeric(10, 4) AS new_impact
63+
FROM packages_universe
64+
)
65+
UPDATE packages_universe pu
66+
SET impact = ps.new_impact,
67+
last_rank_pass_at = NOW()
68+
FROM percentile_scores ps
69+
WHERE pu.id = ps.id;
70+
71+
GET DIAGNOSTICS n_scored = ROW_COUNT;
72+
73+
-- ── Step 2: rank + flag ────────────────────────────────────────────────────
74+
WITH ranked AS (
75+
SELECT
76+
id, ecosystem,
77+
ROW_NUMBER() OVER (
78+
PARTITION BY ecosystem
79+
ORDER BY impact DESC NULLS LAST, id
80+
) AS r
81+
FROM packages_universe
82+
WHERE purl IS NOT NULL
83+
),
84+
flagged AS (
85+
SELECT
86+
id, r,
87+
COALESCE(
88+
r <= (critical_top_n_by_ecosystem ->> ecosystem)::int,
89+
FALSE
90+
) AS new_is_critical
91+
FROM ranked
92+
)
93+
UPDATE packages_universe pu
94+
SET rank_in_ecosystem = f.r,
95+
is_critical = f.new_is_critical
96+
FROM flagged f
97+
WHERE pu.id = f.id
98+
AND (
99+
pu.rank_in_ecosystem IS DISTINCT FROM f.r
100+
OR pu.is_critical IS DISTINCT FROM f.new_is_critical
101+
);
102+
103+
GET DIAGNOSTICS n_ranked = ROW_COUNT;
104+
105+
-- ── Step 2.5: apply spotlight overrides ───────────────────────────────────
106+
-- Force is_critical = TRUE for any row in package_criticality_spotlight,
107+
-- regardless of computed score or rank. Runs after Step 2 so overrides
108+
-- survive every automated re-rank pass.
109+
-- IS NOT DISTINCT FROM handles the NULL namespace case (e.g. cargo crates).
110+
UPDATE packages_universe pu
111+
SET is_critical = TRUE
112+
FROM package_criticality_spotlight s
113+
WHERE pu.ecosystem = s.ecosystem
114+
AND (pu.namespace IS NOT DISTINCT FROM s.namespace)
115+
AND pu.name = s.name
116+
AND pu.is_critical = FALSE;
117+
118+
-- ── Step 3: propagate to packages ─────────────────────────────────────────
119+
-- last_rank_pass_at updated unconditionally on every pass (schema requirement).
120+
UPDATE packages p
121+
SET impact = pu.impact,
122+
is_critical = pu.is_critical,
123+
last_rank_pass_at = NOW()
124+
FROM packages_universe pu
125+
WHERE p.purl = pu.purl
126+
AND p.ecosystem = pu.ecosystem;
127+
128+
GET DIAGNOSTICS n_propagated = ROW_COUNT;
129+
130+
RETURN QUERY SELECT n_scored, n_ranked, n_propagated;
131+
END;
132+
$$;

docs/adr/0001-oss-packages-design-decisions.md

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,24 +118,22 @@ PageRank centrality is the primary blast-radius signal; transitive dependent cou
118118
Per-ecosystem percentile-rank of each log-transformed signal, then weighted blend:
119119

120120
```
121-
score = w_downloads * pct_rank( LN(1 + downloads_last_30d) ) within ecosystem
122-
+ w_dep_pkgs * pct_rank( LN(1 + dependent_packages_count) ) within ecosystem
123-
+ w_dep_repos * pct_rank( LN(1 + dependent_repos_count) ) within ecosystem
124-
+ w_transitive * pct_rank( LN(1 + transitive_dependent_count) ) within ecosystem
125-
+ w_centrality * pct_rank( centrality_score ) within ecosystem
121+
impact = w_downloads * pct_rank( LOG(1 + downloads_last_30d) ) within ecosystem
122+
+ w_dep_pkgs * pct_rank( LOG(1 + dependent_count) ) within ecosystem
123+
+ w_transitive * pct_rank( LOG(1 + transitive_dependent_count) ) within ecosystem
126124
```
127125

128-
Weights sum to 1.0 → score`[0, 1]`. Centrality skips the `LN()` (PageRank is already in a small bounded range) but still passes through `pct_rank` so every signal lands on the same percentile scale. Starting weight bias: centrality dominant (PageRank is the primary blast-radius signal), transitive count low (kept as a sanity floor — see Inputs note on double-counting), direct dependents and downloads balanced as secondary popularity signals. All weights are call-time numeric parameters to `rank_packages_universe()` — tunable without schema or code changes.
126+
Weights sum to 1.0 → impact`[0, 1]`. `dependent_count` is direct dependent packages only; `transitive_dependent_count` is indirect dependents only. All weights are call-time numeric parameters to `rank_packages_universe()` — tunable without schema or code changes.
129127

130-
**Suggested starting weights** (use as the first call, then iterate):
128+
`centrality_score` (PageRank) is computed and stored on `packages_universe` by the criticality worker and will be added to the formula if needed.
131129

132-
| Weight | Value | Signal | Rationale |
133-
| --------------- | ----- | -------------------- | ------------------------------------------------------ |
134-
| `w_centrality` | 0.40 | PageRank | Primary blast-radius signal |
135-
| `w_transitive` | 0.10 | Transitive dependents | Sanity floor; low to avoid double-counting centrality |
136-
| `w_dep_pkgs` | 0.20 | Direct dependent packages | Popularity within the package graph |
137-
| `w_dep_repos` | 0.15 | Direct dependent repos | Popularity across consumer codebases |
138-
| `w_downloads` | 0.15 | 30-day downloads | Adoption signal, lighter weight (noisy for new packages) |
130+
**Current weights** (defaults in `rank_packages_universe()`, iterate once the ranked list is observable):
131+
132+
| Weight | Value | Signal | Rationale |
133+
| --------------- | ----- | --------------------------- | -------------------------------------------------------------------- |
134+
| `w_transitive` | 0.50 | Indirect dependent packages | Primary blast-radius signal — captures packages invisible to direct counts |
135+
| `w_dep_pkgs` | 0.25 | Direct dependent packages | Popularity within the package graph |
136+
| `w_downloads` | 0.25 | 30-day downloads | Adoption signal, balanced with dependency reach |
139137

140138
These are a starting point, not a recommendation we've validated. They will be revised once the first ranked list is observable and stakeholders review which packages land in / near Tier 1 — particularly for smaller ecosystems where the percentile distribution is less stable.
141139

services/apps/packages_worker/package.json

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,24 @@
22
"name": "@crowd/packages-worker",
33
"private": true,
44
"scripts": {
5+
"start:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker tsx src/bin/packages-worker.ts",
6+
"start:criticality-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=criticality-worker tsx src/bin/criticality-worker.ts",
57
"start:deps-dev-ingest": "CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest tsx src/bin/deps-dev-ingest.ts",
6-
"start:github-repos-enricher": "SERVICE=github-repos-enricher tsx src/bin/github-repos-enricher.ts",
78
"start:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=npm-worker tsx src/bin/npm-worker.ts",
8-
"start:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker tsx src/bin/packages-worker.ts",
9+
"start:github-repos-enricher": "SERVICE=github-repos-enricher tsx src/bin/github-repos-enricher.ts",
10+
"run:pagerank": "tsx src/criticality/run-pagerank.ts",
11+
"run:impact": "tsx src/criticality/run-impact.ts",
12+
"dev:pagerank": "tsx --expose-gc src/criticality/run-pagerank.ts",
13+
"dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts",
14+
"dev:criticality-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=criticality-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9237 src/bin/criticality-worker.ts",
915
"dev:deps-dev-ingest": "CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts",
10-
"dev:deps-dev-ingest:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts",
11-
"dev:github-repos-enricher": "SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts",
12-
"dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts",
1316
"dev:npm-worker": "CROWD_TEMPORAL_TASKQUEUE=npm-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
14-
"dev:npm-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=npm-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
15-
"dev:packages-worker": "CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts",
17+
"dev:github-repos-enricher": "SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts",
1618
"dev:packages-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=packages-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9233 src/bin/packages-worker.ts",
19+
"dev:criticality-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=packages-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=criticality-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9237 src/bin/criticality-worker.ts",
20+
"dev:deps-dev-ingest:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=deps-dev-ingest CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=deps-dev-ingest nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9235 src/bin/deps-dev-ingest.ts",
21+
"dev:npm-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=npm-worker CROWD_TEMPORAL_NAMESPACE=$CROWD_PACKAGES_TEMPORAL_NAMESPACE SERVICE=npm-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9236 src/bin/npm-worker.ts",
22+
"dev:github-repos-enricher:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=github-repos-enricher LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9234 src/bin/github-repos-enricher.ts",
1723
"export-to-bucket": "SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts",
1824
"export-to-bucket:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=deps-dev-ingest tsx src/scripts/exportToBucket.ts",
1925
"monitor:osspckgs": "SERVICE=monitor tsx src/scripts/monitorOsspckgs.ts",
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { getServiceLogger } from '@crowd/logging'
2+
3+
import { getPackagesDb } from '../db'
4+
5+
const log = getServiceLogger()
6+
7+
let shuttingDown = false
8+
9+
process.on('SIGINT', () => {
10+
shuttingDown = true
11+
})
12+
process.on('SIGTERM', () => {
13+
shuttingDown = true
14+
})
15+
16+
async function main() {
17+
log.info('criticality-worker starting')
18+
19+
const qx = await getPackagesDb()
20+
await qx.selectOne('SELECT 1')
21+
log.info(
22+
'Connected to packages-db. Ready — trigger PageRank via run:pagerank or impact score via rank_packages_universe().',
23+
)
24+
25+
while (!shuttingDown) {
26+
await new Promise((resolve) => setTimeout(resolve, 5_000))
27+
}
28+
29+
log.info('criticality-worker stopped')
30+
process.exit(0)
31+
}
32+
33+
main().catch((err) => {
34+
log.error({ err }, 'criticality-worker fatal error')
35+
process.exit(1)
36+
})
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { Context } from '@temporalio/activity'
2+
3+
import { getServiceChildLogger } from '@crowd/logging'
4+
5+
import { getPackagesDb } from '../db'
6+
7+
import { buildGraph, computePageRank } from './graph'
8+
import { loadDirectEdges, mergeCentralityScores } from './queries'
9+
import { CentralityInput, CentralityResult } from './types'
10+
11+
const log = getServiceChildLogger('criticality')
12+
13+
const PAGERANK_DAMPING = 0.85
14+
const PAGERANK_MAX_ITER = 100
15+
const PAGERANK_CONVERGENCE = 1e-6
16+
17+
export async function criticalityComputePageRank(
18+
input: CentralityInput,
19+
): Promise<CentralityResult> {
20+
const { ecosystem } = input
21+
const damping = PAGERANK_DAMPING
22+
const maxIter = PAGERANK_MAX_ITER
23+
const convergence = PAGERANK_CONVERGENCE
24+
const start = Date.now()
25+
const qx = await getPackagesDb()
26+
27+
// ── Step 1: build CSR graph
28+
const edges = await loadDirectEdges(qx, ecosystem)
29+
const edgeCount = edges.length
30+
const graph = buildGraph(edges)
31+
edges.length = 0 // release JS edge objects — CSR holds all graph data
32+
log.info({ ecosystem, nodeCount: graph.N, edgeCount }, 'graph loaded')
33+
34+
// ── Step 2 & 3: PageRank
35+
const { scores, iterations } = computePageRank(
36+
graph,
37+
damping,
38+
maxIter,
39+
convergence,
40+
(iter, delta) => {
41+
try {
42+
Context.current().heartbeat({ ecosystem, iter, delta })
43+
} catch {
44+
/* standalone */
45+
}
46+
},
47+
)
48+
log.info({ ecosystem, iterations, nodeCount: graph.N }, 'PageRank converged')
49+
50+
// ── Step 4: merge centrality_score into packages_universe
51+
// Stream map entries into fixed-size chunks — O(CHUNK) extra memory, not O(N).
52+
const CHUNK = 10_000
53+
let buffer: Array<{ packageId: number; centralityScore: number }> = []
54+
55+
for (const [packageId, idx] of graph.nodeIndex) {
56+
buffer.push({ packageId, centralityScore: scores[idx] })
57+
if (buffer.length === CHUNK) {
58+
await mergeCentralityScores(qx, buffer)
59+
buffer = []
60+
}
61+
}
62+
if (buffer.length > 0) await mergeCentralityScores(qx, buffer)
63+
64+
return { ecosystem, nodeCount: graph.N, edgeCount, iterations, durationMs: Date.now() - start }
65+
}

0 commit comments

Comments
 (0)