Skip to content

Commit ce9151d

Browse files
authored
Merge pull request #1944 from l3montree-dev/streaming-vulndb
adds streaming transformers, streaming gob files straight to database
2 parents 6e4c348 + f5b95ed commit ce9151d

10 files changed

Lines changed: 652 additions & 352 deletions

.github/workflows/vulndb.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
FRONTEND_URL: "doesntmatter"
2323
services:
2424
postgres:
25-
image: ghcr.io/l3montree-dev/devguard-postgresql:v0.5.3@sha256:a06c9e7c8ee334790cc66d52e89ff5ef05352ab264841d3d9f3659c046732251
25+
image: ghcr.io/l3montree-dev/devguard/postgresql:v1.3.1
2626
env:
2727
POSTGRES_DB: ${{env.POSTGRES_DB}}
2828
POSTGRES_USER: ${{env.POSTGRES_USER}}

tests/db_init.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"log"
66
"log/slog"
7+
"os"
8+
"path/filepath"
79
"time"
810

911
"github.com/jackc/pgx/v5/pgxpool"
1012
"github.com/l3montree-dev/devguard/database"
1113
"github.com/l3montree-dev/devguard/shared"
14+
"github.com/moby/moby/api/types/container"
1215
"github.com/testcontainers/testcontainers-go"
1316
"github.com/testcontainers/testcontainers-go/modules/postgres"
1417
)
@@ -34,13 +37,40 @@ func InitRawDatabaseContainer(initDBSQLPath string) (*pgxpool.Pool, func()) {
3437
dbUser := "user"
3538
dbPassword := "password"
3639

40+
// The image has a read-only Nix filesystem so docker cp (used by WithInitScripts)
41+
// cannot write into the container. Instead we bind-mount the init SQL file,
42+
// mirroring how docker-compose.yaml mounts ./initdb.sql.
43+
absInitSQL, err := filepath.Abs(initDBSQLPath)
44+
if err != nil {
45+
panic("could not resolve initdb SQL path: " + err.Error())
46+
}
47+
if _, err := os.Stat(absInitSQL); err != nil {
48+
panic("initdb SQL file not found: " + absInitSQL)
49+
}
50+
3751
postgresC, err := postgres.Run(ctx,
38-
"ghcr.io/l3montree-dev/devguard-postgresql:v0.4.16",
52+
"ghcr.io/l3montree-dev/devguard/postgresql:v1.3.1",
3953
postgres.WithDatabase(dbName),
4054
postgres.WithUsername(dbUser),
4155
postgres.WithPassword(dbPassword),
42-
postgres.WithInitScripts(initDBSQLPath),
4356
postgres.BasicWaitStrategies(),
57+
testcontainers.WithLogger(log.Default()),
58+
// The postgres module overrides CMD to "postgres -c fsync=off", which drops the
59+
// image's config_file arg and makes postgres listen only on 127.0.0.1. We restore
60+
// the config_file so listen_addresses='*' takes effect for port mapping.
61+
testcontainers.WithCmd("postgres",
62+
"-c", "config_file=/etc/postgresql/postgresql.conf",
63+
"-c", "fsync=off",
64+
),
65+
testcontainers.WithTmpfs(map[string]string{
66+
"/run/postgresql": "rw",
67+
}),
68+
testcontainers.WithHostConfigModifier(func(hc *container.HostConfig) {
69+
hc.ShmSize = 1 << 30 // 1 GiB — matches shm_size in docker-compose.yaml
70+
// Bind-mount the init SQL; WithInitScripts uses docker cp which fails on the
71+
// read-only Nix filesystem of this image.
72+
hc.Binds = append(hc.Binds, absInitSQL+":/docker-entrypoint-initdb.d/init.sql:ro")
73+
}),
4474
)
4575

4676
terminate := func() {
@@ -49,10 +79,40 @@ func InitRawDatabaseContainer(initDBSQLPath string) (*pgxpool.Pool, func()) {
4979
}
5080
}
5181
if err != nil {
82+
if postgresC != nil {
83+
if logs, lerr := postgresC.Logs(ctx); lerr == nil {
84+
log.Printf("=== container logs ===")
85+
buf := make([]byte, 64*1024)
86+
for {
87+
n, rerr := logs.Read(buf)
88+
if n > 0 {
89+
log.Printf("%s", buf[:n])
90+
}
91+
if rerr != nil {
92+
break
93+
}
94+
}
95+
logs.Close()
96+
}
97+
}
5298
slog.Info("failed to start postgres container", "error", err)
5399
panic(err)
54100
}
55101

102+
if logs, lerr := postgresC.Logs(ctx); lerr == nil {
103+
buf := make([]byte, 64*1024)
104+
for {
105+
n, rerr := logs.Read(buf)
106+
if n > 0 {
107+
log.Printf("=== postgres startup logs ===\n%s", buf[:n])
108+
}
109+
if rerr != nil {
110+
break
111+
}
112+
}
113+
logs.Close()
114+
}
115+
56116
host, _ := postgresC.Host(ctx)
57117
port, _ := postgresC.MappedPort(ctx, "5432")
58118

tests/fx_test_helpers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
// TestFixture provides a complete test environment with database and FX app
3333
type TestFixture struct {
34-
T *testing.T
34+
T testing.TB
3535
App *TestApp
3636
DB shared.DB
3737
// pool is the underlying pgx connection pool
@@ -40,7 +40,7 @@ type TestFixture struct {
4040
}
4141

4242
// NewTestFixture creates a complete test environment with database container and FX app
43-
func NewTestFixture(t *testing.T, sqlInitFile string, options *TestAppOptions) *TestFixture {
43+
func NewTestFixture(t testing.TB, sqlInitFile string, options *TestAppOptions) *TestFixture {
4444
t.Helper()
4545

4646
// Initialize database container

tests/vulndb_import_bench_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (C) 2026 l3montree GmbH
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU Affero General Public License as
5+
// published by the Free Software Foundation, either version 3 of the
6+
// License, or (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU Affero General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU Affero General Public License
14+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
15+
16+
package tests
17+
18+
import (
19+
"context"
20+
"os"
21+
"runtime"
22+
"runtime/pprof"
23+
"sync/atomic"
24+
"testing"
25+
"time"
26+
)
27+
28+
// BenchmarkImportRC measures the full memory and time cost of VulnDBService.ImportRC,
29+
// including gob decoding and all database writes via a real PostgreSQL container.
30+
//
31+
// The benchmark writes a heap profile to mem.prof after each run.
32+
// View it as an interactive HTML flamegraph with:
33+
//
34+
// go tool pprof -http=:8080 mem.prof
35+
//
36+
// Enable the debugImport const in vulndb/vulndb_service.go to cache vulndb.tar.zst
37+
// locally (in the working directory) so the archive is only downloaded once across
38+
// repeated runs.
39+
//
40+
// Run with:
41+
//
42+
// go test -bench=BenchmarkImportRC -benchmem -run=^$ -timeout=30m ./tests/
43+
func BenchmarkImportRC(b *testing.B) {
44+
ctx := context.Background()
45+
46+
fixture := NewTestFixture(b, "../initdb.sql", &TestAppOptions{SuppressLogs: true})
47+
48+
for b.Loop() {
49+
var memBefore, memAfter runtime.MemStats
50+
runtime.GC()
51+
runtime.ReadMemStats(&memBefore)
52+
53+
var peakHeap atomic.Uint64
54+
done := make(chan struct{})
55+
go func() {
56+
var ms runtime.MemStats
57+
for {
58+
select {
59+
case <-done:
60+
return
61+
case <-time.After(100 * time.Millisecond):
62+
runtime.ReadMemStats(&ms)
63+
if ms.HeapInuse > peakHeap.Load() {
64+
peakHeap.Store(ms.HeapInuse)
65+
}
66+
}
67+
}
68+
}()
69+
70+
if err := fixture.App.VulnDBService.ImportRC(ctx); err != nil {
71+
close(done)
72+
b.Fatalf("ImportRC failed: %v", err)
73+
}
74+
close(done)
75+
76+
runtime.GC()
77+
runtime.ReadMemStats(&memAfter)
78+
79+
b.ReportMetric(float64(peakHeap.Load())/1024/1024, "peak_heap_MiB")
80+
b.ReportMetric(float64(memAfter.HeapInuse-memBefore.HeapInuse)/1024/1024, "heap_MiB")
81+
b.ReportMetric(float64(memAfter.TotalAlloc-memBefore.TotalAlloc)/1024/1024, "total_alloc_MiB")
82+
83+
// Write a heap profile after every iteration so the last (or only) run is always captured.
84+
f, err := os.Create("mem.prof")
85+
if err != nil {
86+
b.Fatalf("could not create mem.prof: %v", err)
87+
}
88+
if err := pprof.WriteHeapProfile(f); err != nil {
89+
f.Close()
90+
b.Fatalf("could not write heap profile: %v", err)
91+
}
92+
f.Close()
93+
}
94+
}

vulndb/exploitdb_service.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,24 +105,8 @@ func insertExploitsBulk(ctx context.Context, tx pgx.Tx, exploits []models.Exploi
105105
if len(exploits) == 0 {
106106
return nil
107107
}
108-
if _, err := tx.Exec(ctx, `
109-
CREATE TEMP TABLE exploits_stage (
110-
id text,
111-
published date,
112-
updated date,
113-
author text,
114-
type text,
115-
verified boolean,
116-
source_url text,
117-
description text,
118-
cve_id text,
119-
tags text,
120-
forks integer,
121-
watchers integer,
122-
subscribers integer,
123-
stars integer
124-
) ON COMMIT DROP`); err != nil {
125-
return fmt.Errorf("could not create exploits staging table: %w", err)
108+
if _, err := tx.Exec(ctx, `TRUNCATE exploits_stage`); err != nil {
109+
return fmt.Errorf("could not truncate exploits staging table: %w", err)
126110
}
127111

128112
if _, err := tx.CopyFrom(ctx, pgx.Identifier{"exploits_stage"},

vulndb/gob_types.go

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type GobMaliciousComponent struct {
5252
// GobMaliciousPackagesExport bundles the full malicious-packages snapshot.
5353
// models.MaliciousPackage only contains plain types and is gob-safe directly.
5454
type GobMaliciousPackagesExport struct {
55-
Packages []models.MaliciousPackage
55+
Package models.MaliciousPackage
5656
Components []GobMaliciousComponent
5757
}
5858

@@ -120,17 +120,6 @@ func gobExploitToModel(g GobExploit) models.Exploit {
120120
}
121121
}
122122

123-
func gobExploitsToModels(gs []GobExploit, lastImportTime time.Time) []models.Exploit {
124-
out := make([]models.Exploit, 0, len(gs))
125-
for _, g := range gs {
126-
if g.Updated != nil && g.Updated.Before(lastImportTime) {
127-
continue
128-
}
129-
out = append(out, gobExploitToModel(g))
130-
}
131-
return out
132-
}
133-
134123
// --- Malicious package conversions ---
135124

136125
func maliciousComponentToGob(c models.MaliciousAffectedComponent) GobMaliciousComponent {
@@ -160,31 +149,3 @@ func gobComponentToModel(g GobMaliciousComponent) models.MaliciousAffectedCompon
160149
VersionFixed: g.VersionFixed,
161150
}
162151
}
163-
164-
func malPackagesExportToGob(packages []models.MaliciousPackage, components []models.MaliciousAffectedComponent) GobMaliciousPackagesExport {
165-
gobComps := make([]GobMaliciousComponent, len(components))
166-
for i, c := range components {
167-
gobComps[i] = maliciousComponentToGob(c)
168-
}
169-
return GobMaliciousPackagesExport{Packages: packages, Components: gobComps}
170-
}
171-
172-
func gobMalPackagesExportToModels(g GobMaliciousPackagesExport, lastImportTime time.Time) ([]models.MaliciousPackage, []models.MaliciousAffectedComponent) {
173-
// build a map of package ID to last import time for all packages in the export
174-
pkgImportTimes := make(map[string]struct{})
175-
filteredPkgs := make([]models.MaliciousPackage, 0, len(g.Packages))
176-
for _, pkg := range g.Packages {
177-
if pkg.Modified.After(lastImportTime) {
178-
pkgImportTimes[pkg.ID] = struct{}{}
179-
filteredPkgs = append(filteredPkgs, pkg)
180-
}
181-
}
182-
comps := make([]models.MaliciousAffectedComponent, 0, len(g.Components))
183-
for _, c := range g.Components {
184-
if _, ok := pkgImportTimes[c.MaliciousPackageID]; !ok {
185-
continue
186-
}
187-
comps = append(comps, gobComponentToModel(c))
188-
}
189-
return filteredPkgs, comps
190-
}

vulndb/malicious_packages_checker.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ type MaliciousPackageChecker struct {
5050
httpClient *http.Client
5151
}
5252

53+
type malRow struct {
54+
pkgs []models.MaliciousPackage
55+
comps []models.MaliciousAffectedComponent
56+
}
57+
58+
5359
func NewMaliciousPackageChecker(
5460
repository *repositories.MaliciousPackageRepository,
5561
) (*MaliciousPackageChecker, error) {
@@ -286,15 +292,8 @@ func (c *MaliciousPackageChecker) IsMalicious(ctx context.Context, ecosystem, pa
286292

287293
func insertMaliciousPackagesBulk(ctx context.Context, tx pgx.Tx, pkgs []models.MaliciousPackage, comps []models.MaliciousAffectedComponent) error {
288294
if len(pkgs) > 0 {
289-
if _, err := tx.Exec(ctx, `
290-
CREATE TEMP TABLE mal_pkgs_stage (
291-
id text,
292-
summary text,
293-
details text,
294-
published timestamptz,
295-
modified timestamptz
296-
) ON COMMIT DROP`); err != nil {
297-
return fmt.Errorf("could not create malicious packages staging table: %w", err)
295+
if _, err := tx.Exec(ctx, `TRUNCATE mal_pkgs_stage`); err != nil {
296+
return fmt.Errorf("could not truncate malicious packages staging table: %w", err)
298297
}
299298
if _, err := tx.CopyFrom(ctx, pgx.Identifier{"mal_pkgs_stage"},
300299
[]string{"id", "summary", "details", "published", "modified"},
@@ -317,19 +316,8 @@ func insertMaliciousPackagesBulk(ctx context.Context, tx pgx.Tx, pkgs []models.M
317316
}
318317

319318
if len(comps) > 0 {
320-
if _, err := tx.Exec(ctx, `
321-
CREATE TEMP TABLE mal_comps_stage (
322-
id text,
323-
malicious_package_id text,
324-
purl text,
325-
ecosystem text,
326-
version text,
327-
semver_introduced text,
328-
semver_fixed text,
329-
version_introduced text,
330-
version_fixed text
331-
) ON COMMIT DROP`); err != nil {
332-
return fmt.Errorf("could not create malicious components staging table: %w", err)
319+
if _, err := tx.Exec(ctx, `TRUNCATE mal_comps_stage`); err != nil {
320+
return fmt.Errorf("could not truncate malicious components staging table: %w", err)
333321
}
334322
if _, err := tx.CopyFrom(ctx, pgx.Identifier{"mal_comps_stage"},
335323
[]string{"id", "malicious_package_id", "purl", "ecosystem", "version", "semver_introduced", "semver_fixed", "version_introduced", "version_fixed"},

0 commit comments

Comments
 (0)