Skip to content

Commit 16d19bf

Browse files
committed
perf: fetch modified advisories in parallel
1 parent 6842219 commit 16d19bf

File tree

1 file changed

+56
-14
lines changed

1 file changed

+56
-14
lines changed

pkg/database/smart.go

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func parseModifiedIDRow(columns []string) (*modifiedIDRow, error) {
154154
return &modifiedIDRow{id: columns[1], modified: modified}, nil
155155
}
156156

157-
func (db *SmartDB) fetchModifiedIDs(since time.Time) ([]modifiedIDRow, error) {
157+
func (db *SmartDB) fetchModifiedIDs(since time.Time) ([]string, error) {
158158
url := strings.TrimSuffix(db.ArchiveURL, "/all.zip") + "/modified_id.csv"
159159

160160
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
@@ -172,15 +172,15 @@ func (db *SmartDB) fetchModifiedIDs(since time.Time) ([]modifiedIDRow, error) {
172172
defer resp.Body.Close()
173173

174174
if resp.StatusCode == http.StatusNotModified {
175-
return []modifiedIDRow{}, nil
175+
return nil, nil
176176
} else if resp.StatusCode != http.StatusOK {
177177
return nil, fmt.Errorf("%w (%s)", ErrUnexpectedStatusCode, resp.Status)
178178
}
179179

180180
i := 0
181181
r := csv.NewReader(resp.Body)
182182

183-
var rows []modifiedIDRow
183+
var ids []string
184184

185185
for {
186186
i++
@@ -203,10 +203,10 @@ func (db *SmartDB) fetchModifiedIDs(since time.Time) ([]modifiedIDRow, error) {
203203
break
204204
}
205205

206-
rows = append(rows, *row)
206+
ids = append(ids, row.id)
207207
}
208208

209-
return rows, nil
209+
return ids, nil
210210
}
211211

212212
func (db *SmartDB) updateAdvisory(id string) error {
@@ -241,22 +241,64 @@ func (db *SmartDB) updateAdvisory(id string) error {
241241
return err
242242
}
243243

244-
func (db *SmartDB) updateModifiedAdvisories(since time.Time) error {
245-
modifiedIDs, err := db.fetchModifiedIDs(since)
244+
func (db *SmartDB) downloadModifiedAdvisories(ids []string) error {
245+
conLimit := 200
246246

247-
if err != nil {
248-
return err
247+
if len(ids) == 0 {
248+
return nil
249+
}
250+
251+
// buffered channel which controls the number of concurrent operations
252+
semaphoreChan := make(chan struct{}, conLimit)
253+
resultsChan := make(chan *result)
254+
255+
defer func() {
256+
close(semaphoreChan)
257+
close(resultsChan)
258+
}()
259+
260+
for i, id := range ids {
261+
go func(i int, id string) {
262+
// read from the buffered semaphore channel, which will block if we're
263+
// already got as many goroutines as our concurrency limit allows
264+
//
265+
// when one of those routines finish they'll read from this channel,
266+
// freeing up a slot to unblock this send
267+
semaphoreChan <- struct{}{}
268+
269+
// use an empty OSV as we're reusing the result struct
270+
result := &result{i, OSV{}, db.updateAdvisory(id)}
271+
272+
resultsChan <- result
273+
274+
// read from the buffered semaphore to free up a slot to allow
275+
// another goroutine to start, since this one is wrapping up
276+
<-semaphoreChan
277+
}(i, id)
249278
}
250279

251-
for _, row := range modifiedIDs {
252-
err = db.updateAdvisory(row.id)
280+
var errs []error
253281

254-
if err != nil {
255-
return err
282+
for {
283+
result := <-resultsChan
284+
errs = append(errs, result.err)
285+
286+
if len(errs) == len(ids) {
287+
break
256288
}
257289
}
258290

259-
return nil
291+
return errors.Join(errs...)
292+
}
293+
294+
func (db *SmartDB) updateModifiedAdvisories(since time.Time) error {
295+
modifiedIDs, err := db.fetchModifiedIDs(since)
296+
297+
if err != nil {
298+
return err
299+
}
300+
301+
return db.downloadModifiedAdvisories(modifiedIDs)
260302
}
261303

262304
func (db *SmartDB) populate() (*time.Time, error) {

0 commit comments

Comments
 (0)