Skip to content

Commit 4f194a5

Browse files
committed
buncha stuff
1 parent b0e215a commit 4f194a5

17 files changed

Lines changed: 466 additions & 493 deletions

File tree

deployment/clouddeploy/gke-workers/base/importer.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ spec:
2525
env:
2626
- name: GITTER_HOST
2727
value: http://gitter-service:8888
28+
- name: IMPORT_TRACE_SAMPLE_RATE # for the overall importer trace
29+
value: "1.0"
30+
- name: TRACE_SAMPLE_RATE # for the individual vulnerability entries
31+
value: "0.05"
2832
securityContext:
2933
privileged: true
3034
resources:

go/cmd/importer/main.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"os/signal"
1010
"path/filepath"
11+
"strconv"
1112
"syscall"
1213
"time"
1314

@@ -19,10 +20,18 @@ import (
1920
"github.com/google/osv.dev/go/logger"
2021
"github.com/google/osv.dev/go/osv/clients"
2122
"github.com/hashicorp/go-retryablehttp"
23+
"go.opentelemetry.io/otel"
24+
"go.opentelemetry.io/otel/attribute"
25+
"go.opentelemetry.io/otel/trace"
26+
"google.golang.org/api/option"
2227
)
2328

2429
func main() {
2530
logger.InitGlobalLogger()
31+
defer logger.Close()
32+
ctx, span := otel.Tracer("importer").Start(context.Background(), "importer",
33+
trace.WithAttributes(attribute.Float64("override_sample_rate", importerSampleRate())))
34+
defer span.End()
2635

2736
strictValidation := flag.Bool("strict-validation", false, "Fail to import entries that do not pass validation. "+
2837
"Note: this only applies to SourceRepositories with strict_validation=true")
@@ -35,14 +44,15 @@ func main() {
3544

3645
project := os.Getenv("GOOGLE_CLOUD_PROJECT")
3746
if project == "" {
38-
logger.Fatal("GOOGLE_CLOUD_PROJECT environment variable is not set")
47+
logger.FatalContext(ctx, "GOOGLE_CLOUD_PROJECT environment variable is not set")
3948
}
4049

4150
config := importer.Config{
4251
StrictValidation: *strictValidation,
4352
DeleteThreshold: *deleteThresholdPct,
4453
NumWorkers: *numWorkers,
4554
GitWorkDir: filepath.Join(*workDir, "sources"),
55+
SampleRate: vulnerabilitySampleRate(),
4656
}
4757

4858
httpClient := retryablehttp.NewClient()
@@ -52,36 +62,61 @@ func main() {
5262
httpClient.Logger = importer.RetryableHTTPLeveledLogger{}
5363
config.HTTPClient = httpClient.StandardClient()
5464

55-
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
65+
ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
5666
defer stop()
5767

5868
datastoreClient, err := datastore.NewClient(ctx, project)
5969
if err != nil {
60-
logger.Fatal("Failed to create datastore client", slog.Any("error", err))
70+
logger.FatalContext(ctx, "Failed to create datastore client", slog.Any("error", err))
6171
}
6272
config.SourceRepoStore = db.NewSourceRepositoryStore(datastoreClient)
6373
// Needed for deletions only
6474
config.VulnerabilityStore = db.NewVulnerabilityStore(datastoreClient)
6575

6676
psClient, err := pubsub.NewClient(ctx, project)
6777
if err != nil {
68-
logger.Fatal("Failed to create pubsub client", slog.Any("error", err))
78+
logger.FatalContext(ctx, "Failed to create pubsub client", slog.Any("error", err))
6979
}
7080
config.Publisher = &clients.GCPPublisher{Publisher: psClient.Publisher(importer.TasksTopic)}
7181

72-
storageClient, err := storage.NewClient(ctx)
82+
// We are posssibly reading a lot of vulnerabilities from GCS, so disable telemetry (disables trace spans).
83+
storageClient, err := storage.NewClient(ctx, option.WithTelemetryDisabled())
7384
if err != nil {
74-
logger.Fatal("Failed to create GCS client", slog.Any("error", err))
85+
logger.FatalContext(ctx, "Failed to create GCS client", slog.Any("error", err))
7586
}
7687
config.GCSProvider = clients.NewGCSStorageProvider(storageClient)
7788

7889
if *runDelete {
7990
if err := importer.RunDeletions(ctx, config); err != nil {
80-
logger.Fatal("Importer-deleter failed", slog.Any("error", err))
91+
logger.FatalContext(ctx, "Importer-deleter failed", slog.Any("error", err))
8192
}
8293
} else {
8394
if err := importer.Run(ctx, config); err != nil {
84-
logger.Fatal("Importer failed", slog.Any("error", err))
95+
logger.FatalContext(ctx, "Importer failed", slog.Any("error", err))
8596
}
8697
}
8798
}
99+
100+
// importerSampleRate returns the sample rate for the importer (not the individual vulnerability entries).
101+
// It is set to 0.05 (5%) by default, but can be overridden by the
102+
// IMPORT_TRACE_SAMPLE_RATE environment variable.
103+
func importerSampleRate() float64 {
104+
rate := 0.05
105+
if val := os.Getenv("IMPORT_TRACE_SAMPLE_RATE"); val != "" {
106+
rate, _ = strconv.ParseFloat(val, 64)
107+
}
108+
109+
return rate
110+
}
111+
112+
// vulnerabilitySampleRate returns the sample rate for individual vulnerability entries.
113+
// It is set to 0.05 (5%) by default, but can be overridden by the
114+
// TRACE_SAMPLE_RATE environment variable.
115+
func vulnerabilitySampleRate() float64 {
116+
rate := 0.05
117+
if val := os.Getenv("TRACE_SAMPLE_RATE"); val != "" {
118+
rate, _ = strconv.ParseFloat(val, 64)
119+
}
120+
121+
return rate
122+
}

go/internal/importer/bucket.go

Lines changed: 35 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,13 @@ import (
1616
)
1717

1818
type bucketSourceRecord struct {
19-
bucket clients.CloudStorage
20-
objectPath string
21-
keyPath string
22-
hasUpdateTime bool
23-
lastUpdated time.Time
24-
format RecordFormat
25-
sourceRepository string
26-
strict bool
27-
isDeleted bool
19+
bucket clients.CloudStorage
20+
objectPath string
2821
}
2922

3023
var _ SourceRecord = bucketSourceRecord{}
3124

3225
func (b bucketSourceRecord) Open(ctx context.Context) (io.ReadCloser, error) {
33-
if b.isDeleted {
34-
return nil, errors.New("cannot open a deleted record")
35-
}
3626
data, err := b.bucket.ReadObject(ctx, b.objectPath)
3727
if err != nil {
3828
return nil, err
@@ -41,43 +31,11 @@ func (b bucketSourceRecord) Open(ctx context.Context) (io.ReadCloser, error) {
4131
return io.NopCloser(bytes.NewReader(data)), nil
4232
}
4333

44-
func (b bucketSourceRecord) KeyPath() string {
45-
return b.keyPath
46-
}
47-
48-
func (b bucketSourceRecord) Format() RecordFormat {
49-
return b.format
50-
}
51-
52-
func (b bucketSourceRecord) LastUpdated() (time.Time, bool) {
53-
return b.lastUpdated, b.hasUpdateTime
54-
}
55-
56-
func (b bucketSourceRecord) SourceRepository() string {
57-
return b.sourceRepository
58-
}
59-
60-
func (b bucketSourceRecord) SourcePath() string {
61-
return b.objectPath
62-
}
63-
64-
func (b bucketSourceRecord) ShouldSendModifiedTime() bool {
65-
return b.hasUpdateTime
66-
}
67-
68-
func (b bucketSourceRecord) IsDeleted() bool {
69-
return b.isDeleted
70-
}
71-
72-
func (b bucketSourceRecord) Strictness() bool {
73-
return b.strict
74-
}
75-
76-
func handleImportBucket(ctx context.Context, ch chan<- SourceRecord, config Config, sourceRepo *models.SourceRepository) error {
34+
func handleImportBucket(ctx context.Context, ch chan<- WorkItem, config Config, sourceRepo *models.SourceRepository) error {
7735
if sourceRepo.Type != models.SourceRepositoryTypeBucket || sourceRepo.Bucket == nil {
7836
return errors.New("invalid SourceRepository for bucket import")
7937
}
80-
logger.Info("Importing bucket source repository",
38+
logger.InfoContext(ctx, "Importing bucket source repository",
8139
slog.String("source", sourceRepo.Name), slog.String("bucket", sourceRepo.Bucket.Name))
8240

8341
compiledIgnorePatterns := compileIgnorePatterns(sourceRepo)
@@ -100,7 +58,7 @@ func handleImportBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
10058
return err
10159
}
10260
if hasUpdateTime {
103-
if obj.Attrs.CustomTime.Before(lastUpdated) {
61+
if obj.Attrs.Updated.Before(lastUpdated) {
10462
continue
10563
}
10664
}
@@ -111,37 +69,42 @@ func handleImportBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
11169
if shouldIgnore(base, sourceRepo.IDPrefixes, compiledIgnorePatterns) {
11270
continue
11371
}
114-
ch <- bucketSourceRecord{
115-
bucket: bucket,
116-
objectPath: obj.Name,
117-
lastUpdated: lastUpdated,
118-
hasUpdateTime: hasUpdateTime,
119-
format: format,
120-
keyPath: sourceRepo.KeyPath,
121-
sourceRepository: sourceRepo.Name,
122-
strict: sourceRepo.Strictness,
72+
ch <- WorkItem{
73+
Context: ctx,
74+
SourceRecord: bucketSourceRecord{
75+
bucket: bucket,
76+
objectPath: obj.Name,
77+
},
78+
SourceRepository: sourceRepo.Name,
79+
SourcePath: obj.Name,
80+
LastUpdated: lastUpdated,
81+
HasLastUpdated: hasUpdateTime,
82+
Format: format,
83+
KeyPath: sourceRepo.KeyPath,
84+
Strict: sourceRepo.Strictness,
85+
ShouldSendModifiedTime: hasUpdateTime,
12386
}
12487
}
12588

12689
sourceRepo.Bucket.LastUpdated = &timeOfRun
12790
sourceRepo.Bucket.IgnoreLastImportTime = false
12891
if err := config.SourceRepoStore.Update(ctx, sourceRepo.Name, sourceRepo); err != nil {
129-
logger.Error("Failed to update source repository", slog.Any("error", err), slog.String("source", sourceRepo.Name))
92+
logger.ErrorContext(ctx, "Failed to update source repository", slog.Any("error", err), slog.String("source", sourceRepo.Name))
13093
return err
13194
}
132-
logger.Info("Finished importing bucket source repository",
95+
logger.InfoContext(ctx, "Finished importing bucket source repository",
13396
slog.String("source", sourceRepo.Name),
13497
slog.String("bucket", sourceRepo.Bucket.Name))
13598

13699
return nil
137100
}
138101

139-
func handleDeleteBucket(ctx context.Context, ch chan<- SourceRecord, config Config, sourceRepo *models.SourceRepository) error {
102+
func handleDeleteBucket(ctx context.Context, ch chan<- WorkItem, config Config, sourceRepo *models.SourceRepository) error {
140103
if sourceRepo.Type != models.SourceRepositoryTypeBucket || sourceRepo.Bucket == nil {
141104
return errors.New("invalid SourceRepository for bucket deletion")
142105
}
143106

144-
logger.Info("Processing bucket deletions",
107+
logger.InfoContext(ctx, "Processing bucket deletions",
145108
slog.String("source", sourceRepo.Name), slog.String("bucket", sourceRepo.Bucket.Name))
146109

147110
// Get all objects in the bucket
@@ -167,7 +130,7 @@ func handleDeleteBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
167130
}
168131

169132
if len(vulnsInDatastore) == 0 {
170-
logger.Info("No vulnerabilities found in Datastore for source", slog.String("source", sourceRepo.Name))
133+
logger.InfoContext(ctx, "No vulnerabilities found in Datastore for source", slog.String("source", sourceRepo.Name))
171134
return nil
172135
}
173136

@@ -180,7 +143,7 @@ func handleDeleteBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
180143
}
181144

182145
if len(toDelete) == 0 {
183-
logger.Info("No vulnerabilities to delete", slog.String("source", sourceRepo.Name))
146+
logger.InfoContext(ctx, "No vulnerabilities to delete", slog.String("source", sourceRepo.Name))
184147
return nil
185148
}
186149

@@ -191,7 +154,7 @@ func handleDeleteBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
191154
}
192155
percentage := (float64(len(toDelete)) / float64(len(vulnsInDatastore))) * 100.0
193156
if percentage >= threshold {
194-
logger.Error("Cowardly refusing to delete missing records (threshold exceeded)",
157+
logger.ErrorContext(ctx, "Cowardly refusing to delete missing records (threshold exceeded)",
195158
slog.String("source", sourceRepo.Name),
196159
slog.Int("to_delete", len(toDelete)),
197160
slog.Int("total", len(vulnsInDatastore)),
@@ -203,10 +166,15 @@ func handleDeleteBucket(ctx context.Context, ch chan<- SourceRecord, config Conf
203166

204167
// Trigger deletions
205168
for _, entry := range toDelete {
206-
ch <- bucketSourceRecord{
207-
sourceRepository: entry.Source,
208-
objectPath: entry.Path,
209-
isDeleted: true,
169+
ch <- WorkItem{
170+
Context: ctx,
171+
SourceRecord: bucketSourceRecord{
172+
bucket: bucket,
173+
objectPath: entry.Path,
174+
},
175+
SourceRepository: entry.Source,
176+
SourcePath: entry.Path,
177+
IsDeleted: true,
210178
}
211179
}
212180

go/internal/importer/bucket_test.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,12 @@ func TestHandleImportBucket(t *testing.T) {
4848

4949
// Set up mock bucket with a file
5050
mockBucket := testutils.NewMockStorage()
51-
lastUpdated := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
52-
olderTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
53-
newerTime := time.Date(2023, 2, 1, 0, 0, 0, 0, time.UTC)
54-
55-
_ = mockBucket.WriteObject(ctx, "a/b/valid.json", []byte(`{}`), &clients.WriteOptions{
56-
CustomTime: &olderTime, // older than last updated, will be skipped
57-
})
58-
_ = mockBucket.WriteObject(ctx, "a/b/newer.json", []byte(`{}`), &clients.WriteOptions{
59-
CustomTime: &newerTime,
60-
})
61-
_ = mockBucket.WriteObject(ctx, "a/b/ignored.json", []byte(`{}`), &clients.WriteOptions{
62-
CustomTime: &newerTime,
63-
})
51+
_ = mockBucket.WriteObject(ctx, "a/b/valid.json", []byte(`{}`), &clients.WriteOptions{})
52+
time.Sleep(50 * time.Millisecond)
53+
lastUpdated := time.Now()
54+
time.Sleep(50 * time.Millisecond)
55+
_ = mockBucket.WriteObject(ctx, "a/b/newer.json", []byte(`{}`), &clients.WriteOptions{})
56+
_ = mockBucket.WriteObject(ctx, "a/b/ignored.json", []byte(`{}`), &clients.WriteOptions{})
6457

6558
provider := &mockCloudStorageProvider{
6659
buckets: map[string]clients.CloudStorage{
@@ -89,7 +82,7 @@ func TestHandleImportBucket(t *testing.T) {
8982
},
9083
}
9184

92-
ch := make(chan SourceRecord, 10)
85+
ch := make(chan WorkItem, 10)
9386

9487
err := handleImportBucket(ctx, ch, config, sourceRepo)
9588
if err != nil {
@@ -99,7 +92,7 @@ func TestHandleImportBucket(t *testing.T) {
9992
close(ch)
10093
records := make([]bucketSourceRecord, 0, 10)
10194
for r := range ch {
102-
records = append(records, r.(bucketSourceRecord))
95+
records = append(records, r.SourceRecord.(bucketSourceRecord))
10396
}
10497

10598
if len(records) != 1 {

0 commit comments

Comments
 (0)