adds streaming transformers, streaming gob files straight to database #1944
Merged
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a streaming-based VulnDB import/export path that encodes gob entries item-by-item and streams decoded batches directly into PostgreSQL (using staging tables), aiming to reduce peak memory usage during imports. It also adds local benchmarking support and adjusts test/container/workflow setup for the updated Postgres image.
Changes:
- Switch gob export/import for OSV/exploits/malicious-packages to item-wise gob encoding + batched streaming decode.
- Add streaming-to-database ingestion (`streamToDatabase`) using shared temp staging tables and bulk insert helpers.
- Add a VulnDB import benchmark and update test Postgres container + GitHub workflow Postgres image.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| vulndb/vulndb_service.go | Adds streaming gob read/write helpers, debug import path, and streams decoded batches into DB via channels. |
| vulndb/transformers.go | New streaming transformers for gob→model and model→gob conversions. |
| vulndb/osv_service.go | Refactors OSV import to use staging tables and shared helpers; adds getCurrentAffectedComponents + createStagingTables. |
| vulndb/malicious_packages_checker.go | Updates bulk insert to truncate shared staging tables; introduces malRow batch type. |
| vulndb/gob_types.go | Changes malicious-packages gob type shape to per-package records; removes now-unused conversion helpers. |
| vulndb/exploitdb_service.go | Updates exploit bulk insert to use shared staging table (truncate instead of create). |
| tests/vulndb_import_bench_test.go | Adds benchmark for VulnDBService.ImportRC + heap profiling output. |
| tests/fx_test_helpers.go | Generalizes fixture helpers to accept testing.TB (benchmark compatibility). |
| tests/db_init.go | Updates testcontainers Postgres image + init script mounting strategy and startup command. |
| .github/workflows/vulndb.yaml | Updates workflow Postgres image to the new repository/tag. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| // debugImport reuses a previously downloaded archive from the current working directory | ||
| // instead of pulling from the OCI registry. Set to true only for local profiling/benchmarking. | ||
| const debugImport = true |
Comment on lines
+436
to
+446
| currentAffectedComponents, err := getCurrentAffectedComponents(ctx, tx) | ||
| if err != nil { | ||
| return fmt.Errorf("could not get current affected components: %w", err) | ||
| } | ||
|
|
||
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/osv.gob", &osvEntries); err != nil { | ||
| if err := readGobFileStream[OSVEntry, vulndbRows](workingDir+"/osv.gob", vulndbChan, gobOSVEntryStreamingTransformer(ctx, currentAffectedComponents)); err != nil { | ||
| return fmt.Errorf("could not read OSV gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding OSV gob", "entries", len(osvEntries), "took", time.Since(t).Round(time.Millisecond)) | ||
| close(vulndbChan) | ||
| slog.Info("decoded osv.gob", "took", time.Since(t)) |
Comment on lines
440
to
+487
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/osv.gob", &osvEntries); err != nil { | ||
| if err := readGobFileStream[OSVEntry, vulndbRows](workingDir+"/osv.gob", vulndbChan, gobOSVEntryStreamingTransformer(ctx, currentAffectedComponents)); err != nil { | ||
| return fmt.Errorf("could not read OSV gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding OSV gob", "entries", len(osvEntries), "took", time.Since(t).Round(time.Millisecond)) | ||
| close(vulndbChan) | ||
| slog.Info("decoded osv.gob", "took", time.Since(t)) | ||
| return nil | ||
| }) | ||
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/epss.gob", &epssData); err != nil { | ||
| return fmt.Errorf("could not read EPSS gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding EPSS gob", "entries", len(epssData), "took", time.Since(t).Round(time.Millisecond)) | ||
| slog.Info("decoded epss.gob", "entries", len(epssData), "took", time.Since(t)) | ||
| return nil | ||
| }) | ||
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/cisakev.gob", &kevEntries); err != nil { | ||
| return fmt.Errorf("could not read CISA KEV gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding CISA KEV gob", "entries", len(kevEntries), "took", time.Since(t).Round(time.Millisecond)) | ||
| slog.Info("decoded cisakev.gob", "entries", len(kevEntries), "took", time.Since(t)) | ||
| return nil | ||
| }) | ||
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/exploits.gob", &gobExploit); err != nil { | ||
| if err := readGobFileStream(workingDir+"/exploits.gob", exploitChan, gobExploitStreamingTransformer(lastImportTime)); err != nil { | ||
| return fmt.Errorf("could not read exploits gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding exploits gob", "entries", len(gobExploit), "took", time.Since(t).Round(time.Millisecond)) | ||
| close(exploitChan) | ||
| slog.Info("decoded exploits.gob", "took", time.Since(t)) | ||
| return nil | ||
| }) | ||
| group.Go(func() error { | ||
| t := time.Now() | ||
| if err := readGobFile(workingDir+"/maliciouspackages.gob", &malExport); err != nil { | ||
| if err := readGobFileStream[GobMaliciousPackagesExport, malRow](workingDir+"/maliciouspackages.gob", malPkgChan, gobMalPackagesStreamingTransformer(lastImportTime)); err != nil { | ||
| return fmt.Errorf("could not read malicious packages gob: %w", err) | ||
| } | ||
| slog.Info("finished decoding malicious packages gob", "took", time.Since(t).Round(time.Millisecond)) | ||
| close(malPkgChan) | ||
| slog.Info("decoded maliciouspackages.gob", "took", time.Since(t)) | ||
| return nil | ||
| }) | ||
|
|
||
| group.Go(func() error { | ||
| return streamToDatabase(ctx, tx, vulndbChan, exploitChan, malPkgChan) | ||
| }) | ||
|
|
Comment on lines
+546
to
+573
| const batchSize = 1_000 | ||
|
|
||
| func readGobFileStream[T any, Transformed any](path string, out chan<- Transformed, transformer func([]T) Transformed) error { | ||
| fd, err := os.Open(path) | ||
| if err != nil { | ||
| return fmt.Errorf("could not open gob file %s: %w", path, err) | ||
| } | ||
| defer fd.Close() | ||
| decoder := gob.NewDecoder(fd) | ||
| batch := make([]T, 0, batchSize) | ||
| for { | ||
| var item T | ||
| if err := decoder.Decode(&item); err != nil { | ||
| if err == io.EOF { | ||
| break | ||
| } | ||
| return fmt.Errorf("could not decode gob file %s: %w", path, err) | ||
| } | ||
| batch = append(batch, item) | ||
| if len(batch) == batchSize { | ||
| out <- transformer(batch) | ||
| batch = batch[:0] | ||
| } | ||
| } | ||
| if len(batch) > 0 { | ||
| out <- transformer(batch) | ||
| } | ||
| return nil |
| openChans++ | ||
| } | ||
| for openChans > 0 { | ||
| select { |
| services: | ||
| postgres: | ||
| image: ghcr.io/l3montree-dev/devguard-postgresql:v0.5.3@sha256:a06c9e7c8ee334790cc66d52e89ff5ef05352ab264841d3d9f3659c046732251 | ||
| image: ghcr.io/l3montree-dev/devguard/postgresql:v1.3.1 |
Comment on lines
+43
to
+74
| func BenchmarkImportRC(b *testing.B) { | ||
| ctx := context.Background() | ||
|
|
||
| fixture := NewTestFixture(b, "../initdb.sql", &TestAppOptions{SuppressLogs: true}) | ||
|
|
||
| for b.Loop() { | ||
| var memBefore, memAfter runtime.MemStats | ||
| runtime.GC() | ||
| runtime.ReadMemStats(&memBefore) | ||
|
|
||
| var peakHeap atomic.Uint64 | ||
| done := make(chan struct{}) | ||
| go func() { | ||
| var ms runtime.MemStats | ||
| for { | ||
| select { | ||
| case <-done: | ||
| return | ||
| case <-time.After(100 * time.Millisecond): | ||
| runtime.ReadMemStats(&ms) | ||
| if ms.HeapInuse > peakHeap.Load() { | ||
| peakHeap.Store(ms.HeapInuse) | ||
| } | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| if err := fixture.App.VulnDBService.ImportRC(ctx); err != nil { | ||
| close(done) | ||
| b.Fatalf("ImportRC failed: %v", err) | ||
| } | ||
| close(done) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit. This suggestion is invalid because no changes were made to the code. Suggestions cannot be applied while the pull request is closed. Suggestions cannot be applied while viewing a subset of changes. Only one suggestion per line can be applied in a batch. Add this suggestion to a batch that can be applied as a single commit. Applying suggestions on deleted lines is not supported. You must change the existing code in this line in order to create a valid suggestion. Outdated suggestions cannot be applied. This suggestion has been applied or marked resolved. Suggestions cannot be applied from pending reviews. Suggestions cannot be applied on multi-line comments. Suggestions cannot be applied while the pull request is queued to merge. Suggestion cannot be applied right now. Please check back later.
No description provided.