Commit b7c4084

feat: remove the ossf datasource and read from new lf api source
Signed-off-by: Umberto Sgueglia <usgueglia@contractor.linuxfoundation.org>
1 parent: deda42a

7 files changed

Lines changed: 78 additions & 240 deletions


services/apps/automatic_projects_discovery_worker/src/activities/activities.ts

Lines changed: 12 additions & 2 deletions
@@ -1,6 +1,6 @@
 import { parse } from 'csv-parse'
 
-import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer'
+import { bulkUpsertProjectCatalog, findLatestProjectCatalogSyncedAt } from '@crowd/data-access-layer'
 import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types'
 import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
 import { getServiceLogger } from '@crowd/logging'
@@ -19,7 +19,14 @@ export async function listSources(): Promise<string[]> {
 
 export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
   const source = getSource(sourceName)
-  const datasets = await source.listAvailableDatasets()
+
+  const qx = pgpQx(svc.postgres.reader.connection())
+  const latestSyncedAt = await findLatestProjectCatalogSyncedAt(qx)
+  const scoredAfter = latestSyncedAt ? latestSyncedAt.toISOString().slice(0, 10) : undefined
+
+  log.info({ sourceName, scoredAfter: scoredAfter ?? 'none (full fetch)' }, 'Listing datasets.')
+
+  const datasets = await source.listAvailableDatasets({ scoredAfter })
 
   log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.')
 
@@ -36,7 +43,9 @@ export async function processDataset(
   log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')
 
   const source = getSource(sourceName)
+  log.info({ sourceName, datasetId: dataset.id }, 'Opening dataset stream...')
   const stream = await source.fetchDatasetStream(dataset)
+  log.info({ sourceName, datasetId: dataset.id }, 'Dataset stream opened.')
 
   // For CSV sources: pipe through csv-parse to get Record<string, string> objects.
   // For JSON sources: the stream already emits pre-parsed objects in object mode.
@@ -103,6 +112,7 @@ export async function processDataset(
   // Flush remaining rows that didn't fill a complete batch
   if (batch.length > 0) {
     batchNumber++
+    log.info({ sourceName, datasetId: dataset.id, batchSize: batch.length }, 'Flushing final batch...')
     await bulkUpsertProjectCatalog(qx, batch)
     totalProcessed += batch.length
   }
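
Net effect of the activities change: instead of always doing a full fetch, each run derives an incremental cut-off from the catalog itself and hands it to the source. A minimal sketch of that derivation, assuming (as the diff's usage suggests) that findLatestProjectCatalogSyncedAt resolves to a Date or null and that the API accepts a plain YYYY-MM-DD value for scoredAfter; resolveScoredAfter is a hypothetical name, not part of the worker:

// Sketch only: the real activity reads the timestamp through pgpQx and the data-access layer.
async function resolveScoredAfter(
  findLatestSyncedAt: () => Promise<Date | null>,
): Promise<string | undefined> {
  const latestSyncedAt = await findLatestSyncedAt()
  // undefined means "nothing synced yet", which listDatasets logs as a full fetch
  return latestSyncedAt ? latestSyncedAt.toISOString().slice(0, 10) : undefined
}

// e.g. resolveScoredAfter(() => Promise.resolve(new Date('2026-02-10T08:30:00Z')))
// resolves to '2026-02-10'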

services/apps/automatic_projects_discovery_worker/src/sources/lf-criticality-score/source.ts

Lines changed: 53 additions & 60 deletions
@@ -8,7 +8,8 @@ import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../ty
 
 const log = getServiceLogger()
 
-const DEFAULT_API_URL = 'https://lf-criticality-score-api.example.com'
+const DEFAULT_API_HOST = 'lf-criticality-score-api.example.com'
+const DEFAULT_API_PORT = 443
 const PAGE_SIZE = 100
 
 interface LfApiResponse {
@@ -20,30 +21,37 @@ interface LfApiResponse {
 }
 
 interface LfApiRow {
-  runDate: string
-  repoUrl: string
+  rundate: string
+  repourl: string
   owner: string
-  repoName: string
+  reponame: string
   contributors: number
   organizations: number
-  sizeSloc: number
-  lastUpdated: number
+  sizesloc: number
+  lastupdated: number
   age: number
-  commitFreq: number
+  commitfreq: number
   score: number
 }
 
 function getApiBaseUrl(): string {
-  return (process.env.LF_CRITICALITY_SCORE_API_URL ?? DEFAULT_API_URL).replace(/\/$/, '')
+  if (process.env.LF_CRITICALITY_SCORE_API_URL) {
+    return process.env.LF_CRITICALITY_SCORE_API_URL.replace(/\/$/, '')
+  }
+  const host = (process.env.LF_CRITICALITY_SCORE_API_HOST ?? DEFAULT_API_HOST).replace(/\/$/, '')
+  const port = parseInt(process.env.LF_CRITICALITY_SCORE_API_PORT ?? String(DEFAULT_API_PORT), 10)
+  const scheme = port === 443 ? 'https' : 'http'
+  return `${scheme}://${host}:${port}`
 }
 
 async function fetchPage(
   baseUrl: string,
-  startDate: string,
-  endDate: string,
   page: number,
+  scoredAfter?: string,
 ): Promise<LfApiResponse> {
-  const url = `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}&page=${page}&pageSize=${PAGE_SIZE}`
+  const params = new URLSearchParams({ page: String(page), pageSize: String(PAGE_SIZE) })
+  if (scoredAfter) params.set('scoredAfter', scoredAfter)
+  const url = `${baseUrl}/projects?${params.toString()}`
 
   return new Promise((resolve, reject) => {
     const client = url.startsWith('https://') ? https : http
@@ -72,88 +80,73 @@ async function fetchPage(
   })
 }
 
-/**
- * Generates the first day and last day of a given month.
- * monthOffset = 0 → current month, -1 → previous month, etc.
- */
-function monthRange(monthOffset: number): { startDate: string; endDate: string } {
-  const now = new Date()
-  const year = now.getUTCFullYear()
-  const month = now.getUTCMonth() + monthOffset // can be negative; Date handles rollover
-
-  const first = new Date(Date.UTC(year, month, 1))
-  const last = new Date(Date.UTC(year, month + 1, 0)) // last day of month
-
-  const pad = (n: number) => String(n).padStart(2, '0')
-  const fmt = (d: Date) =>
-    `${d.getUTCFullYear()}-${pad(d.getUTCMonth() + 1)}-${pad(d.getUTCDate())}`
-
-  return { startDate: fmt(first), endDate: fmt(last) }
-}
-
 export class LfCriticalityScoreSource implements IDiscoverySource {
   public readonly name = 'lf-criticality-score'
   public readonly format = 'json' as const
 
-  async listAvailableDatasets(): Promise<IDatasetDescriptor[]> {
+  async listAvailableDatasets(options?: { scoredAfter?: string }): Promise<IDatasetDescriptor[]> {
     const baseUrl = getApiBaseUrl()
-
-    // Return one dataset per month for the last 12 months (newest first)
-    const datasets: IDatasetDescriptor[] = []
-
-    for (let offset = 0; offset >= -11; offset--) {
-      const { startDate, endDate } = monthRange(offset)
-      const id = startDate.slice(0, 7) // e.g. "2026-02"
-
-      datasets.push({
-        id,
-        date: startDate,
-        url: `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}`,
-      })
-    }
-
-    return datasets
+    const today = new Date().toISOString().slice(0, 10)
+    const { scoredAfter } = options ?? {}
+
+    const params = new URLSearchParams()
+    if (scoredAfter) params.set('scoredAfter', scoredAfter)
+    const qs = params.toString()
+
+    return [
+      {
+        id: scoredAfter ? `${today}-since-${scoredAfter}` : today,
+        date: today,
+        url: `${baseUrl}/projects${qs ? `?${qs}` : ''}`,
+      },
+    ]
   }
 
-  /**
-   * Returns an object-mode Readable that fetches all pages from the API
-   * and pushes each row as a plain object. Activities.ts iterates this
-   * directly (no csv-parse) because format === 'json'.
-   */
   async fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable> {
     const baseUrl = getApiBaseUrl()
+    const scoredAfter = new URL(dataset.url).searchParams.get('scoredAfter') ?? undefined
 
-    // Extract startDate and endDate from the stored URL
-    const parsed = new URL(dataset.url)
-    const startDate = parsed.searchParams.get('startDate') ?? ''
-    const endDate = parsed.searchParams.get('endDate') ?? ''
+    log.info(
+      { datasetId: dataset.id, baseUrl, scoredAfter: scoredAfter ?? 'none (full fetch)' },
+      'LF Criticality Score: starting stream fetch.',
+    )
 
     async function* pages() {
       let page = 1
       let totalPages = 1
 
       do {
-        const response = await fetchPage(baseUrl, startDate, endDate, page)
+        log.info({ datasetId: dataset.id, page, totalPages }, 'LF Criticality Score: fetching page...')
+        const response = await fetchPage(baseUrl, page, scoredAfter)
         totalPages = response.totalPages
 
+        if (page === 1) {
+          log.info(
+            { datasetId: dataset.id, total: response.total, totalPages, pageSize: response.pageSize },
+            'LF Criticality Score: first page received — total records available.',
+          )
+        }
+
        for (const row of response.data) {
          yield row
        }
 
-        log.debug(
+        log.info(
          { datasetId: dataset.id, page, totalPages, rowsInPage: response.data.length },
-          'LF Criticality Score page fetched.',
+          'LF Criticality Score: page fetched.',
        )
 
        page++
      } while (page <= totalPages)
+
+      log.info({ datasetId: dataset.id, totalPages }, 'LF Criticality Score: all pages fetched.')
    }
 
    return Readable.from(pages(), { objectMode: true })
  }
 
  parseRow(rawRow: Record<string, unknown>): IDiscoverySourceRow | null {
-    const repoUrl = rawRow['repoUrl'] as string | undefined
+    const repoUrl = (rawRow['repourl'] ?? rawRow['repoUrl']) as string | undefined
    if (!repoUrl) {
      return null
    }
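
For reference, the pagination contract the new source leans on, separated from the logging. The sketch below uses only the response fields the diff actually reads (data, total, totalPages, pageSize); PageShape, drainPages and fetchNext are illustrative names, not part of the worker:

// Illustrative shape of one API page, mirroring the fields the source reads.
interface PageShape<T> {
  data: T[]
  total: number
  totalPages: number
  pageSize: number
}

// Same loop structure as pages() in the diff: totalPages is refreshed from every
// response, so the do/while runs at least once and stops after the last page.
async function* drainPages<T>(fetchNext: (page: number) => Promise<PageShape<T>>) {
  let page = 1
  let totalPages = 1
  do {
    const response = await fetchNext(page)
    totalPages = response.totalPages
    yield* response.data
    page++
  } while (page <= totalPages)
}

Wrapping such a generator with Readable.from(..., { objectMode: true }) yields the object-mode stream that processDataset consumes row by row.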

services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts

Lines changed: 0 additions & 96 deletions
This file was deleted.

services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts

Lines changed: 0 additions & 75 deletions
This file was deleted.
