Skip to content

adds streaming transformers, streaming gob files straight to database#1944

Merged
timbastin merged 1 commit into
main from
streaming-vulndb
May 7, 2026
Merged

adds streaming transformers, streaming gob files straight to database#1944
timbastin merged 1 commit into
main from
streaming-vulndb

Conversation

@timbastin
Copy link
Copy Markdown
Member

No description provided.

Copilot AI review requested due to automatic review settings May 7, 2026 18:06
@timbastin timbastin merged commit ce9151d into main May 7, 2026
8 of 11 checks passed
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a streaming-based VulnDB import/export path that encodes gob entries item-by-item and streams decoded batches directly into PostgreSQL (using staging tables), aiming to reduce peak memory usage during imports. It also adds local benchmarking support and adjusts test/container/workflow setup for the updated Postgres image.

Changes:

  • Switch gob export/import for OSV/exploits/malicious-packages to item-wise gob encoding + batched streaming decode.
  • Add streaming-to-database ingestion (streamToDatabase) using shared temp staging tables and bulk insert helpers.
  • Add a VulnDB import benchmark and update test Postgres container + GitHub workflow Postgres image.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
vulndb/vulndb_service.go Adds streaming gob read/write helpers, debug import path, and streams decoded batches into DB via channels.
vulndb/transformers.go New streaming transformers for gob→model and model→gob conversions.
vulndb/osv_service.go Refactors OSV import to use staging tables and shared helpers; adds getCurrentAffectedComponents + createStagingTables.
vulndb/malicious_packages_checker.go Updates bulk insert to truncate shared staging tables; introduces malRow batch type.
vulndb/gob_types.go Changes malicious-packages gob type shape to per-package records; removes now-unused conversion helpers.
vulndb/exploitdb_service.go Updates exploit bulk insert to use shared staging table (truncate instead of create).
tests/vulndb_import_bench_test.go Adds benchmark for VulnDBService.ImportRC + heap profiling output.
tests/fx_test_helpers.go Generalizes fixture helpers to accept testing.TB (benchmark compatibility).
tests/db_init.go Updates testcontainers Postgres image + init script mounting strategy and startup command.
.github/workflows/vulndb.yaml Updates workflow Postgres image to the new repository/tag.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread vulndb/vulndb_service.go

// debugImport reuses a previously downloaded archive from the current working directory
// instead of pulling from the OCI registry. Set to true only for local profiling/benchmarking;
// it must remain false in committed code so real imports pull from the registry.
const debugImport = false
Comment thread vulndb/vulndb_service.go
Comment on lines +436 to +446
currentAffectedComponents, err := getCurrentAffectedComponents(ctx, tx)
if err != nil {
return fmt.Errorf("could not get current affected components: %w", err)
}

