Skip to content
16 changes: 8 additions & 8 deletions go.mod
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
bramwelt marked this conversation as resolved.
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Comment thread
github-license-compliance[bot] marked this conversation as resolved.
Fixed
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ require (
github.com/auth0/go-jwt-middleware/v2 v2.2.2
github.com/nats-io/nats.go v1.37.0
github.com/remychantenay/slog-otel v1.3.4
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
go.opentelemetry.io/contrib/propagators/jaeger v1.40.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0
go.opentelemetry.io/contrib/propagators/jaeger v1.41.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0
go.opentelemetry.io/otel/log v0.16.0
go.opentelemetry.io/otel/sdk v1.40.0
go.opentelemetry.io/otel/sdk v1.41.0
Comment thread
bramwelt marked this conversation as resolved.
go.opentelemetry.io/otel/sdk/log v0.16.0
go.opentelemetry.io/otel/sdk/metric v1.40.0
go.opentelemetry.io/otel/sdk/metric v1.41.0
Comment on lines 20 to +23
goa.design/goa/v3 v3.25.3
)

Expand All @@ -33,7 +33,7 @@ require (
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.40.0 // indirect
go.opentelemetry.io/otel/metric v1.41.0 // indirect
Comment on lines 33 to +36
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
)
Expand All @@ -53,14 +53,14 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.40.0
go.opentelemetry.io/otel/trace v1.40.0
go.opentelemetry.io/otel v1.41.0
go.opentelemetry.io/otel/trace v1.41.0
goa.design/clue v1.2.1
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.39.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/tools v0.41.0 // indirect
Expand Down
32 changes: 16 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0=
go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 h1:aXl9uobjJs5vquMLt9ZkI/3zIuz8XQ3TqOKSWx0/xdU=
go.opentelemetry.io/contrib/propagators/jaeger v1.40.0/go.mod h1:ioMePqe6k6c/ovXSkmkMr1mbN5qRBGJxNTVop7/2XO0=
go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms=
go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0 h1:PnV4kVnw0zOmwwFkAzCN5O07fw1YOIQor120zrh0AVo=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.66.0/go.mod h1:ofAwF4uinaf8SXdVzzbL4OsxJ3VfeEg3f/F6CeF49/Y=
go.opentelemetry.io/contrib/propagators/jaeger v1.41.0 h1:uw+ghxLS0Wb98XfAhgJVQvAmmNJzT7jE30wJ4sKBrWs=
go.opentelemetry.io/contrib/propagators/jaeger v1.41.0/go.mod h1:Og4snN98wfeGoKm516W5+v0orAqwRRB5TAOJetk/HdM=
go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c=
go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 h1:ZVg+kCXxd9LtAaQNKBxAvJ5NpMf7LpvEr4MIZqb0TMQ=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0/go.mod h1:hh0tMeZ75CCXrHd9OXRYxTlCAdxcXioWHFIpYw2rZu8=
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0 h1:djrxvDxAe44mJUrKataUbOhCKhR3F8QCyWucO16hTQs=
Expand All @@ -81,18 +81,18 @@ go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 h1:MzfofMZN8ulNqob
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0/go.mod h1:E73G9UFtKRXrxhBsHtG00TB5WxX57lpsQzogDkqBTz8=
go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4=
go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes=
go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g=
go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc=
go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8=
go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE=
go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ=
go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps=
go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8=
go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90=
go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI=
go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4=
go.opentelemetry.io/otel/sdk/log/logtest v0.16.0 h1:/XVkpZ41rVRTP4DfMgYv1nEtNmf65XPPyAdqV90TMy4=
go.opentelemetry.io/otel/sdk/log/logtest v0.16.0/go.mod h1:iOOPgQr5MY9oac/F5W86mXdeyWZGleIx3uXO98X2R6Y=
go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw=
go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg=
go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw=
go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA=
go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8=
go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y=
go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0=
go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis=
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand All @@ -109,8 +109,8 @@ golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY=
golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
Expand Down
88 changes: 87 additions & 1 deletion internal/infrastructure/messaging/messaging_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,50 @@ import (
"github.com/linuxfoundation/lfx-v2-access-check/internal/domain/contracts"
"github.com/linuxfoundation/lfx-v2-access-check/pkg/constants"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
"go.opentelemetry.io/otel/trace"
)

// tracer is safe to initialize at package level. otel.Tracer() returns a
// delegating tracer that forwards to whatever TracerProvider is registered at
// call time, so otel.SetTracerProvider() updates it regardless of init order.
var tracer = otel.Tracer("github.com/linuxfoundation/lfx-v2-access-check/internal/infrastructure/messaging")

// messagingReplyBodySizeKey records the size of the NATS reply payload.
// Named to distinguish it from messaging.message.body.size (the outbound request).
// There is no semconv standard attribute for reply size yet.
const messagingReplyBodySizeKey = attribute.Key("messaging.message.reply.body.size")

// natsHeaderCarrier adapts nats.Header to the OTel TextMapCarrier interface
// so trace context can be injected into and extracted from NATS message headers.
type natsHeaderCarrier nats.Header

func (c natsHeaderCarrier) Get(key string) string {
vals := c[key]
if len(vals) == 0 {
return ""
}
return vals[0]
}

func (c natsHeaderCarrier) Set(key string, value string) {
c[key] = []string{value}
}

func (c natsHeaderCarrier) Keys() []string {
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
return keys
}

var _ propagation.TextMapCarrier = natsHeaderCarrier{}

type messagingRepository struct {
conn *nats.Conn
}
Expand Down Expand Up @@ -57,14 +99,58 @@ func NewMessagingRepository(natsURL string) (contracts.MessagingRepository, erro

// Request sends a request message to the specified subject and waits for a response
func (r *messagingRepository) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error) {
ctx, span := tracer.Start(ctx, "nats.request",
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(
semconv.MessagingSystemKey.String("nats"),
semconv.MessagingOperationTypeSend,
semconv.MessagingDestinationName(subject),
semconv.MessagingMessageBodySize(len(data)),
),
)
defer span.End()

if r.conn == nil {
span.RecordError(constants.ErrNATSConnNotInit)
span.SetStatus(codes.Error, constants.ErrNATSConnNotInit.Error())
return nil, constants.ErrNATSConnNotInit
}

msg, err := r.conn.Request(subject, data, timeout)
// Short-circuit immediately if the context is already canceled or expired.
if err := ctx.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

// Clamp timeout to the ctx deadline if it is shorter.
if deadline, ok := ctx.Deadline(); ok {
if remaining := time.Until(deadline); remaining < timeout {
timeout = remaining
}
}
// Short-circuit if timeout is zero or negative — either the caller passed
// an invalid duration or the ctx deadline was already past.
if timeout <= 0 {
span.RecordError(context.DeadlineExceeded)
span.SetStatus(codes.Error, context.DeadlineExceeded.Error())
return nil, context.DeadlineExceeded
}

// nats.NewMsg already initializes Header, so no extra make() needed.
natsMsg := nats.NewMsg(subject)
natsMsg.Data = data
otel.GetTextMapPropagator().Inject(ctx, natsHeaderCarrier(natsMsg.Header))
Comment thread
bramwelt marked this conversation as resolved.

msg, err := r.conn.RequestMsg(natsMsg, timeout)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, constants.ErrMsgNATSRequestFailed)
return nil, fmt.Errorf("%s: %w", constants.ErrMsgNATSRequestFailed, err)
}

span.SetAttributes(messagingReplyBodySizeKey.Int(len(msg.Data)))
span.SetStatus(codes.Ok, "")
return msg.Data, nil
}

Expand Down
40 changes: 40 additions & 0 deletions internal/infrastructure/messaging/messaging_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"strings"
"testing"
"time"

"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
)

func TestNewMessagingRepository_InvalidURL(t *testing.T) {
Expand Down Expand Up @@ -178,6 +183,41 @@ func TestMessagingRepository_ErrorHandling(t *testing.T) {
}
}

func TestMessagingRepository_Request_CreatesSpan(t *testing.T) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
orig := otel.GetTracerProvider()
otel.SetTracerProvider(tp)
t.Cleanup(func() { otel.SetTracerProvider(orig) })

repo := &messagingRepository{conn: nil}
_, err := repo.Request(context.Background(), "test.subject", []byte("data"), 1*time.Second)
if err == nil {
t.Fatal("Expected error with nil connection, got none")
}

spans := exporter.GetSpans()
if len(spans) != 1 {
t.Fatalf("Expected 1 span, got %d", len(spans))
}

span := spans[0]
if span.Name != "nats.request" {
t.Errorf("Expected span name 'nats.request', got %q", span.Name)
}

attrMap := make(map[string]string)
for _, a := range span.Attributes {
attrMap[string(a.Key)] = a.Value.AsString()
}
if attrMap[string(semconv.MessagingSystemKey)] != "nats" {
t.Errorf("Expected messaging.system=nats, got %q", attrMap[string(semconv.MessagingSystemKey)])
}
if attrMap[string(semconv.MessagingDestinationNameKey)] != "test.subject" {
t.Errorf("Expected messaging.destination.name=test.subject, got %q", attrMap[string(semconv.MessagingDestinationNameKey)])
}
}

func TestMessagingRepository_Request_EdgeCases(t *testing.T) {
repo := &messagingRepository{conn: nil}
ctx := context.Background()
Expand Down
Loading