Skip to content

Commit 9639a9c

Browse files
committed
implements partial imports, adds logs to health endpoint
1 parent 1689467 commit 9639a9c

8 files changed

Lines changed: 287 additions & 166 deletions

File tree

cmd/devguard-cli/commands/vulndb_import.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func newImportCommand() *cobra.Command {
1717
var full bool
1818
var batchSize int
1919
var bulk bool
20+
var limitedToTables []string
2021

2122
importCmd := &cobra.Command{
2223
Use: "import",
@@ -26,9 +27,10 @@ func newImportCommand() *cobra.Command {
2627
shared.LoadConfig() // nolint
2728
migrateDB()
2829
opts := shared.ImportOptions{
29-
Full: full,
30-
BatchSize: batchSize,
31-
Bulk: bulk,
30+
Full: full,
31+
BatchSize: batchSize,
32+
Bulk: bulk,
33+
LimitedToTables: limitedToTables,
3234
}
3335
app := fx.New(
3436
fx.NopLogger,
@@ -58,6 +60,7 @@ func newImportCommand() *cobra.Command {
5860
importCmd.Flags().BoolVar(&full, "full", false, "Force a full import, ignoring the last-import watermark")
5961
importCmd.Flags().IntVar(&batchSize, "batchSize", 5000, "Number of OSV entries per batch (default 5000)")
6062
importCmd.Flags().BoolVar(&bulk, "bulk", false, "Load all gob data into RAM before writing (faster but uses ~2-3 GB memory)")
63+
importCmd.Flags().StringSliceVar(&limitedToTables, "limitedToTables", []string{}, "Comma-separated list of tables to limit the import to (e.g. --limitedToTables=cves,exploits,malicious_packages)")
6164

6265
return importCmd
6366
}

router/apiv1_router.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package router
22

33
import (
4+
"context"
45
"encoding/json"
6+
"log/slog"
57
"os"
68
"runtime"
79
"time"
@@ -179,13 +181,18 @@ func NewAPIV1Router(srv api.Server,
179181
// Check database connectivity
180182
sqlDB, err := db.DB()
181183
if err != nil {
184+
slog.Info("failed to get database instance", "error", err)
182185
return ctx.JSON(503, map[string]string{
183186
"status": "unhealthy",
184187
"error": "failed to get database instance",
185188
})
186189
}
187190

188-
if err := sqlDB.Ping(); err != nil {
191+
ctxWithTimeout, cancel := context.WithTimeout(ctx.Request().Context(), 5*time.Second)
192+
defer cancel()
193+
194+
if err := sqlDB.PingContext(ctxWithTimeout); err != nil {
195+
slog.Info("database ping failed", "error", err)
189196
return ctx.JSON(503, map[string]string{
190197
"status": "unhealthy",
191198
"error": "database ping failed",

shared/common_interfaces.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ type ImportOptions struct {
633633
BatchSize int
634634
// Bulk loads all gob data into RAM before writing — no channels, single DB flush.
635635
// Faster than streaming but uses significantly more memory (~2-3 GB).
636-
Bulk bool
636+
Bulk bool
637+
LimitedToTables []string
637638
}
638639

639640
type VulnDBService interface {

utils/slice.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,7 @@ func CompareSlices[T any, K comparable](a, b []T, serializer func(T) K) CompareR
119119
}
120120

121121
func Any[T any](s []T, f func(T) bool) bool {
122-
for _, v := range s {
123-
if f(v) {
124-
return true
125-
}
126-
}
127-
return false
122+
return slices.ContainsFunc(s, f)
128123
}
129124

130125
func All[T any](s []T, f func(T) bool) bool {
@@ -157,3 +152,9 @@ func ContainsAll[T comparable](s []T, needed []T) bool {
157152
return Contains(s, n)
158153
})
159154
}
155+
156+
func ContainsAny[T comparable](s []T, needed []T) bool {
157+
return Any(needed, func(n T) bool {
158+
return Contains(s, n)
159+
})
160+
}

vulndb/integrity.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,31 +103,33 @@ type integrityInformation struct {
103103
ImportTimestamp time.Time `json:"import_timestamp"`
104104
}
105105

106-
func validateIntegrityInformation(workingDir string, groundTruth integrityInformation, localIntegrityInformation []tableIntegrityInformation) error {
107-
didErr := false
106+
// returns a string slice with failing tables
107+
// if nil, then all tables are valid
108+
func validateIntegrityInformation(workingDir string, groundTruth integrityInformation, localIntegrityInformation []tableIntegrityInformation) ([]string, bool) {
109+
failingTables := make([]string, 0)
108110
for _, tableIntegrity := range localIntegrityInformation {
109111
found := false
110112
for _, tableGroundTruth := range groundTruth.TableIntegrity {
111113
if tableGroundTruth.TableName == tableIntegrity.TableName {
112114
if !tableIntegrity.isEqual(tableGroundTruth) {
113115
slog.Error("invalid checksum when importing", "table", tableIntegrity.TableName, "expectedCount", tableGroundTruth.TotalCount, "actualCount", tableIntegrity.TotalCount, "expectedChecksum", fmt.Sprintf("%x", tableGroundTruth.Checksum), "actualChecksum", fmt.Sprintf("%x", tableIntegrity.Checksum))
114-
115-
didErr = true
116+
failingTables = append(failingTables, tableIntegrity.TableName)
116117
} else {
117118
found = true
118119
break
119120
}
120121
}
121122
}
122123
if !found {
123-
return fmt.Errorf("could not find integrity information for table %s", tableIntegrity.TableName)
124+
slog.Error("unexpected table found when importing", "table", tableIntegrity.TableName, "count", tableIntegrity.TotalCount, "checksum", fmt.Sprintf("%x", tableIntegrity.Checksum))
125+
failingTables = append(failingTables, tableIntegrity.TableName)
124126
}
125127
}
126-
if didErr {
127-
return fmt.Errorf("integrity validation failed for one or more tables when importing from %s", workingDir)
128+
if len(failingTables) > 0 {
129+
return failingTables, false
128130
}
129131

130-
return nil
132+
return nil, true
131133
}
132134

133135
func calculateTotalIntegrityInformation(ctx context.Context, tx pgx.Tx) ([]tableIntegrityInformation, error) {

vulndb/osv_service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ func (s osvService) fetchAndImportOSV(ctx context.Context, tx pgx.Tx, importStar
172172
if err := createStagingTables(ctx, tx); err != nil {
173173
return nil, nil, fmt.Errorf("could not create staging tables: %w", err)
174174
}
175-
vulnRows, malRows := gobOSVToVulnAndMalFilterTransformer(ctx, time.Time{}, nil)(allOSVVulns)
175+
malRows := gobOSVToMalFilterTransformer(time.Time{})(allOSVVulns)
176+
vulnRows := gobOSVToVulnFilterTransformer(time.Time{}, nil)(allOSVVulns)
176177
fakeRows, fakeComps := buildFakePackages()
177178
malRows.pkgs = append(malRows.pkgs, fakeRows...)
178179
malRows.comps = append(malRows.comps, fakeComps...)

vulndb/transformers.go

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,39 @@ func osvEntryToMaliciousPackageTransformer(entry *dtos.OSV) (models.MaliciousPac
5555
return pkg, components
5656
}
5757

58-
func gobOSVToVulnAndMalFilterTransformer(ctx context.Context, lastImportTime time.Time, existing map[int64][]int64) func([]OSVEntry) (vulndbRows, malRows) {
58+
func gobOSVToMalFilterTransformer(lastImportTime time.Time) func([]OSVEntry) malRows {
59+
return func(elements []OSVEntry) malRows {
60+
malPkgs := make([]models.MaliciousPackage, 0)
61+
malComps := make([]models.MaliciousAffectedComponent, 0)
62+
63+
for i := range elements {
64+
if !lastImportTime.IsZero() && !elements[i].ModifiedTimestamp.After(lastImportTime) {
65+
continue
66+
}
67+
68+
// check if malicious package or vulnerability
69+
if strings.HasPrefix(elements[i].OSV.ID, "MAL-") {
70+
pkg, comps := osvEntryToMaliciousPackageTransformer(elements[i].OSV)
71+
malPkgs = append(malPkgs, pkg)
72+
malComps = append(malComps, comps...)
73+
continue
74+
}
75+
}
76+
return malRows{
77+
pkgs: malPkgs,
78+
comps: malComps,
79+
}
80+
}
81+
}
82+
func gobOSVToVulnFilterTransformer(lastImportTime time.Time, existing map[int64][]int64) func([]OSVEntry) vulndbRows {
5983
if existing == nil {
6084
existing = make(map[int64][]int64)
6185
}
62-
return func(elements []OSVEntry) (vulndbRows, malRows) {
86+
return func(elements []OSVEntry) vulndbRows {
6387
cves := make([]models.CVE, 0, len(elements))
6488
cveRelationships := make([]models.CVERelationship, 0, len(elements)*2)
6589
affectedComponents := make([]models.AffectedComponent, 0, len(elements)*12)
6690
cveAffectedComponents := make([]cveAffectedComponentRow, 0, len(elements)*55)
67-
malPkgs := make([]models.MaliciousPackage, 0)
68-
malComps := make([]models.MaliciousAffectedComponent, 0)
6991

7092
for i := range elements {
7193
if !lastImportTime.IsZero() && !elements[i].ModifiedTimestamp.After(lastImportTime) {
@@ -74,9 +96,6 @@ func gobOSVToVulnAndMalFilterTransformer(ctx context.Context, lastImportTime tim
7496

7597
// check if malicious package or vulnerability
7698
if strings.HasPrefix(elements[i].OSV.ID, "MAL-") {
77-
pkg, comps := osvEntryToMaliciousPackageTransformer(elements[i].OSV)
78-
malPkgs = append(malPkgs, pkg)
79-
malComps = append(malComps, comps...)
8099
continue
81100
}
82101
relationships := transformer.OSVToCVERelationships(elements[i].OSV)
@@ -107,28 +126,33 @@ func gobOSVToVulnAndMalFilterTransformer(ctx context.Context, lastImportTime tim
107126
}
108127
}
109128
return vulndbRows{
110-
CVEs: cves,
111-
CVERelationships: cveRelationships,
112-
AffectedComponents: affectedComponents,
113-
CVEAffectedComponents: cveAffectedComponents,
114-
}, malRows{
115-
pkgs: malPkgs,
116-
comps: malComps,
117-
}
129+
CVEs: cves,
130+
CVERelationships: cveRelationships,
131+
AffectedComponents: affectedComponents,
132+
CVEAffectedComponents: cveAffectedComponents,
133+
}
118134
}
119135
}
120136

121-
func gobOSVEntryStreamer(ctx context.Context, lastImportTime time.Time, existing map[int64][]int64, vulndbChan chan<- vulndbRows, malPkgsChan chan<- malRows) func([]OSVEntry) error {
122-
transform := gobOSVToVulnAndMalFilterTransformer(ctx, lastImportTime, existing)
137+
func gobOSVStreamer(ctx context.Context, lastImportTime time.Time, existing map[int64][]int64, vulndbChan chan<- vulndbRows) func([]OSVEntry) error {
138+
transform := gobOSVToVulnFilterTransformer(lastImportTime, existing)
123139
return func(elements []OSVEntry) error {
124-
vulndbRows, malRows := transform(elements)
140+
vulndbRows := transform(elements)
125141
select {
126-
case malPkgsChan <- malRows:
142+
case vulndbChan <- vulndbRows:
127143
case <-ctx.Done():
128144
return ctx.Err()
129145
}
146+
return nil
147+
}
148+
}
149+
150+
func gobOSVMalPkgStreamer(ctx context.Context, lastImportTime time.Time, malPkgsChan chan<- malRows) func([]OSVEntry) error {
151+
transform := gobOSVToMalFilterTransformer(lastImportTime)
152+
return func(elements []OSVEntry) error {
153+
malRows := transform(elements)
130154
select {
131-
case vulndbChan <- vulndbRows:
155+
case malPkgsChan <- malRows:
132156
case <-ctx.Done():
133157
return ctx.Err()
134158
}

0 commit comments

Comments
 (0)