-
Notifications
You must be signed in to change notification settings - Fork 729
Expand file tree
/
Copy pathactivities.ts
More file actions
134 lines (108 loc) · 3.86 KB
/
Copy pathactivities.ts
File metadata and controls
134 lines (108 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import { parse } from 'csv-parse'
import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer'
import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'
import { svc } from '../main'
import { getAvailableSourceNames, getSource } from '../sources/registry'
import { IDatasetDescriptor } from '../sources/types'
// Module-level logger used by every activity in this file.
const log = getServiceLogger()
// Number of parsed rows processDataset accumulates before issuing one bulk upsert.
const BATCH_SIZE = 5000
/**
 * Reports the source names exposed by the source registry.
 *
 * @returns the value produced by `getAvailableSourceNames()`.
 */
export async function listSources(): Promise<string[]> {
  const sourceNames = getAvailableSourceNames()
  return sourceNames
}
/**
 * Looks up a source by name and asks it which datasets are available.
 *
 * Logs before and after the lookup; the completion log carries the number of
 * datasets and the id of the first one in the returned list (if any).
 *
 * @param sourceName - name used to resolve the source via `getSource`.
 * @returns the dataset descriptors exactly as the source returned them.
 */
export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
  const registeredSource = getSource(sourceName)

  log.info({ sourceName }, 'Listing datasets.')
  const available = await registeredSource.listAvailableDatasets()

  const summary = { sourceName, count: available.length, newest: available[0]?.id }
  log.info(summary, 'Datasets listed.')

  return available
}
/**
 * Streams a single dataset from the given source, converts each row into a
 * project-catalog record and upserts the records in batches of BATCH_SIZE.
 *
 * Rows for which `source.parseRow` returns a falsy value are counted as
 * skipped and not written. A final partial batch is flushed after the stream
 * ends. Progress and a completion summary are logged.
 *
 * The source stream is always destroyed before this function returns or
 * throws, so an early failure (parse error, failed upsert) does not leave the
 * underlying stream open.
 *
 * @param sourceName - name used to resolve the source via `getSource`; also
 *   stored in the `source` field of every upserted record.
 * @param dataset - descriptor of the dataset to fetch and process.
 * @throws any error raised by the stream, the CSV parser, `source.parseRow`
 *   or `bulkUpsertProjectCatalog` propagates to the caller.
 */
export async function processDataset(
  sourceName: string,
  dataset: IDatasetDescriptor,
): Promise<void> {
  const qx = pgpQx(svc.postgres.writer.connection())
  const startTime = Date.now()

  log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')

  const source = getSource(sourceName)

  log.info({ sourceName, datasetId: dataset.id }, 'Opening dataset stream...')
  const stream = await source.fetchDatasetStream(dataset)
  log.info({ sourceName, datasetId: dataset.id }, 'Dataset stream opened.')

  // For CSV sources: pipe through csv-parse to get Record<string, string> objects.
  // For JSON sources: the stream already emits pre-parsed objects in object mode.
  const records =
    source.format === 'json'
      ? stream
      : stream.pipe(
          parse({
            columns: true,
            skip_empty_lines: true,
            trim: true,
          }),
        )

  // pipe() does not forward source errors to the destination automatically, so we
  // destroy records explicitly — this surfaces the error in the for-await loop and
  // lets Temporal mark the activity as failed and retry it.
  stream.on('error', (err: Error) => {
    log.error({ datasetId: dataset.id, error: err.message }, 'Stream error.')
    records.destroy(err)
  })

  if (source.format !== 'json') {
    const csvRecords = records as ReturnType<typeof parse>
    csvRecords.on('error', (err) => {
      log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
    })
  }

  let batch: IDbProjectCatalogCreate[] = []
  let totalProcessed = 0
  let totalSkipped = 0
  let batchNumber = 0
  let totalRows = 0

  try {
    for await (const rawRow of records) {
      totalRows++

      const parsed = source.parseRow(rawRow as Record<string, unknown>)
      if (!parsed) {
        totalSkipped++
        continue
      }

      batch.push({
        projectSlug: parsed.projectSlug,
        repoName: parsed.repoName,
        repoUrl: parsed.repoUrl,
        source: sourceName,
        action: parsed.action ?? 'auto',
        lfCriticalityScore: parsed.lfCriticalityScore,
      })

      if (batch.length >= BATCH_SIZE) {
        batchNumber++
        await bulkUpsertProjectCatalog(qx, batch)
        totalProcessed += batch.length
        batch = []
        log.info(
          { sourceName, totalProcessed, batchNumber, datasetId: dataset.id },
          'Batch upserted.',
        )
      }
    }

    // Flush remaining rows that didn't fill a complete batch
    if (batch.length > 0) {
      batchNumber++
      log.info(
        { sourceName, datasetId: dataset.id, batchSize: batch.length },
        'Flushing final batch...',
      )
      await bulkUpsertProjectCatalog(qx, batch)
      totalProcessed += batch.length
    }
  } finally {
    // Leaving the for-await loop early only destroys `records`. In the CSV case
    // that is the parser, and pipe() does not propagate destruction upstream, so
    // the source stream would stay open. destroy() is a no-op if the stream has
    // already been destroyed.
    stream.destroy()
  }

  const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1)

  log.info(
    {
      sourceName,
      datasetId: dataset.id,
      totalRows,
      totalProcessed,
      totalSkipped,
      totalBatches: batchNumber,
      elapsedSeconds,
    },
    'Dataset processing complete.',
  )
}