group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/osv.gob", &osvEntries); err != nil {
if err := readGobFileStream[OSVEntry, vulndbRows](workingDir+"/osv.gob", vulndbChan, gobOSVEntryStreamingTransformer(ctx, currentAffectedComponents)); err != nil {
return fmt.Errorf("could not read OSV gob: %w", err)
}
slog.Info("finished decoding OSV gob", "entries", len(osvEntries), "took", time.Since(t).Round(time.Millisecond))
close(vulndbChan)
slog.Info("decoded osv.gob", "took", time.Since(t))
Comment thread vulndb/vulndb_service.go
Comment on lines 440 to +487
group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/osv.gob", &osvEntries); err != nil {
if err := readGobFileStream[OSVEntry, vulndbRows](workingDir+"/osv.gob", vulndbChan, gobOSVEntryStreamingTransformer(ctx, currentAffectedComponents)); err != nil {
return fmt.Errorf("could not read OSV gob: %w", err)
}
slog.Info("finished decoding OSV gob", "entries", len(osvEntries), "took", time.Since(t).Round(time.Millisecond))
close(vulndbChan)
slog.Info("decoded osv.gob", "took", time.Since(t))
return nil
})
group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/epss.gob", &epssData); err != nil {
return fmt.Errorf("could not read EPSS gob: %w", err)
}
slog.Info("finished decoding EPSS gob", "entries", len(epssData), "took", time.Since(t).Round(time.Millisecond))
slog.Info("decoded epss.gob", "entries", len(epssData), "took", time.Since(t))
return nil
})
group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/cisakev.gob", &kevEntries); err != nil {
return fmt.Errorf("could not read CISA KEV gob: %w", err)
}
slog.Info("finished decoding CISA KEV gob", "entries", len(kevEntries), "took", time.Since(t).Round(time.Millisecond))
slog.Info("decoded cisakev.gob", "entries", len(kevEntries), "took", time.Since(t))
return nil
})
group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/exploits.gob", &gobExploit); err != nil {
if err := readGobFileStream(workingDir+"/exploits.gob", exploitChan, gobExploitStreamingTransformer(lastImportTime)); err != nil {
return fmt.Errorf("could not read exploits gob: %w", err)
}
slog.Info("finished decoding exploits gob", "entries", len(gobExploit), "took", time.Since(t).Round(time.Millisecond))
close(exploitChan)
slog.Info("decoded exploits.gob", "took", time.Since(t))
return nil
})
group.Go(func() error {
t := time.Now()
if err := readGobFile(workingDir+"/maliciouspackages.gob", &malExport); err != nil {
if err := readGobFileStream[GobMaliciousPackagesExport, malRow](workingDir+"/maliciouspackages.gob", malPkgChan, gobMalPackagesStreamingTransformer(lastImportTime)); err != nil {
return fmt.Errorf("could not read malicious packages gob: %w", err)
}
slog.Info("finished decoding malicious packages gob", "took", time.Since(t).Round(time.Millisecond))
close(malPkgChan)
slog.Info("decoded maliciouspackages.gob", "took", time.Since(t))
return nil
})

group.Go(func() error {
return streamToDatabase(ctx, tx, vulndbChan, exploitChan, malPkgChan)
})

Comment thread vulndb/vulndb_service.go
Comment on lines +546 to +573
const batchSize = 1_000

func readGobFileStream[T any, Transformed any](path string, out chan<- Transformed, transformer func([]T) Transformed) error {
fd, err := os.Open(path)
if err != nil {
return fmt.Errorf("could not open gob file %s: %w", path, err)
}
defer fd.Close()
decoder := gob.NewDecoder(fd)
batch := make([]T, 0, batchSize)
for {
var item T
if err := decoder.Decode(&item); err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("could not decode gob file %s: %w", path, err)
}
batch = append(batch, item)
if len(batch) == batchSize {
out <- transformer(batch)
batch = batch[:0]
}
}
if len(batch) > 0 {
out <- transformer(batch)
}
return nil
Comment thread vulndb/vulndb_service.go
openChans++
}
for openChans > 0 {
select {
services:
postgres:
image: ghcr.io/l3montree-dev/devguard-postgresql:v0.5.3@sha256:a06c9e7c8ee334790cc66d52e89ff5ef05352ab264841d3d9f3659c046732251
image: ghcr.io/l3montree-dev/devguard/postgresql:v1.3.1
Comment on lines +43 to +74
func BenchmarkImportRC(b *testing.B) {
ctx := context.Background()

fixture := NewTestFixture(b, "../initdb.sql", &TestAppOptions{SuppressLogs: true})

for b.Loop() {
var memBefore, memAfter runtime.MemStats
runtime.GC()
runtime.ReadMemStats(&memBefore)

var peakHeap atomic.Uint64
done := make(chan struct{})
go func() {
var ms runtime.MemStats
for {
select {
case <-done:
return
case <-time.After(100 * time.Millisecond):
runtime.ReadMemStats(&ms)
if ms.HeapInuse > peakHeap.Load() {
peakHeap.Store(ms.HeapInuse)
}
}
}
}()

if err := fixture.App.VulnDBService.ImportRC(ctx); err != nil {
close(done)
b.Fatalf("ImportRC failed: %v", err)
}
close(done)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants