Skip to content

Commit a6e6622

Browse files
authored
chipingress: add RegisterSchemas method and noop client (#1646)
* chipingress: add RegisterSchemas method; add noop client implementation * export chipingress noop client
1 parent fe3b522 commit a6e6622

3 files changed

Lines changed: 220 additions & 0 deletions

File tree

pkg/chipingress/client.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"google.golang.org/grpc/metadata"
1818

1919
ceformat "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
20+
cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
2021
ce "github.com/cloudevents/sdk-go/v2"
2122

2223
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
@@ -27,6 +28,7 @@ import (
2728
type Client interface {
2829
pb.ChipIngressClient
2930
Close() error
31+
RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error)
3032
}
3133

3234
type client struct {
@@ -128,6 +130,23 @@ func (c *client) Close() error {
128130
return c.conn.Close()
129131
}
130132

133+
// RegisterSchemas registers one or more schemas with the Chip Ingress service.
134+
func (c *client) RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) {
135+
request := &pb.RegisterSchemaRequest{Schemas: schemas}
136+
137+
resp, err := c.client.RegisterSchema(ctx, request)
138+
if err != nil {
139+
return nil, fmt.Errorf("failed to register schema: %w", err)
140+
}
141+
142+
registeredMap := make(map[string]int)
143+
for _, schema := range resp.Registered {
144+
registeredMap[schema.Subject] = int(schema.Version)
145+
}
146+
147+
return registeredMap, nil
148+
}
149+
131150
// WithBasicAuth sets the basic-auth credentials for the ChipIngress service.
132151
// Default is to require TLS for security.
133152
func WithBasicAuth(user, pass string) Opt {
@@ -281,3 +300,44 @@ func EventsToBatch(events []CloudEvent) (*CloudEventBatch, error) {
281300
}
282301
return batch, nil
283302
}
303+
304+
var _ Client = (*NoopClient)(nil)
305+
306+
// NoopClient is a no-op implementation of the Client interface.
307+
// All methods return successfully without performing any actual operations.
308+
type NoopClient struct{}
309+
310+
// Close is a no-op
311+
func (NoopClient) Close() error {
312+
return nil
313+
}
314+
315+
// Ping is a no-op
316+
func (NoopClient) Ping(ctx context.Context, in *pb.EmptyRequest, opts ...grpc.CallOption) (*pb.PingResponse, error) {
317+
return &pb.PingResponse{Message: "pong"}, nil
318+
}
319+
320+
// Publish is a no-op
321+
func (NoopClient) Publish(ctx context.Context, in *cepb.CloudEvent, opts ...grpc.CallOption) (*pb.PublishResponse, error) {
322+
return &pb.PublishResponse{}, nil
323+
}
324+
325+
// PublishBatch is a no-op
326+
func (NoopClient) PublishBatch(ctx context.Context, in *pb.CloudEventBatch, opts ...grpc.CallOption) (*pb.PublishResponse, error) {
327+
return &pb.PublishResponse{}, nil
328+
}
329+
330+
// StreamEvents is a no-op
331+
func (NoopClient) StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[pb.StreamEventsRequest, pb.StreamEventsResponse], error) {
332+
return nil, nil
333+
}
334+
335+
// RegisterSchema is a no-op
336+
func (NoopClient) RegisterSchema(ctx context.Context, in *pb.RegisterSchemaRequest, opts ...grpc.CallOption) (*pb.RegisterSchemaResponse, error) {
337+
return &pb.RegisterSchemaResponse{}, nil
338+
}
339+
340+
// RegisterSchemas is a no-op
341+
func (NoopClient) RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) {
342+
return make(map[string]int), nil
343+
}

pkg/chipingress/client_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/grpc/metadata"
1515
"google.golang.org/protobuf/proto"
1616

17+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
1718
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
1819
)
1920

@@ -56,6 +57,33 @@ func TestClient(t *testing.T) {
5657
assert.NotNil(t, client)
5758
})
5859

60+
t.Run("NoopClient", func(t *testing.T) {
61+
client := &NoopClient{}
62+
assert.NotNil(t, client)
63+
64+
// Test that it implements the Client interface
65+
var _ Client = client
66+
67+
// Test Close returns no error
68+
err := client.Close()
69+
assert.NoError(t, err)
70+
71+
// Test Ping returns success
72+
pingResp, err := client.Ping(context.Background(), &pb.EmptyRequest{})
73+
assert.NoError(t, err)
74+
assert.NotNil(t, pingResp)
75+
assert.Equal(t, "pong", pingResp.Message)
76+
77+
// Test RegisterSchemas returns empty map
78+
schemas := []*pb.Schema{
79+
{Subject: "test", Schema: `{"test":"value"}`, Format: 1},
80+
}
81+
result, err := client.RegisterSchemas(context.Background(), schemas...)
82+
assert.NoError(t, err)
83+
assert.NotNil(t, result)
84+
assert.Empty(t, result)
85+
})
86+
5987
}
6088

6189
func TestNewEvent(t *testing.T) {
@@ -619,3 +647,62 @@ func TestNewClientWithTLS(t *testing.T) {
619647
assert.NotNil(t, client)
620648
}
621649
}
650+
651+
func TestClient_RegisterSchemas(t *testing.T) {
652+
t.Run("successfully registers schemas", func(t *testing.T) {
653+
mockClient := mocks.NewClient(t)
654+
mockClient.EXPECT().RegisterSchema(
655+
context.Background(),
656+
&pb.RegisterSchemaRequest{
657+
Schemas: []*pb.Schema{
658+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
659+
{Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2},
660+
},
661+
},
662+
).Return(&pb.RegisterSchemaResponse{
663+
Registered: []*pb.RegisteredSchema{
664+
{Subject: "schema1", Version: 1},
665+
{Subject: "schema2", Version: 2},
666+
},
667+
}, nil)
668+
669+
client := &client{
670+
client: mockClient,
671+
conn: nil,
672+
}
673+
674+
schemas := []*pb.Schema{
675+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
676+
{Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2},
677+
}
678+
679+
result, err := client.RegisterSchemas(context.Background(), schemas...)
680+
assert.NoError(t, err)
681+
assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result)
682+
})
683+
684+
t.Run("returns error when registration fails", func(t *testing.T) {
685+
mockClient := mocks.NewClient(t)
686+
mockClient.EXPECT().RegisterSchema(
687+
context.Background(),
688+
&pb.RegisterSchemaRequest{
689+
Schemas: []*pb.Schema{
690+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
691+
},
692+
},
693+
).Return(nil, fmt.Errorf("registration failed"))
694+
695+
client := &client{
696+
client: mockClient,
697+
conn: nil,
698+
}
699+
700+
schemas := []*pb.Schema{
701+
{Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1},
702+
}
703+
704+
result, err := client.RegisterSchemas(context.Background(), schemas...)
705+
assert.Nil(t, result)
706+
assert.EqualError(t, err, "failed to register schema: registration failed")
707+
})
708+
}

pkg/chipingress/mocks/client.go

Lines changed: 73 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)