diff --git a/go.mod b/go.mod index d5332c7..483c902 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/auth0/go-jwt-middleware/v2 v2.2.2 github.com/nats-io/nats.go v1.37.0 github.com/remychantenay/slog-otel v1.3.4 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 - go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0 + go.opentelemetry.io/contrib/propagators/jaeger v1.41.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 @@ -18,9 +18,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 go.opentelemetry.io/otel/log v0.16.0 - go.opentelemetry.io/otel/sdk v1.40.0 + go.opentelemetry.io/otel/sdk v1.41.0 go.opentelemetry.io/otel/sdk/log v0.16.0 - go.opentelemetry.io/otel/sdk/metric v1.40.0 + go.opentelemetry.io/otel/sdk/metric v1.41.0 goa.design/goa/v3 v3.25.3 ) @@ -33,7 +33,7 @@ require ( go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 // indirect - go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.41.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect ) @@ -53,14 +53,14 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/stretchr/testify v1.11.1 - go.opentelemetry.io/otel v1.40.0 - go.opentelemetry.io/otel/trace v1.40.0 + go.opentelemetry.io/otel v1.41.0 + go.opentelemetry.io/otel/trace v1.41.0 goa.design/clue v1.2.1 golang.org/x/crypto v0.47.0 // indirect golang.org/x/mod v0.32.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.40.0 // indirect + golang.org/x/sys v0.41.0 // indirect golang.org/x/term v0.39.0 // indirect golang.org/x/text v0.33.0 // indirect golang.org/x/tools v0.41.0 // indirect diff --git a/go.sum b/go.sum index 169ab06..9c2dff6 100644 --- a/go.sum +++ b/go.sum @@ -57,12 +57,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0= -go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 h1:aXl9uobjJs5vquMLt9ZkI/3zIuz8XQ3TqOKSWx0/xdU= -go.opentelemetry.io/contrib/propagators/jaeger v1.40.0/go.mod h1:ioMePqe6k6c/ovXSkmkMr1mbN5qRBGJxNTVop7/2XO0= -go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= -go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0 h1:PnV4kVnw0zOmwwFkAzCN5O07fw1YOIQor120zrh0AVo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0/go.mod h1:ofAwF4uinaf8SXdVzzbL4OsxJ3VfeEg3f/F6CeF49/Y= +go.opentelemetry.io/contrib/propagators/jaeger v1.41.0 h1:uw+ghxLS0Wb98XfAhgJVQvAmmNJzT7jE30wJ4sKBrWs= +go.opentelemetry.io/contrib/propagators/jaeger v1.41.0/go.mod h1:Og4snN98wfeGoKm516W5+v0orAqwRRB5TAOJetk/HdM= +go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= +go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 h1:ZVg+kCXxd9LtAaQNKBxAvJ5NpMf7LpvEr4MIZqb0TMQ= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0/go.mod h1:hh0tMeZ75CCXrHd9OXRYxTlCAdxcXioWHFIpYw2rZu8= go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0 h1:djrxvDxAe44mJUrKataUbOhCKhR3F8QCyWucO16hTQs= @@ -81,18 +81,18 @@ go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 h1:MzfofMZN8ulNqob go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0/go.mod h1:E73G9UFtKRXrxhBsHtG00TB5WxX57lpsQzogDkqBTz8= go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4= go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes= -go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= -go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ= +go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps= +go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8= +go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90= go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI= go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4= go.opentelemetry.io/otel/sdk/log/logtest v0.16.0 h1:/XVkpZ41rVRTP4DfMgYv1nEtNmf65XPPyAdqV90TMy4= go.opentelemetry.io/otel/sdk/log/logtest v0.16.0/go.mod h1:iOOPgQr5MY9oac/F5W86mXdeyWZGleIx3uXO98X2R6Y= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= -go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= -go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8= +go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= +go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= +go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -109,8 +109,8 @@ golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= diff --git a/internal/infrastructure/messaging/messaging_repository.go b/internal/infrastructure/messaging/messaging_repository.go index 39f214b..f529a42 100644 --- a/internal/infrastructure/messaging/messaging_repository.go +++ b/internal/infrastructure/messaging/messaging_repository.go @@ -13,8 +13,50 @@ import ( "github.com/linuxfoundation/lfx-v2-access-check/internal/domain/contracts" "github.com/linuxfoundation/lfx-v2-access-check/pkg/constants" "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.39.0" + "go.opentelemetry.io/otel/trace" ) +// tracer is safe to initialize at package level. otel.Tracer() returns a +// delegating tracer that forwards to whatever TracerProvider is registered at +// call time, so otel.SetTracerProvider() updates it regardless of init order. +var tracer = otel.Tracer("github.com/linuxfoundation/lfx-v2-access-check/internal/infrastructure/messaging") + +// messagingReplyBodySizeKey records the size of the NATS reply payload. +// Named to distinguish it from messaging.message.body.size (the outbound request). +// There is no semconv standard attribute for reply size yet. +const messagingReplyBodySizeKey = attribute.Key("messaging.message.reply.body.size") + +// natsHeaderCarrier adapts nats.Header to the OTel TextMapCarrier interface +// so trace context can be injected into and extracted from NATS message headers. +type natsHeaderCarrier nats.Header + +func (c natsHeaderCarrier) Get(key string) string { + vals := c[key] + if len(vals) == 0 { + return "" + } + return vals[0] +} + +func (c natsHeaderCarrier) Set(key string, value string) { + c[key] = []string{value} +} + +func (c natsHeaderCarrier) Keys() []string { + keys := make([]string, 0, len(c)) + for k := range c { + keys = append(keys, k) + } + return keys +} + +var _ propagation.TextMapCarrier = natsHeaderCarrier{} + type messagingRepository struct { conn *nats.Conn } @@ -57,14 +99,58 @@ func NewMessagingRepository(natsURL string) (contracts.MessagingRepository, erro // Request sends a request message to the specified subject and waits for a response func (r *messagingRepository) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error) { + ctx, span := tracer.Start(ctx, "nats.request", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.MessagingSystemKey.String("nats"), + semconv.MessagingOperationTypeSend, + semconv.MessagingDestinationName(subject), + semconv.MessagingMessageBodySize(len(data)), + ), + ) + defer span.End() + if r.conn == nil { + span.RecordError(constants.ErrNATSConnNotInit) + span.SetStatus(codes.Error, constants.ErrNATSConnNotInit.Error()) return nil, constants.ErrNATSConnNotInit } - msg, err := r.conn.Request(subject, data, timeout) + // Short-circuit immediately if the context is already canceled or expired. + if err := ctx.Err(); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + // Clamp timeout to the ctx deadline if it is shorter. + if deadline, ok := ctx.Deadline(); ok { + if remaining := time.Until(deadline); remaining < timeout { + timeout = remaining + } + } + // Short-circuit if timeout is zero or negative — either the caller passed + // an invalid duration or the ctx deadline was already past. + if timeout <= 0 { + span.RecordError(context.DeadlineExceeded) + span.SetStatus(codes.Error, context.DeadlineExceeded.Error()) + return nil, context.DeadlineExceeded + } + + // nats.NewMsg already initializes Header, so no extra make() needed. + natsMsg := nats.NewMsg(subject) + natsMsg.Data = data + otel.GetTextMapPropagator().Inject(ctx, natsHeaderCarrier(natsMsg.Header)) + + msg, err := r.conn.RequestMsg(natsMsg, timeout) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, constants.ErrMsgNATSRequestFailed) return nil, fmt.Errorf("%s: %w", constants.ErrMsgNATSRequestFailed, err) } + + span.SetAttributes(messagingReplyBodySizeKey.Int(len(msg.Data))) + span.SetStatus(codes.Ok, "") return msg.Data, nil } diff --git a/internal/infrastructure/messaging/messaging_repository_test.go b/internal/infrastructure/messaging/messaging_repository_test.go index b37195d..f537855 100644 --- a/internal/infrastructure/messaging/messaging_repository_test.go +++ b/internal/infrastructure/messaging/messaging_repository_test.go @@ -8,6 +8,11 @@ import ( "strings" "testing" "time" + + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.39.0" ) func TestNewMessagingRepository_InvalidURL(t *testing.T) { @@ -178,6 +183,41 @@ func TestMessagingRepository_ErrorHandling(t *testing.T) { } } +func TestMessagingRepository_Request_CreatesSpan(t *testing.T) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter)) + orig := otel.GetTracerProvider() + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(orig) }) + + repo := &messagingRepository{conn: nil} + _, err := repo.Request(context.Background(), "test.subject", []byte("data"), 1*time.Second) + if err == nil { + t.Fatal("Expected error with nil connection, got none") + } + + spans := exporter.GetSpans() + if len(spans) != 1 { + t.Fatalf("Expected 1 span, got %d", len(spans)) + } + + span := spans[0] + if span.Name != "nats.request" { + t.Errorf("Expected span name 'nats.request', got %q", span.Name) + } + + attrMap := make(map[string]string) + for _, a := range span.Attributes { + attrMap[string(a.Key)] = a.Value.AsString() + } + if attrMap[string(semconv.MessagingSystemKey)] != "nats" { + t.Errorf("Expected messaging.system=nats, got %q", attrMap[string(semconv.MessagingSystemKey)]) + } + if attrMap[string(semconv.MessagingDestinationNameKey)] != "test.subject" { + t.Errorf("Expected messaging.destination.name=test.subject, got %q", attrMap[string(semconv.MessagingDestinationNameKey)]) + } +} + func TestMessagingRepository_Request_EdgeCases(t *testing.T) { repo := &messagingRepository{conn: nil} ctx := context.Background()