@@ -18,6 +18,9 @@ package server
1818
1919import (
2020 "context"
21+ "crypto/sha256"
22+ "encoding/base64"
23+ "encoding/hex"
2124 "errors"
2225 "fmt"
2326 "net/http"
@@ -27,6 +30,15 @@ import (
2730 "strings"
2831 "time"
2932
33+ "go.opentelemetry.io/otel"
34+ "go.opentelemetry.io/otel/attribute"
35+ "go.opentelemetry.io/otel/codes"
36+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
37+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
38+ "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
39+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
40+ semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
41+ "go.opentelemetry.io/otel/trace"
3042 corev1 "k8s.io/api/core/v1"
3143 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3244 "k8s.io/apimachinery/pkg/labels"
@@ -88,11 +100,24 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
88100 "providerName" : alert .Spec .ProviderRef .Name ,
89101 })
90102 ctx := log .IntoContext (ctx , alertLogger )
103+ // OTEL processing
104+ var provider apiv1beta3.Provider
105+ providerName := types.NamespacedName {
106+ Namespace : alert .Namespace ,
107+ Name : alert .Spec .ProviderRef .Name ,
108+ }
109+ if err := s .kubeClient .Get (ctx , providerName , & provider ); err == nil {
110+ s .processTracing (ctx , event , alert , & provider )
111+ }
112+
91113 if err := s .dispatchNotification (ctx , event , alert ); err != nil {
92114 alertLogger .Error (err , "failed to dispatch notification" )
93115 s .Eventf (alert , corev1 .EventTypeWarning , "NotificationDispatchFailed" ,
94116 "failed to dispatch notification for %s: %s" , involvedObjectString (event .InvolvedObject ), err )
95117 }
118+ // else {
119+
120+ // }
96121 }
97122
98123 w .WriteHeader (http .StatusAccepted )
@@ -609,3 +634,182 @@ func excludeInternalMetadata(event *eventv1.Event) {
609634 delete (event .Metadata , key )
610635 }
611636}
637+
638+ // Add this function to generate root span ID
639+ func generateRootSpanID (alertUID , sourceRevision string ) string {
640+ input := fmt .Sprintf ("%s:%s" , alertUID , sourceRevision )
641+ hash := sha256 .Sum256 ([]byte (input ))
642+ return hex .EncodeToString (hash [:])
643+ }
644+
645+ // Add this function to check if provider supports OTLP
646+ func isOTLPProvider (providerType string ) bool {
647+ otlpProviders := []string {"jaeger" , "tempo" , "otlp" , "generic" }
648+ return slices .Contains (otlpProviders , providerType )
649+ }
650+
651+ // Add this function to process tracing
652+ func (s * EventServer ) processTracing (ctx context.Context , event * eventv1.Event , alert * apiv1beta3.Alert , provider * apiv1beta3.Provider ) {
653+
654+ if isOTLPProvider (provider .Spec .Type ) {
655+ s .setupOTLPExporter (ctx , provider )
656+ s .sendOTLPTrace (ctx , event , alert , provider )
657+ } else {
658+ s .logTraceWarning (ctx , event , alert )
659+ }
660+ }
661+
662+ // Add this function to send OTLP traces
663+ func (s * EventServer ) sendOTLPTrace (ctx context.Context , event * eventv1.Event , alert * apiv1beta3.Alert , provider * apiv1beta3.Provider ) {
664+ revision , hasRevision := event .GetRevision ()
665+ if ! hasRevision {
666+ return
667+ }
668+
669+ var spanCtx context.Context = ctx
670+ spanID := generateRootSpanID (string (alert .UID ), revision )
671+ tracer := otel .Tracer ("flux-notification-controller" )
672+
673+ // If a source kind is considered a potential root span
674+ if isSourceKind (event .InvolvedObject .Kind ) {
675+ if ! s .spanExists (ctx , spanID , provider ) {
676+ spanCtx = context .Background () // Create root span
677+ }
678+ }
679+
680+ _ , span := tracer .Start (spanCtx , event .InvolvedObject .Kind )
681+ defer span .End ()
682+
683+ span .SetAttributes (
684+ semconv .ServiceName ("flux-notification-controller" ),
685+ attribute .String ("flux.trace.id" , spanID ),
686+ attribute .String ("flux.object.kind" , event .InvolvedObject .Kind ),
687+ attribute .String ("flux.object.name" , event .InvolvedObject .Name ),
688+ attribute .String ("flux.object.namespace" , event .InvolvedObject .Namespace ),
689+ attribute .String ("flux.event.reason" , event .Reason ),
690+ )
691+
692+ span .AddEvent (event .Message , trace .WithTimestamp (event .Timestamp .Time ))
693+ if event .Severity == "error" {
694+ span .SetStatus (codes .Error , event .Message )
695+ }
696+ }
697+
698+ func (s * EventServer ) setupOTLPExporter (ctx context.Context , provider * apiv1beta3.Provider ) {
699+ httpOptions := []otlptracehttp.Option {otlptracehttp .WithEndpoint (provider .Spec .Address )}
700+ grpcOptions := []otlptracegrpc.Option {otlptracegrpc .WithEndpoint (provider .Spec .Address )}
701+
702+ // NOTE: Posibly reuse extractAuthFromSecret()
703+ // Add authentication if secretRef is set
704+ if provider .Spec .SecretRef != nil {
705+ var secret corev1.Secret
706+ secretName := types.NamespacedName {
707+ Namespace : provider .Namespace ,
708+ Name : provider .Spec .SecretRef .Name ,
709+ }
710+ if err := s .kubeClient .Get (ctx , secretName , & secret ); err == nil {
711+ var headers map [string ]string
712+ if token , ok := secret .Data ["token" ]; ok {
713+ headers = map [string ]string {"Authorization" : "Bearer " + string (token )}
714+
715+ } else {
716+ user , userOk := secret .Data ["username" ]
717+ pass , passOk := secret .Data ["password" ]
718+ if userOk && passOk {
719+ auth := base64 .StdEncoding .EncodeToString ([]byte (string (user ) + ":" + string (pass )))
720+ headers = map [string ]string {"Authorization" : "Basic " + auth }
721+ }
722+ }
723+ httpOptions = append (httpOptions , otlptracehttp .WithHeaders (headers ))
724+ grpcOptions = append (grpcOptions , otlptracegrpc .WithHeaders (headers ))
725+ }
726+ }
727+
728+ var exporter * otlptrace.Exporter
729+ var err error
730+
731+ if err != nil {
732+ return
733+ }
734+
735+ if strings .HasPrefix (provider .Spec .Address , "http" ) {
736+ httpOptions = append (httpOptions , otlptracehttp .WithInsecure ())
737+ exporter , err = otlptracehttp .New (ctx , httpOptions ... )
738+ } else {
739+ grpcOptions = append (grpcOptions , otlptracegrpc .WithInsecure ())
740+ exporter , err = otlptracegrpc .New (ctx , grpcOptions ... )
741+ }
742+
743+ if err != nil {
744+ return
745+ }
746+
747+ defer exporter .Shutdown (ctx )
748+
749+ tp := sdktrace .NewTracerProvider (sdktrace .WithBatcher (exporter ))
750+ otel .SetTracerProvider (tp )
751+ defer tp .Shutdown (ctx )
752+ }
753+
754+ // Query to get the spanID
755+ func (s * EventServer ) spanExists (ctx context.Context , spanID string , provider * apiv1beta3.Provider ) bool {
756+ if ! isOTLPProvider (provider .Spec .Type ) {
757+ return false
758+ }
759+
760+ queryURL := fmt .Sprintf ("%s/api/traces/%s" , provider .Spec .Address , spanID )
761+ req , err := http .NewRequestWithContext (ctx , "GET" , queryURL , nil )
762+ if err != nil {
763+ return false
764+ }
765+
766+ if provider .Spec .SecretRef != nil {
767+ var secret corev1.Secret
768+ secretName := types.NamespacedName {
769+ Namespace : provider .Namespace ,
770+ Name : provider .Spec .SecretRef .Name ,
771+ }
772+ if err := s .kubeClient .Get (ctx , secretName , & secret ); err == nil {
773+ if token , ok := secret .Data ["token" ]; ok {
774+ req .Header .Set ("Authorization" , "Bearer " + string (token ))
775+ } else {
776+ user , userOk := secret .Data ["username" ]
777+ pass , passOk := secret .Data ["password" ]
778+ if userOk && passOk {
779+ req .SetBasicAuth (string (user ), string (pass ))
780+ }
781+ }
782+ }
783+ }
784+
785+ resp , err := http .DefaultClient .Do (req )
786+ if err != nil {
787+ return false
788+ }
789+ defer resp .Body .Close ()
790+
791+ return resp .StatusCode == http .StatusOK
792+ }
793+
794+ // Add this function to log trace warnings for non-OTLP providers
795+ func (s * EventServer ) logTraceWarning (ctx context.Context , event * eventv1.Event , alert * apiv1beta3.Alert ) {
796+ logger := log .FromContext (ctx )
797+ spanType := "child"
798+ if isSourceKind (event .InvolvedObject .Kind ) {
799+ spanType = "root"
800+ }
801+
802+ logger .Info ("trace information (provider does not support OTLP)" ,
803+ "spanType" , spanType ,
804+ "alertUID" , string (alert .UID ),
805+ "eventReason" , event .Reason ,
806+ "objectKind" , event .InvolvedObject .Kind ,
807+ "objectName" , event .InvolvedObject .Name ,
808+ "objectNamespace" , event .InvolvedObject .Namespace ,
809+ )
810+ }
811+
812+ func isSourceKind (kind string ) bool {
813+ sourceKinds := []string {"GitRepository" , "OCIRepository" , "HelmRepository" , "Bucket" }
814+ return slices .Contains (sourceKinds , kind )
815+ }
0 commit comments