Skip to content

Commit ef781ff

Browse files
committed
Fix import retry rolling back the whole transaction; now only the staging tables are truncated before the retry
1 parent 1389336 commit ef781ff

5 files changed

Lines changed: 177 additions & 12 deletions

File tree

vulndb/cisa_kev_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func insertCISAKEVBulk(ctx context.Context, tx pgx.Tx, entries []CISAKEVEntry) e
169169
return nil
170170
}
171171
if _, err := tx.Exec(ctx, `
172-
CREATE TEMP TABLE kev_stage (
172+
CREATE TEMP TABLE IF NOT EXISTS kev_stage (
173173
cve text,
174174
cisa_exploit_add date,
175175
cisa_action_due date,

vulndb/epss_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func insertEPSSBulk(ctx context.Context, tx pgx.Tx, epssData map[string]dtos.EPS
9292
return nil
9393
}
9494
if _, err := tx.Exec(ctx, `
95-
CREATE TEMP TABLE epss_stage (
95+
CREATE TEMP TABLE IF NOT EXISTS epss_stage (
9696
cve_id text,
9797
epss numeric(6,5),
9898
percentile numeric(6,5)

vulndb/import_debug.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright (C) 2026 l3montree GmbH
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU Affero General Public License as
5+
// published by the Free Software Foundation, either version 3 of the
6+
// License, or (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU Affero General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Affero General Public License
14+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
15+
package vulndb
16+
17+
import (
18+
"bytes"
19+
"encoding/json"
20+
"os"
21+
"testing"
22+
"time"
23+
24+
"github.com/l3montree-dev/devguard/dtos"
25+
"github.com/l3montree-dev/devguard/transformer"
26+
)
27+
28+
/*
29+
5:47AM ERR vulndb/integrity.go:116 invalid checksum when importing table=exploits expectedCount=8523 actualCount=8531 expectedChecksum=6232333932343434376236666336666535663665393965313334633335303563 actualChecksum=3335373562623638303565326630343838356534663462636435656662633461
30+
5:47AM ERR vulndb/integrity.go:116 invalid checksum when importing table=cve_relationships expectedCount=215759 actualCount=215788 expectedChecksum=6439313364643532363862326331373839346234343838356536316238643034 actualChecksum=3766303064303736356131643233663934343961316231366364373664376337
31+
5:47AM ERR vulndb/integrity.go:116 invalid checksum when importing table=affected_components expectedCount=2161081 actualCount=2161314 expectedChecksum=3730363337353037646331346366346436376539383930636664643338393630 actualChecksum=3931396231656432336331393661663066343634613062343638633038376663
32+
5:47AM ERR vulndb/integrity.go:116 invalid checksum when importing table=cves expectedCount=177924 actualCount=177937 expectedChecksum=3763393961343964323137366431336632653134373566363138373365313061 actualChecksum=3639396336643561346561343866313334656338393461323435343439316636
33+
5:47AM ERR vulndb/integrity.go:116 invalid checksum when importing table=cve_affected_component expectedCount=9495979 actualCount=9496443 expectedChecksum=3330326465346438323864633837303139656665633963366138316538343535 actualChecksum=6564616333633034393337363539613938633833333666313933303361663438
34+
35+
There are MORE cves in the database after the import than expected expectedCount=177924 actualCount=177937
36+
*/
37+
func TestImportRC(t *testing.T) {
38+
// extract the vulndb.tar.zst fixture to a temp dir
39+
if _, err := os.Stat("vulndb-testdata-new"); os.IsNotExist(err) {
40+
if err := untarZstd("vulndb-new.tar.zst", "vulndb-testdata-new"); err != nil {
41+
t.Fatalf("could not extract vulndb testdata: %v", err)
42+
}
43+
}
44+
/*lastImportTime, err := time.Parse(time.RFC3339Nano, "2026-05-10T05:10:29.831137845Z")
45+
if err != nil {
46+
t.Fatalf("could not parse last import time: %v", err)
47+
}*/
48+
/*
49+
50+
Okay ich habe keine Ahnung was ich hier tppe
51+
*/
52+
53+
currentStateFile, err := os.ReadFile("prod-db-cves-full.csv")
54+
if err != nil {
55+
t.Fatalf("could not read current state file: %v", err)
56+
}
57+
// split into lines
58+
lines := bytes.Split(currentStateFile, []byte{'\n'})
59+
// remove the first line (header)
60+
lines = lines[1:]
61+
62+
// read the whole osv.go file and check what cves we would insert right now
63+
osvEntries, err := readAllGobItems[OSVEntry]("vulndb-testdata-new/osv.gob")
64+
if err != nil {
65+
t.Fatalf("could not read osv gob file: %v", err)
66+
}
67+
68+
// remove all malicious packages and components from the osvEntries, as they are not part of the RC import
69+
filteredOSVEntries := make([]OSVEntry, 0, len(osvEntries))
70+
for _, entry := range osvEntries {
71+
if !bytes.HasPrefix([]byte(entry.OSV.ID), []byte("MAL-")) {
72+
filteredOSVEntries = append(filteredOSVEntries, entry)
73+
}
74+
}
75+
// check what cves we would insert right now
76+
cves := gobOSVToVulnFilterTransformer(time.Time{}, nil)(filteredOSVEntries)
77+
78+
// check which cves we would insert right now are not in the current state file
79+
currentStateMap := make(map[string]struct{})
80+
for _, line := range lines {
81+
// split by comma and get the first column (cve id)
82+
columns := bytes.Split(line, []byte{','})
83+
if len(columns) > 0 {
84+
currentStateMap[string(columns[0])] = struct{}{}
85+
}
86+
}
87+
88+
newStateMap := make(map[string]struct{})
89+
for _, cve := range cves.CVEs {
90+
newStateMap[cve.CVE] = struct{}{}
91+
}
92+
93+
// check which cves are in the currentStateMap but not in the newStateMap
94+
for cve := range currentStateMap {
95+
if _, exist := newStateMap[cve]; !exist {
96+
t.Logf("CVE in current state but not in new state: %s", cve)
97+
}
98+
}
99+
t.Fail()
100+
}
101+
102+
/*
103+
--- FAIL: TestImportRC (3.85s)
104+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-579f-8639-173e
105+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-e780-297e-3c37
106+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-01ac-8821-274a
107+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-f04c-582a-df62
108+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-1dc5-af13-00c1
109+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-7627-a361-b4d3
110+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-5818-1fba-950a
111+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-de02-7575-4370
112+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-37cc-2ae7-e3c8
113+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-34c7-ca18-1a8c
114+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-435f-9eb9-99cb
115+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-f4ca-f938-4210
116+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-7f2f-e83a-5508
117+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-879a-fe35-cf61
118+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state: ECHO-c9a3-95ec-f0d8
119+
/Users/timbastin/Desktop/l3montree/devguard/vulndb/import_test.go:88: CVE in current state but not in new state:
120+
*/
121+
122+
func TestWouldBeDeleted(t *testing.T) {
123+
b, err := os.ReadFile("test.osv.json")
124+
if err != nil {
125+
t.Fatalf("could not read test osv file: %v", err)
126+
}
127+
128+
// parse as osv entry
129+
var entry dtos.OSV
130+
if err := json.Unmarshal(b, &entry); err != nil {
131+
t.Fatalf("could not parse test osv file: %v", err)
132+
}
133+
134+
relationships := transformer.OSVToCVERelationships(&entry)
135+
affectedComponentsForCVE := transformer.AffectedComponentsFromOSV(&entry)
136+
if len(affectedComponentsForCVE) == 0 && len(relationships) == 0 {
137+
t.Logf("no relationships or affected components for CVE %s", entry.ID)
138+
}
139+
140+
cve := transformer.OSVToCVE(&entry)
141+
if cve.CVE == "" {
142+
t.Logf("could not transform OSV to CVE: %s", entry.ID)
143+
}
144+
}

vulndb/osv_service.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (s osvService) fetchAndImportOSV(ctx context.Context, tx pgx.Tx, importStar
233233
kept = append(kept, e)
234234
}
235235
}
236-
slog.Info("filtered OSV entries after cleanup", "before", len(vulnRows.CVEs), "after", len(kept))
236+
slog.Info("filtered OSV entries after cleanup", "before", len(vulnRows.CVEs), "after", len(surviving))
237237
return kept, surviving, nil
238238
}
239239

@@ -343,6 +343,9 @@ func (s osvService) zipWorkerFunction(zipWorkWaitGroup *sync.WaitGroup, zipJobs
343343
continue
344344
}
345345
readCloser.Close()
346+
if osvEntry.ID == "ECHO-7f2f-e83a-5508" {
347+
slog.Info("found test entry, skipping", "id", osvEntry.ID)
348+
}
346349

347350
if shouldIgnoreVulnerabilityID(osvEntry.ID) {
348351
continue
@@ -653,6 +656,24 @@ func createStagingTables(ctx context.Context, tx pgx.Tx) error {
653656
return nil
654657
}
655658

659+
func clearStagingTables(ctx context.Context, tx pgx.Tx) error {
660+
_, err := tx.Exec(ctx, `
661+
TRUNCATE TABLE cves_stage;
662+
TRUNCATE TABLE cve_relationships_stage;
663+
TRUNCATE TABLE affected_components_stage;
664+
TRUNCATE TABLE cve_affected_component_stage;
665+
TRUNCATE TABLE exploits_stage;
666+
TRUNCATE TABLE mal_pkgs_stage;
667+
TRUNCATE TABLE mal_comps_stage;
668+
TRUNCATE TABLE epss_stage;
669+
TRUNCATE TABLE kev_stage;
670+
`)
671+
if err != nil {
672+
return fmt.Errorf("could not clear staging tables: %w", err)
673+
}
674+
return nil
675+
}
676+
656677
// if we insert a lot of entries it's faster to drop indexes and constraints and rebuild them afterwards instead of maintaining them on each insert
657678
// also set some session parameters optimized for bulk inserts
658679
func PrepareBulkInsert(ctx context.Context, tx pgx.Tx) error {

vulndb/vulndb_service.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -369,17 +369,15 @@ func (s *VulnDBService) ImportRC(ctx context.Context, opts shared.ImportOptions)
369369
if err != nil {
370370
slog.Error("integrity validation failed, attempting fallback retry", "failingTables", failingTables, "error", err)
371371
monitoring.Alert("vulndb integrity check failed, retrying with limited table set", err)
372-
// we need to create a new transaction for the retry, so we rollback the previous one and start a new one
373-
if rbErr := tx.Rollback(ctx); rbErr != nil && rbErr != pgx.ErrTxClosed {
374-
return fmt.Errorf("could not rollback transaction after failed import: %w (rollback error: %v)", err, rbErr)
375-
}
376372

377-
tx, err = conn.Begin(ctx) //nolint:errcheck
378-
if err != nil {
379-
return fmt.Errorf("could not begin transaction for full import retry: %w", err)
373+
slog.Info("retrying with full import for limited tables", "tables", failingTables)
374+
375+
// since we did not commit anything until now, the staging tables still contain some data
376+
// for the import, we just need to make sure to clean them up before re-applying the data from the working directory
377+
if err := clearStagingTables(ctx, tx); err != nil {
378+
return fmt.Errorf("could not clear staging tables for retry: %w", err)
380379
}
381380

382-
slog.Info("retrying with full import for limited tables", "tables", failingTables)
383381
_, err = s.applyFromWorkingDir(ctx, tx, workingDir, time.Time{}, integrity, opts.Bulk, failingTables)
384382
if err != nil {
385383
if rbErr := tx.Rollback(ctx); rbErr != nil && rbErr != pgx.ErrTxClosed {
@@ -872,7 +870,7 @@ func streamToDatabase(ctx context.Context, tx pgx.Tx, vulnRowsIn <-chan vulndbRo
872870
return fmt.Errorf("could not create staging tables: %w", err)
873871
}
874872

875-
var cveCount, relationshipCount, affectedComponentCount, cveAffectedComponentCount, exploitCount, malPkgCount int
873+
var cveCount, relationshipCount, affectedComponentCount, cveAffectedComponentCount, exploitCount, malPkgCount, malAffectedComponentCount int
876874
var cvesTime, relationshipsTime, affectedComponentsTime, cveAffectedComponentsTime, exploitsTime, malPkgTime time.Duration
877875
ticker := time.NewTicker(4 * time.Second)
878876
defer ticker.Stop()
@@ -962,6 +960,7 @@ func streamToDatabase(ctx context.Context, tx pgx.Tx, vulnRowsIn <-chan vulndbRo
962960
}
963961
malPkgTime += time.Since(t)
964962
malPkgCount += len(malPkg.pkgs)
963+
malAffectedComponentCount += len(malPkg.comps)
965964
}
966965
}
967966

@@ -982,6 +981,7 @@ func streamToDatabase(ctx context.Context, tx pgx.Tx, vulnRowsIn <-chan vulndbRo
982981
"cve_affected_component", cveAffectedComponentCount,
983982
"exploits", exploitCount,
984983
"malicious_packages", malPkgCount,
984+
"malicious_affected_components", malAffectedComponentCount,
985985
"took", time.Since(start),
986986
)
987987
return nil

0 commit comments

Comments
 (0)