Skip to content

Commit d7a31da

Browse files
adri1197Adrian Fernandez De La Torre
authored andcommitted
Setup OTEL provider type
Signed-off-by: Adrian Fernandez De La Torre <adri1197@gmail.com>
1 parent d7cba67 commit d7a31da

8 files changed

Lines changed: 348 additions & 5 deletions

File tree

api/v1beta3/provider_types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,14 @@ const (
5252
PagerDutyProvider string = "pagerduty"
5353
DataDogProvider string = "datadog"
5454
NATSProvider string = "nats"
55+
OTELProvider string = "otel"
5556
)
5657

5758
// ProviderSpec defines the desired state of the Provider.
5859
// +kubebuilder:validation:XValidation:rule="self.type == 'github' || self.type == 'gitlab' || self.type == 'gitea' || self.type == 'bitbucketserver' || self.type == 'bitbucket' || self.type == 'azuredevops' || !has(self.commitStatusExpr)", message="spec.commitStatusExpr is only supported for the 'github', 'gitlab', 'gitea', 'bitbucketserver', 'bitbucket', 'azuredevops' provider types"
5960
type ProviderSpec struct {
6061
// Type specifies which Provider implementation to use.
61-
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats
62+
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucketserver;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch;pagerduty;datadog;nats;otel
6263
// +required
6364
Type string `json:"type"`
6465

config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ spec:
385385
- pagerduty
386386
- datadog
387387
- nats
388+
- otel
388389
type: string
389390
username:
390391
description: Username specifies the name under which events are posted.

go.mod

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ require (
4141
github.com/spf13/pflag v1.0.7
4242
github.com/stretchr/testify v1.10.0
4343
gitlab.com/gitlab-org/api/client-go v0.137.0
44+
go.opentelemetry.io/otel v1.37.0
45+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0
46+
go.opentelemetry.io/otel/sdk v1.37.0
47+
go.opentelemetry.io/otel/trace v1.37.0
4448
golang.org/x/oauth2 v0.30.0
4549
golang.org/x/text v0.27.0
4650
google.golang.org/api v0.243.0
@@ -78,6 +82,7 @@ require (
7882
github.com/blang/semver/v4 v4.0.0 // indirect
7983
github.com/bradleyfalzon/ghinstallation/v2 v2.16.0 // indirect
8084
github.com/carapace-sh/carapace-shlex v1.0.1 // indirect
85+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
8186
github.com/cespare/xxhash/v2 v2.3.0 // indirect
8287
github.com/chai2010/gettext-go v1.0.2 // indirect
8388
github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
@@ -122,6 +127,7 @@ require (
122127
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
123128
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
124129
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
130+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
125131
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
126132
github.com/hashicorp/go-version v1.7.0 // indirect
127133
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -167,9 +173,9 @@ require (
167173
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
168174
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
169175
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
170-
go.opentelemetry.io/otel v1.37.0 // indirect
176+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
171177
go.opentelemetry.io/otel/metric v1.37.0 // indirect
172-
go.opentelemetry.io/otel/trace v1.37.0 // indirect
178+
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
173179
go.uber.org/multierr v1.11.0 // indirect
174180
go.uber.org/zap v1.27.0 // indirect
175181
go.yaml.in/yaml/v2 v2.4.2 // indirect

go.sum

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ github.com/carapace-sh/carapace-shlex v1.0.1 h1:ww0JCgWpOVuqWG7k3724pJ18Lq8gh5pH
7474
github.com/carapace-sh/carapace-shlex v1.0.1/go.mod h1:lJ4ZsdxytE0wHJ8Ta9S7Qq0XpjgjU0mdfCqiI2FHx7M=
7575
github.com/cdevents/sdk-go v0.4.1 h1:Cr/iH/I51Z+slxKRx9AV7stn6hr2pjRHQ5wpPJhRLTU=
7676
github.com/cdevents/sdk-go v0.4.1/go.mod h1:3IhWLoY4vsyUEzv7XJbyr0BRQ0KPgvNx+wiD2hQGFNU=
77+
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
78+
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
7779
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
7880
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
7981
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -260,6 +262,8 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
260262
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
261263
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
262264
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
265+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
266+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90=
263267
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
264268
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
265269
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
@@ -419,14 +423,20 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6h
419423
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
420424
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
421425
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
426+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM=
427+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M=
428+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 h1:bDMKF3RUSxshZ5OjOTi8rsHGaPKsAt76FaqgvIUySLc=
429+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0/go.mod h1:dDT67G/IkA46Mr2l9Uj7HsQVwsjASyV9SjGofsiUZDA=
422430
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
423431
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
424-
go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
425-
go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
432+
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
433+
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
426434
go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
427435
go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
428436
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
429437
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
438+
go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os=
439+
go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo=
430440
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
431441
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
432442
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

internal/notifier/factory.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ var (
6060
apiv1.BitbucketServerProvider: bitbucketServerNotifierFunc,
6161
apiv1.BitbucketProvider: bitbucketNotifierFunc,
6262
apiv1.AzureDevOpsProvider: azureDevOpsNotifierFunc,
63+
apiv1.OTELProvider: otelNotifierFunc,
6364
}
6465
)
6566

@@ -355,3 +356,10 @@ func azureDevOpsNotifierFunc(opts notifierOptions) (Interface, error) {
355356
opts.TLSConfig, opts.ProxyURL, opts.ServiceAccountName, opts.ProviderName,
356357
opts.ProviderNamespace, opts.TokenClient, opts.TokenCache)
357358
}
359+
360+
func otelNotifierFunc(opts notifierOptions) (Interface, error) {
361+
if opts.Token == "" && opts.Password != "" {
362+
opts.Token = opts.Password
363+
}
364+
return NewOTLPTracer(opts.Context, opts.URL, opts.ProxyURL, opts.Headers, opts.TLSConfig, opts.Username, opts.Token)
365+
}

internal/notifier/otel.go

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package notifier
18+
19+
import (
20+
"context"
21+
"crypto/sha256"
22+
"crypto/tls"
23+
"encoding/base64"
24+
"fmt"
25+
"net/http"
26+
"net/url"
27+
"slices"
28+
29+
"go.opentelemetry.io/otel/attribute"
30+
"go.opentelemetry.io/otel/codes"
31+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
32+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
33+
"go.opentelemetry.io/otel/trace"
34+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"sigs.k8s.io/controller-runtime/pkg/log"
36+
37+
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
38+
39+
apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3"
40+
)
41+
42+
type alertMetadataContextKey struct{}
43+
44+
// Context key functions
45+
func WithAlertMetadata(ctx context.Context, metadata metav1.ObjectMeta) context.Context {
46+
return context.WithValue(ctx, alertMetadataContextKey{}, metadata)
47+
}
48+
49+
func GetAlertMetadata(ctx context.Context) (metav1.ObjectMeta, bool) {
50+
metadata, ok := ctx.Value(alertMetadataContextKey{}).(metav1.ObjectMeta)
51+
return metadata, ok
52+
}
53+
54+
type OTLPTracer struct {
55+
tracerProvider *sdktrace.TracerProvider
56+
tracer trace.Tracer
57+
}
58+
59+
func NewOTLPTracer(ctx context.Context, urlStr string, proxyURL string, headers map[string]string, tlsConfig *tls.Config, username string, password string) (*OTLPTracer, error) {
60+
// Set up OTLP exporter options
61+
httpOptions := []otlptracehttp.Option{
62+
otlptracehttp.WithEndpointURL(urlStr),
63+
}
64+
65+
// Add headers if available
66+
if len(headers) > 0 {
67+
// Add authentication header, if it doesn't exist yet
68+
if headers["Authorization"] == "" {
69+
// If username is not set, password is considered as token
70+
if username == "" {
71+
headers["Authorization"] = "Bearer " + password
72+
} else if username != "" && password != "" {
73+
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
74+
headers["Authorization"] = "Basic " + auth
75+
}
76+
}
77+
httpOptions = append(httpOptions, otlptracehttp.WithHeaders(headers))
78+
}
79+
80+
// Add TLS config if available
81+
if tlsConfig != nil {
82+
httpOptions = append(httpOptions, otlptracehttp.WithTLSClientConfig(tlsConfig))
83+
}
84+
85+
// Add proxy if available
86+
if proxyURL != "" {
87+
proxyURLparsed, err := url.Parse(proxyURL)
88+
if err != nil {
89+
return nil, fmt.Errorf("failed to proxy URL - %s: %w", proxyURL, err)
90+
} else {
91+
if username != "" && password != "" {
92+
proxyURLparsed.User = url.UserPassword(username, password)
93+
}
94+
httpOptions = append(httpOptions, otlptracehttp.WithProxy(func(*http.Request) (*url.URL, error) {
95+
return proxyURLparsed, nil
96+
}))
97+
}
98+
}
99+
100+
exporter, err := otlptracehttp.New(ctx, httpOptions...)
101+
if err != nil {
102+
return nil, err
103+
}
104+
105+
// Create TracerProvider once
106+
tp := sdktrace.NewTracerProvider(
107+
sdktrace.WithBatcher(exporter),
108+
)
109+
110+
log.FromContext(ctx).Info("Successfully created OTEL tracer")
111+
return &OTLPTracer{
112+
tracerProvider: tp,
113+
tracer: tp.Tracer("flux:notification-controller"),
114+
}, nil
115+
}
116+
117+
// Post implements the notifier.Interface
118+
func (t *OTLPTracer) Post(ctx context.Context, event eventv1.Event) error {
119+
// Skip Git commit status update event.
120+
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) {
121+
return nil
122+
}
123+
124+
logger := log.FromContext(ctx).WithValues(
125+
"event", event.Reason,
126+
"object", fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name),
127+
"severity", event.Severity,
128+
)
129+
logger.Info("OTEL Post function called", "event", event.Reason)
130+
131+
alert, ok := GetAlertMetadata(ctx)
132+
if !ok {
133+
return fmt.Errorf("alert metadata not found in context")
134+
}
135+
136+
// Extract revision from event metadata
137+
revision := extractMetadata(event.Metadata, "revision")
138+
139+
// TraceID: <AlertUID>:<revisionID>
140+
logger.V(1).Info("Generating trace IDs", "alertUID", string(alert.UID), "revision", revision)
141+
traceIDStr := generateID(string(alert.UID), revision)
142+
// spanIDStr := generateID(string(event.InvolvedObject.UID),
143+
// fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind,
144+
// event.InvolvedObject.Namespace, event.InvolvedObject.Name))
145+
146+
var traceID trace.TraceID
147+
// var spanID trace.SpanID
148+
copy(traceID[:], traceIDStr[:16])
149+
// copy(spanID[:], spanIDStr[:8])
150+
151+
// Determine span relationship based on Flux object hierarchy
152+
var spanCtx context.Context = t.createSpanContext(ctx, event, traceID)
153+
154+
// Create single span with proper attributes
155+
if event.InvolvedObject.Kind != "HelmRepository" {
156+
logger.Info("Processing OTEL notification", "alert", alert.Name)
157+
158+
} else {
159+
logger.Info("OTEL notification skipped", "alert", alert.Name)
160+
}
161+
162+
span := t.processSpan(spanCtx, event)
163+
// Set status based on event severity
164+
if event.Severity == eventv1.EventSeverityError {
165+
span.SetStatus(codes.Error, event.Message)
166+
} else {
167+
span.SetStatus(codes.Ok, event.Message)
168+
}
169+
170+
defer span.End()
171+
172+
serviceName := fmt.Sprintf("%s: %s/%s", apiv1beta3.AlertKind, alert.Namespace, alert.Name)
173+
logger.Info("Successfully sent trace to OTLP endpoint",
174+
"alert", serviceName,
175+
)
176+
177+
return nil
178+
}
179+
180+
func (t *OTLPTracer) createSpanContext(ctx context.Context, event eventv1.Event, traceID trace.TraceID) context.Context {
181+
kind := event.InvolvedObject.Kind
182+
183+
spanContext := trace.NewSpanContext(trace.SpanContextConfig{
184+
TraceID: traceID,
185+
TraceFlags: trace.FlagsSampled,
186+
})
187+
188+
// Root spans: Sources that start the deployment flow
189+
if isSource(kind) {
190+
return trace.ContextWithSpanContext(context.Background(),
191+
spanContext.WithTraceFlags(spanContext.TraceFlags()))
192+
}
193+
194+
// Child spans: Everything else inherits from the same trace
195+
return trace.ContextWithSpanContext(ctx,
196+
spanContext.WithTraceFlags(spanContext.TraceFlags()))
197+
}
198+
199+
func (t *OTLPTracer) processSpan(ctx context.Context, event eventv1.Event) trace.Span {
200+
// Build span attributes including metadata
201+
eventAttrs := []attribute.KeyValue{
202+
attribute.String("object.uid", string(event.InvolvedObject.UID)),
203+
attribute.String("object.kind", event.InvolvedObject.Kind),
204+
attribute.String("object.name", event.InvolvedObject.Name),
205+
attribute.String("object.namespace", event.InvolvedObject.Namespace),
206+
}
207+
208+
// Add metadata as event attributes
209+
for k, v := range event.Metadata {
210+
eventAttrs = append(eventAttrs, attribute.String(k, v))
211+
}
212+
213+
// Start span
214+
spanName := fmt.Sprintf("%s: %s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
215+
_, span := t.tracer.Start(ctx, spanName,
216+
trace.WithAttributes(eventAttrs...),
217+
trace.WithTimestamp(event.Timestamp.Time))
218+
219+
return span
220+
}
221+
222+
// Add cleanup method
223+
func (t *OTLPTracer) Close(ctx context.Context) error {
224+
return t.tracerProvider.Shutdown(ctx)
225+
}
226+
227+
// Add this function to generate trace and span ID
228+
func generateID(UID string, rest string) []byte {
229+
input := fmt.Sprintf("%s:%s", UID, rest)
230+
hash := sha256.Sum256([]byte(input))
231+
return hash[:]
232+
}
233+
234+
func extractMetadata(metadata map[string]string, key string) string {
235+
if v, ok := metadata[key]; ok {
236+
return v
237+
}
238+
return "unknown"
239+
}
240+
241+
func isSource(kind string) bool {
242+
sourceKinds := []string{"GitRepository", "HelmChart", "OCIRepository", "Bucket"}
243+
return slices.Contains(sourceKinds, kind)
244+
}

0 commit comments

Comments
 (0)