diff --git a/go.mod b/go.mod index 828e5ad6c2..9fcffb365f 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.67 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 diff --git a/go.sum b/go.sum index f34ce7b7b0..81d1a79c16 100644 --- a/go.sum +++ b/go.sum @@ -326,8 +326,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartcontractkit/chain-selectors v1.0.67 h1:gxTqP/JC40KDe3DE1SIsIKSTKTZEPyEU1YufO1admnw= github.com/smartcontractkit/chain-selectors v1.0.67/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1 h1:ca2z5OXgnbBPQRxpwXwBLJsUA1+cAp5ncfW4Ssvd6eY= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.1/go.mod h1:NZv/qKYGFRnkjOYBouajnDfFoZ+WDa6H2KNmSf1dnKc= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0iMRTI80cpBot/3JFbjz2j+2tvpfooVhRHw= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 h1:mF3FiDUoV0QbJcks9R2y7ydqntNL1Z0VCPBJgx/Ms+0= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8= diff --git a/pkg/beholder/chip_client.go b/pkg/beholder/chip_client.go new file mode 100644 index 0000000000..bbb8a49bce --- /dev/null +++ b/pkg/beholder/chip_client.go @@ -0,0 +1,45 @@ +package beholder + +import ( + "context" + "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" +) + +type ChipIngressClient interface { + RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) +} + +type chipIngressClient struct { + client chipingress.Client +} + +func NewChipIngressClient(client chipingress.Client) (ChipIngressClient, error) { + if client == nil { + return nil, fmt.Errorf("chip ingress client is nil") + } + + return &chipIngressClient{ + client: client, + }, nil +} + +// RegisterSchema registers one or more schemas with the Chip Ingress service. Returns a map of subject to version for each registered schema. +func (sr *chipIngressClient) RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) { + request := &pb.RegisterSchemaRequest{ + Schemas: schemas, + } + + resp, err := sr.client.RegisterSchema(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to register schema: %w", err) + } + + registeredMap := make(map[string]int) + for _, schema := range resp.Registered { + registeredMap[schema.Subject] = int(schema.Version) + } + + return registeredMap, err +} diff --git a/pkg/beholder/chip_client_test.go b/pkg/beholder/chip_client_test.go new file mode 100644 index 0000000000..52eb307c0d --- /dev/null +++ b/pkg/beholder/chip_client_test.go @@ -0,0 +1,72 @@ +package beholder_test + +import ( + "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNewChipClient(t *testing.T) { + t.Run("returns error when client is nil", func(t *testing.T) { + registry, err := beholder.NewChipIngressClient(nil) + assert.Nil(t, registry) + assert.EqualError(t, err, "chip ingress client is nil") + }) + + t.Run("returns schema registry when client is valid", func(t *testing.T) { + mockClient := mocks.NewClient(t) + registry, err := beholder.NewChipIngressClient(mockClient) + require.NoError(t, err) + assert.NotNil(t, registry) + }) +} + +func TestRegisterSchema(t *testing.T) { + t.Run("successfully registers schemas", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient. + On("RegisterSchema", mock.Anything, mock.Anything). + Return(&pb.RegisterSchemaResponse{ + Registered: []*pb.RegisteredSchema{ + {Subject: "schema1", Version: 1}, + {Subject: "schema2", Version: 2}, + }, + }, nil) + + registry, err := beholder.NewChipIngressClient(mockClient) + require.NoError(t, err) + + schemas := []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + {Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2}, + } + + result, err := registry.RegisterSchema(t.Context(), schemas...) + require.NoError(t, err) + assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result) + }) + + t.Run("returns error when registration fails", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient. + On("RegisterSchema", mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("registration failed")) + + registry, err := beholder.NewChipIngressClient(mockClient) + require.NoError(t, err) + + schemas := []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + } + + result, err := registry.RegisterSchema(t.Context(), schemas...) + assert.Nil(t, result) + assert.EqualError(t, err, "failed to register schema: registration failed") + }) +} diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index dfbf30a743..f986e787cc 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -36,6 +36,8 @@ type Client struct { Meter otelmetric.Meter // Message Emitter Emitter Emitter + // Chip + Chip ChipIngressClient // Providers LoggerProvider otellog.LoggerProvider @@ -213,10 +215,11 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) + var chipIngressClient chipingress.Client + // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress - - if cfg.ChipIngressEmitterEnabled { + if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" { chipIngressOpts := make([]chipingress.Opt, 0, 2) if cfg.ChipIngressInsecureConnection { @@ -231,7 +234,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro chipIngressOpts = append(chipIngressOpts, chipingress.WithTokenAuth(headerProvider)) } - chipIngressClient, err := chipingress.NewClient( + chipIngressClient, err = chipingress.NewClient( cfg.ChipIngressEmitterGRPCEndpoint, chipIngressOpts..., ) @@ -251,13 +254,22 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } } + // Create interface/wrapper to chip-ingress for schema registry + var chip ChipIngressClient + if chipIngressClient != nil { + chip, err = NewChipIngressClient(chipIngressClient) + if err != nil { + return nil, fmt.Errorf("failed to create interface to chip ingress: %w", err) + } + } + onClose := func() (err error) { for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - return &Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil + return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil } // Closes all providers, flushes all data and stops all background processes diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index ed4d5547a9..969117a41f 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -480,6 +480,42 @@ func TestNewGRPCClient_ChipIngressEmitter(t *testing.T) { }) } +func TestNewClient_Chip(t *testing.T) { + t.Run("chip interface available with chip-ingress endpoint provided", func(t *testing.T) { + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "grpc-endpoint", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "chip-ingress.example.com:9090", + ChipIngressInsecureConnection: false, + }) + require.NoError(t, err) + assert.NotNil(t, client) + assert.NotNil(t, client.Chip) + }) + + t.Run("chip interface can be enabled when chip ingress dual emitter is not enabled ", func(t *testing.T) { + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "grpc-endpoint", + ChipIngressEmitterEnabled: false, + ChipIngressEmitterGRPCEndpoint: "chip-ingress.example.com:9090", + ChipIngressInsecureConnection: false, + }) + require.NoError(t, err) + assert.NotNil(t, client) + assert.NotNil(t, client.Chip) + assert.NotNil(t, client.Emitter) + }) + + t.Run("chip interface is nil when chip ingress config is missing", func(t *testing.T) { + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "grpc-endpoint", + ChipIngressEmitterEnabled: true, + }) + require.Error(t, err) + assert.Nil(t, client) + }) +} + // mockLogExporter is a no-op exporter for testing purposes. type mockLogExporter struct{} diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 6af8d13a43..15b98faacf 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -186,7 +186,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } return } - return &Client{cfg, logger, tracer, meter, emitter, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil + return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 14226001f0..82e10ce847 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -35,7 +35,7 @@ func NewNoopClient() *Client { // MessageEmitter messageEmitter := noopMessageEmitter{} - return &Client{cfg, logger, tracer, meter, messageEmitter, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose} + return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, noopOnClose} } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output diff --git a/pkg/loop/server.go b/pkg/loop/server.go index b3e9fd8ff9..4523e48a88 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -16,8 +16,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/otelhealth" - "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/services/promhealth" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" )