Skip to content

Commit fe55e02

Browse files
authored
feat: instrument error reporting and tracing for go jobs (#4819)
1. Makes error-level logs (and logs at any hypothetical levels above error) create a new error in the Error Reporting dashboard, which should hopefully link to the logs. 2. Adds traces to all our Go jobs, which should maybe let us use the Trace Explorer to see how long things take in a fancy waterfall graph (related to #3547). - The exporter will create one big span for the whole export, and a sub-span for each ecosystem. - The relations job will create one big span for the whole run, and a sub-span for each of alias/upstream/related. - gitter traces each HTTP request that is made to it. We can later propagate the same trace from the worker/importer to correlate requests, which should be fun. This doesn't affect vulnfeeds (yet), since it uses a duplicate logging implementation. To instrument tracing, you have to propagate the context correctly and use the new `logger.[LEVEL]Context()` functions.
1 parent cc26493 commit fe55e02

15 files changed

Lines changed: 451 additions & 212 deletions

File tree

deployment/clouddeploy/gke-workers/base/exporter.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,7 @@ spec:
3131
limits:
3232
cpu: "20"
3333
memory: "64Gi"
34+
env:
35+
- name: TRACE_SAMPLE_RATE
36+
value: "1.0"
3437
restartPolicy: Never

deployment/clouddeploy/gke-workers/base/generate-sitemap.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,7 @@ spec:
2222
limits:
2323
cpu: "1"
2424
memory: "6G"
25+
env:
26+
- name: TRACE_SAMPLE_RATE
27+
value: "1.0"
2528
restartPolicy: OnFailure

deployment/clouddeploy/gke-workers/base/record-checker.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,7 @@ spec:
2222
limits:
2323
cpu: "1"
2424
memory: "2G"
25+
env:
26+
- name: TRACE_SAMPLE_RATE
27+
value: "1.0"
2528
restartPolicy: Never

deployment/clouddeploy/gke-workers/base/relations.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,7 @@ spec:
2222
limits:
2323
cpu: "1"
2424
memory: "13G"
25+
env:
26+
- name: TRACE_SAMPLE_RATE
27+
value: "1.0"
2528
restartPolicy: Never

go/cmd/custommetrics/main.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
1313
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
1414
"github.com/google/osv.dev/go/logger"
15+
"go.opentelemetry.io/otel"
1516
"google.golang.org/api/iterator"
1617
"google.golang.org/genproto/googleapis/api/metric"
1718
"google.golang.org/genproto/googleapis/api/monitoredres"
@@ -24,14 +25,19 @@ const (
2425
)
2526

2627
func main() {
28+
logger.InitGlobalLogger()
29+
defer logger.Close()
30+
31+
ctx, span := otel.Tracer("custommetrics").Start(context.Background(), "custommetrics")
32+
defer span.End()
33+
2734
project := os.Getenv("GOOGLE_CLOUD_PROJECT")
2835
if project == "" {
29-
logger.Fatal("GOOGLE_CLOUD_PROJECT must be set")
36+
logger.FatalContext(ctx, "GOOGLE_CLOUD_PROJECT must be set")
3037
}
31-
ctx := context.Background()
3238
cl, err := monitoring.NewMetricClient(ctx)
3339
if err != nil {
34-
logger.Fatal("failed to create monitoring client", slog.Any("err", err))
40+
logger.FatalContext(ctx, "failed to create monitoring client", slog.Any("err", err))
3541
}
3642
defer cl.Close()
3743

@@ -40,10 +46,10 @@ func main() {
4046
for _, cron := range crons {
4147
timeSince, err := getCronFreshness(ctx, cl, project, cron)
4248
if err != nil {
43-
logger.Fatal("error getting freshness", slog.String("cronjob", cron), slog.Any("err", err))
49+
logger.FatalContext(ctx, "error getting freshness", slog.String("cronjob", cron), slog.Any("err", err))
4450
}
4551
if err := writeCronFreshness(ctx, cl, project, cron, timeSince); err != nil {
46-
logger.Fatal("error writing freshness", slog.String("cronjob", cron), slog.Any("err", err))
52+
logger.FatalContext(ctx, "error writing freshness", slog.String("cronjob", cron), slog.Any("err", err))
4753
}
4854
}
4955
}
@@ -59,14 +65,14 @@ func getCronFreshness(ctx context.Context, cl *monitoring.MetricClient, project
5965
it := cl.ListTimeSeries(ctx, req)
6066
ts, err := it.Next()
6167
if errors.Is(err, iterator.Done) {
62-
logger.Warn("last_successful_time was not found for past day", slog.String("cronjob", cronjob))
68+
logger.WarnContext(ctx, "last_successful_time was not found for past day", slog.String("cronjob", cronjob))
6369
return int64(lookbackDuration / time.Second), nil
6470
} else if err != nil {
6571
return 0, err
6672
}
6773
points := ts.GetPoints()
6874
if len(points) == 0 { // I'm pretty sure iterator.Done would be returned instead.
69-
logger.Warn("time series has no points", slog.String("cronjob", cronjob))
75+
logger.WarnContext(ctx, "time series has no points", slog.String("cronjob", cronjob))
7076
return int64(lookbackDuration / time.Second), nil
7177
}
7278
val := points[0].GetValue().GetDoubleValue()

go/cmd/exporter/exporter.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/google/osv.dev/go/logger"
1616
"github.com/google/osv.dev/go/osv/clients"
1717
"github.com/ossf/osv-schema/bindings/go/osvschema"
18+
"go.opentelemetry.io/otel"
1819
)
1920

2021
const gcsProtoPrefix = "all/pb/"
@@ -23,6 +24,10 @@ const gcsProtoPrefix = "all/pb/"
2324
// sets up the worker pipeline, and starts the GCS object iteration.
2425
func main() {
2526
logger.InitGlobalLogger()
27+
defer logger.Close()
28+
29+
ctx, span := otel.Tracer("exporter").Start(context.Background(), "exporter")
30+
defer span.End()
2631

2732
outBucketName := flag.String("bucket", "osv-test-vulnerabilities", "Output bucket or directory name. If -local is true, this is a local path; otherwise, it's a GCS bucket name.")
2833
vulnBucketName := flag.String("osv-vulns-bucket", os.Getenv("OSV_VULNERABILITIES_BUCKET"), "GCS bucket to read vulnerability protobufs from. Can also be set with the OSV_VULNERABILITIES_BUCKET environment variable.")
@@ -31,22 +36,22 @@ func main() {
3136

3237
flag.Parse()
3338

34-
logger.Info("exporter starting",
39+
logger.InfoContext(ctx, "exporter starting",
3540
slog.String("bucket", *outBucketName),
3641
slog.String("osv-vulns-bucket", *vulnBucketName),
3742
slog.Bool("upload-to-gcs", *uploadToGCS),
3843
slog.Int("workers", *numWorkers))
3944

4045
if *vulnBucketName == "" {
41-
logger.Fatal("OSV_VULNERABILITIES_BUCKET must be set")
46+
logger.FatalContext(ctx, "OSV_VULNERABILITIES_BUCKET must be set")
4247
}
4348

44-
ctx, cancel := context.WithCancel(context.Background())
49+
ctx, cancel := context.WithCancel(ctx)
4550
defer cancel()
4651

4752
storageClient, err := storage.NewClient(ctx)
4853
if err != nil {
49-
logger.Fatal("failed to create storage client", slog.Any("err", err))
54+
logger.FatalContext(ctx, "failed to create storage client", slog.Any("err", err))
5055
}
5156
defer storageClient.Close()
5257

@@ -93,13 +98,13 @@ func main() {
9398
MainLoop:
9499
for path, err := range vulnClient.Objects(ctx, gcsProtoPrefix) {
95100
if err != nil {
96-
logger.Fatal("failed to list objects", slog.Any("err", err))
101+
logger.FatalContext(ctx, "failed to list objects", slog.Any("err", err))
97102
}
98103
// Only log when we see a new ID prefix (i.e. roughly once per data source)
99104
prefix := filepath.Base(path)
100105
prefix, _, _ = strings.Cut(prefix, "-")
101106
if prefix != prevPrefix {
102-
logger.Info("iterating vulnerabilities", slog.String("now_at", path))
107+
logger.InfoContext(ctx, "iterating vulnerabilities", slog.String("now_at", path))
103108
prevPrefix = prefix
104109
}
105110
select {
@@ -117,17 +122,17 @@ MainLoop:
117122
writerWg.Wait()
118123

119124
if ctx.Err() != nil {
120-
logger.Fatal("exporter cancelled")
125+
logger.FatalContext(ctx, "exporter cancelled")
121126
}
122-
logger.Info("export completed successfully")
127+
logger.InfoContext(ctx, "export completed successfully")
123128
}
124129

125130
// ecosystemRouter receives vulnerabilities from inCh and fans them out to the
126131
// appropriate ecosystemWorker. It creates workers on-demand for each new
127132
// ecosystem encountered. It also sends every vulnerability to the allEcosystemWorker.
128133
func ecosystemRouter(ctx context.Context, inCh <-chan *osvschema.Vulnerability, outCh chan<- writeMsg, wg *sync.WaitGroup) {
129134
defer wg.Done()
130-
logger.Info("ecosystem router starting")
135+
logger.InfoContext(ctx, "ecosystem router starting")
131136
workers := make(map[string]*ecosystemWorker)
132137
var workersWg sync.WaitGroup
133138
vulnCounter := 0
@@ -190,8 +195,8 @@ RouterLoop:
190195
allEcosystemWorker.Finish()
191196
workersWg.Wait()
192197
if ctx.Err() == nil {
193-
logger.Info("ecosystem router finished, all vulnerabilities dispatched", slog.Int("total_vulnerabilities", vulnCounter))
198+
logger.InfoContext(ctx, "ecosystem router finished, all vulnerabilities dispatched", slog.Int("total_vulnerabilities", vulnCounter))
194199
} else {
195-
logger.Info("ecosystem router cancelled", slog.Any("err", ctx.Err()))
200+
logger.InfoContext(ctx, "ecosystem router cancelled", slog.Any("err", ctx.Err()))
196201
}
197202
}

go/cmd/exporter/worker.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/google/osv.dev/go/logger"
2020
"github.com/ossf/osv-schema/bindings/go/osvschema"
21+
"go.opentelemetry.io/otel"
2122
"google.golang.org/protobuf/encoding/protojson"
2223
)
2324

@@ -60,7 +61,10 @@ type vulnData struct {
6061
// zip, csv, and (for GIT) vanir files.
6162
func (w *ecosystemWorker) run(ctx context.Context, outCh chan<- writeMsg, wg *sync.WaitGroup) {
6263
defer wg.Done()
63-
logger.Info("new ecosystem worker started", slog.String("ecosystem", w.ecosystem))
64+
ctx, span := otel.Tracer("exporter").Start(ctx, w.ecosystem)
65+
defer span.End()
66+
67+
logger.InfoContext(ctx, "new ecosystem worker started", slog.String("ecosystem", w.ecosystem))
6468
var allVulns []vulnData
6569
var csvData [][]string
6670
var vanirVulns []vulnData
@@ -72,7 +76,7 @@ WorkLoop:
7276
// Wait to receive a vulnerability, or be cancelled.
7377
select {
7478
case <-ctx.Done():
75-
logger.Warn("ecosystem worker cancelled", slog.String("ecosystem", w.ecosystem), slog.Any("err", ctx.Err()))
79+
logger.WarnContext(ctx, "ecosystem worker cancelled", slog.String("ecosystem", w.ecosystem), slog.Any("err", ctx.Err()))
7680
return
7781
case v, ok = <-w.inCh:
7882
if !ok {
@@ -82,15 +86,15 @@ WorkLoop:
8286
// Process vulnerability.
8387
b, err := marshalToJSON(v)
8488
if err != nil {
85-
logger.Error("failed to marshal vulnerability to json", slog.String("id", v.GetId()), slog.Any("err", err))
89+
logger.ErrorContext(ctx, "failed to marshal vulnerability to json", slog.String("id", v.GetId()), slog.Any("err", err))
8690
continue
8791
}
8892

8993
// Wait to send the result, or be cancelled.
9094
select {
9195
case outCh <- writeMsg{path: filepath.Join(w.ecosystem, v.GetId()) + ".json", mimeType: "application/json", data: b}:
9296
case <-ctx.Done():
93-
logger.Warn("ecosystem worker cancelled", slog.String("ecosystem", w.ecosystem), slog.Any("err", ctx.Err()))
97+
logger.WarnContext(ctx, "ecosystem worker cancelled", slog.String("ecosystem", w.ecosystem), slog.Any("err", ctx.Err()))
9498
return
9599
}
96100

@@ -110,13 +114,13 @@ WorkLoop:
110114
}
111115
}
112116

113-
logger.Info("All vulnerabilities processed", slog.String("ecosystem", w.ecosystem))
117+
logger.InfoContext(ctx, "All vulnerabilities processed", slog.String("ecosystem", w.ecosystem))
114118
writeModifiedIDCSV(ctx, filepath.Join(w.ecosystem, modifiedCSVFilename), csvData, outCh)
115119
writeZIP(ctx, filepath.Join(w.ecosystem, allZipFilename), allVulns, outCh)
116120
if w.ecosystem == gitEcosystem {
117121
writeVanir(ctx, vanirVulns, outCh)
118122
}
119-
logger.Info("ecosystem worker finished processing", slog.String("ecosystem", w.ecosystem))
123+
logger.InfoContext(ctx, "ecosystem worker finished processing", slog.String("ecosystem", w.ecosystem))
120124
}
121125

122126
// Finish signals the worker to stop processing by closing its input channel.
@@ -153,23 +157,26 @@ func newAllEcosystemWorker(ctx context.Context, outCh chan<- writeMsg, wg *sync.
153157
// and generates the global all.zip, modified_id.csv, and ecosystems.txt files.
154158
func (w *allEcosystemWorker) run(ctx context.Context, outCh chan<- writeMsg, wg *sync.WaitGroup) {
155159
defer wg.Done()
156-
logger.Info("all-ecosystem worker started")
160+
ctx, span := otel.Tracer("exporter").Start(ctx, "all-ecosystems")
161+
defer span.End()
162+
163+
logger.InfoContext(ctx, "all-ecosystem worker started")
157164
var allVulns []vulnData
158165
var csvData [][]string
159166
ecosystems := make(map[string]struct{})
160167
WorkLoop:
161168
for {
162169
select {
163170
case <-ctx.Done():
164-
logger.Warn("all-ecosystem worker cancelled", slog.Any("err", ctx.Err()))
171+
logger.WarnContext(ctx, "all-ecosystem worker cancelled", slog.Any("err", ctx.Err()))
165172
return
166173
case v, ok := <-w.inCh:
167174
if !ok {
168175
break WorkLoop
169176
}
170177
b, err := marshalToJSON(v.Vulnerability)
171178
if err != nil {
172-
logger.Error("failed to marshal vulnerability to json", slog.String("id", v.GetId()), slog.Any("err", err))
179+
logger.ErrorContext(ctx, "failed to marshal vulnerability to json", slog.String("id", v.GetId()), slog.Any("err", err))
173180
continue
174181
}
175182
modified := v.GetModified().AsTime()
@@ -186,7 +193,7 @@ WorkLoop:
186193
slices.Sort(ecos)
187194
ecoString := strings.Join(ecos, "\n") + "\n"
188195
write(ctx, ecosystemsFilename, []byte(ecoString), "text/plain", outCh)
189-
logger.Info("all-ecosystem worker finished processing")
196+
logger.InfoContext(ctx, "all-ecosystem worker finished processing")
190197
}
191198

192199
// Finish signals the worker to stop processing by closing its input channel.
@@ -219,7 +226,7 @@ func write(ctx context.Context, path string, data []byte, mimeType string, outCh
219226

220227
// writeModifiedIDCSV constructs and writes a modified_id.csv file.
221228
func writeModifiedIDCSV(ctx context.Context, path string, csvData [][]string, outCh chan<- writeMsg) {
222-
logger.Info("constructing csv file", slog.String("path", path))
229+
logger.InfoContext(ctx, "constructing csv file", slog.String("path", path))
223230
slices.SortFunc(csvData, func(a, b []string) int {
224231
return cmp.Or(
225232
-cmp.Compare(a[0], b[0]), // Modified date, descending
@@ -230,17 +237,17 @@ func writeModifiedIDCSV(ctx context.Context, path string, csvData [][]string, ou
230237
var buf bytes.Buffer
231238
wr := csv.NewWriter(&buf)
232239
if err := wr.WriteAll(csvData); err != nil {
233-
logger.Error("failed writing csv", slog.String("path", path), slog.Any("err", err))
240+
logger.ErrorContext(ctx, "failed writing csv", slog.String("path", path), slog.Any("err", err))
234241
return
235242
}
236243
wr.Flush()
237-
logger.Info("writing csv file", slog.String("path", path))
244+
logger.InfoContext(ctx, "writing csv file", slog.String("path", path))
238245
write(ctx, path, buf.Bytes(), "text/csv", outCh)
239246
}
240247

241248
// writeZIP constructs and writes an all.zip file.
242249
func writeZIP(ctx context.Context, path string, allVulns []vulnData, outCh chan<- writeMsg) {
243-
logger.Info("constructing zip file", slog.String("path", path))
250+
logger.InfoContext(ctx, "constructing zip file", slog.String("path", path))
244251
slices.SortFunc(allVulns, func(a, b vulnData) int {
245252
return cmp.Compare(a.id, b.id)
246253
})
@@ -253,18 +260,18 @@ func writeZIP(ctx context.Context, path string, allVulns []vulnData, outCh chan<
253260
Method: zip.Deflate,
254261
})
255262
if err != nil {
256-
logger.Error("failed to create vuln json in zip file", slog.String("id", vuln.id), slog.Any("err", err))
263+
logger.ErrorContext(ctx, "failed to create vuln json in zip file", slog.String("id", vuln.id), slog.Any("err", err))
257264
continue
258265
}
259266
r := bytes.NewReader(vuln.data)
260267
if _, err := io.Copy(w, r); err != nil {
261-
logger.Error("failed to write vuln json in zip file", slog.String("id", vuln.id), slog.Any("err", err))
268+
logger.ErrorContext(ctx, "failed to write vuln json in zip file", slog.String("id", vuln.id), slog.Any("err", err))
262269
}
263270
}
264271
if err := wr.Close(); err != nil {
265-
logger.Error("failed to close zip writer", slog.String("path", path), slog.Any("err", err))
272+
logger.ErrorContext(ctx, "failed to close zip writer", slog.String("path", path), slog.Any("err", err))
266273
}
267-
logger.Info("writing zip file", slog.String("path", path))
274+
logger.InfoContext(ctx, "writing zip file", slog.String("path", path))
268275
write(ctx, path, buf.Bytes(), "application/zip", outCh)
269276
}
270277

@@ -277,7 +284,7 @@ func writeVanir(ctx context.Context, vanirVulns []vulnData, outCh chan<- writeMs
277284
}
278285
finalJSON, err := json.Marshal(vulns)
279286
if err != nil {
280-
logger.Error("failed to marshal vanir JSON file", slog.Any("err", err))
287+
logger.ErrorContext(ctx, "failed to marshal vanir JSON file", slog.Any("err", err))
281288
return
282289
}
283290
write(ctx, filepath.Join(gitEcosystem, vanirVulnsFilename), finalJSON, "application/json", outCh)

0 commit comments

Comments
 (0)