Skip to content

Commit e9826d4

Browse files
[BCF-2728] Add telemetry to LOOPP deps (#205)
1 parent 48c9bf5 commit e9826d4

15 files changed

Lines changed: 220 additions & 254 deletions

pkg/loop/internal/pb/reporting_plugin_service.pb.go

Lines changed: 34 additions & 24 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/loop/internal/pb/reporting_plugin_service.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ message NewReportingPluginFactoryRequest {
2020
uint32 providerID = 1;
2121
uint32 errorLogID = 2;
2222
uint32 pipelineRunnerID = 3;
23-
ReportingPluginServiceConfig ReportingPluginServiceConfig = 4;
23+
uint32 telemetryID = 4;
24+
ReportingPluginServiceConfig ReportingPluginServiceConfig = 5;
2425
}
2526

2627
// NewReportingPluginFactoryReply has return arguments for [github.com/smartcontractkit/chainlink-relay/pkg/loop/reporting_plugins/LOOPPService.NewReportingPluginFactory].

pkg/loop/internal/pb/telemetry.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/loop/internal/pb/telemetry_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/loop/internal/reporting_plugin_service.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,44 @@ func NewReportingPluginServiceClient(broker Broker, brokerCfg BrokerConfig, conn
2626
return &ReportingPluginServiceClient{pluginClient: pc, reportingPluginService: pb.NewReportingPluginServiceClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)}
2727
}
2828

29-
func (m *ReportingPluginServiceClient) NewReportingPluginFactory(ctx context.Context, config types.ReportingPluginServiceConfig, grpcProvider grpc.ClientConnInterface, pipelineRunner types.PipelineRunnerService, errorLog types.ErrorLog) (types.ReportingPluginFactory, error) {
29+
func (m *ReportingPluginServiceClient) NewReportingPluginFactory(
30+
ctx context.Context,
31+
config types.ReportingPluginServiceConfig,
32+
grpcProvider grpc.ClientConnInterface,
33+
pipelineRunner types.PipelineRunnerService,
34+
telemetry types.TelemetryClient,
35+
errorLog types.ErrorLog,
36+
) (types.ReportingPluginFactory, error) {
3037
cc := m.newClientConn("ReportingPluginServiceFactory", func(ctx context.Context) (id uint32, deps resources, err error) {
3138
providerID, providerRes, err := m.serve("PluginProvider", proxy.NewProxy(grpcProvider))
3239
if err != nil {
3340
return 0, nil, err
3441
}
3542
deps.Add(providerRes)
3643

37-
errorLogID, errorLogRes, err := m.serveNew("ErrorLog", func(s *grpc.Server) {
38-
pb.RegisterErrorLogServer(s, &errorLogServer{impl: errorLog})
44+
pipelineRunnerID, pipelineRunnerRes, err := m.serveNew("PipelineRunner", func(s *grpc.Server) {
45+
pb.RegisterPipelineRunnerServiceServer(s, &pipelineRunnerServiceServer{impl: pipelineRunner})
3946
})
4047
if err != nil {
4148
return 0, nil, err
4249
}
43-
deps.Add(errorLogRes)
50+
deps.Add(pipelineRunnerRes)
4451

45-
pipelineRunnerID, pipelineRunnerRes, err := m.serveNew("PipelineRunner", func(s *grpc.Server) {
46-
pb.RegisterPipelineRunnerServiceServer(s, &pipelineRunnerServiceServer{impl: pipelineRunner})
52+
telemetryID, telemetryRes, err := m.serveNew("Telemetry", func(s *grpc.Server) {
53+
pb.RegisterTelemetryServer(s, NewTelemetryServer(telemetry))
4754
})
4855
if err != nil {
4956
return 0, nil, err
5057
}
51-
deps.Add(pipelineRunnerRes)
58+
deps.Add(telemetryRes)
59+
60+
errorLogID, errorLogRes, err := m.serveNew("ErrorLog", func(s *grpc.Server) {
61+
pb.RegisterErrorLogServer(s, &errorLogServer{impl: errorLog})
62+
})
63+
if err != nil {
64+
return 0, nil, err
65+
}
66+
deps.Add(errorLogRes)
5267

5368
reply, err := m.reportingPluginService.NewReportingPluginFactory(ctx, &pb.NewReportingPluginFactoryRequest{
5469
ReportingPluginServiceConfig: &pb.ReportingPluginServiceConfig{
@@ -60,6 +75,7 @@ func (m *ReportingPluginServiceClient) NewReportingPluginFactory(ctx context.Con
6075
ProviderID: providerID,
6176
ErrorLogID: errorLogID,
6277
PipelineRunnerID: pipelineRunnerID,
78+
TelemetryID: telemetryID,
6379
})
6480
if err != nil {
6581
return 0, nil, err
@@ -110,23 +126,31 @@ func (m *reportingPluginServiceServer) NewReportingPluginFactory(ctx context.Con
110126
pipelineRunnerRes := resource{pipelineRunnerConn, "PipelineRunner"}
111127
pipelineRunner := newPipelineRunnerClient(pipelineRunnerConn)
112128

129+
telemetryConn, err := m.dial(request.TelemetryID)
130+
if err != nil {
131+
m.closeAll(errorLogRes, providerRes, pipelineRunnerRes)
132+
return nil, ErrConnDial{Name: "Telemetry", ID: request.TelemetryID, Err: err}
133+
}
134+
telemetryRes := resource{telemetryConn, "Telemetry"}
135+
telemetry := NewTelemetryClient(telemetryConn)
136+
113137
config := types.ReportingPluginServiceConfig{
114138
ProviderType: request.ReportingPluginServiceConfig.ProviderType,
115139
PluginConfig: request.ReportingPluginServiceConfig.PluginConfig,
116140
PluginName: request.ReportingPluginServiceConfig.PluginName,
117141
Command: request.ReportingPluginServiceConfig.Command,
118142
}
119143

120-
factory, err := m.impl.NewReportingPluginFactory(ctx, config, providerConn, pipelineRunner, errorLog)
144+
factory, err := m.impl.NewReportingPluginFactory(ctx, config, providerConn, pipelineRunner, telemetry, errorLog)
121145
if err != nil {
122-
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes)
146+
m.closeAll(providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
123147
return nil, err
124148
}
125149

126150
id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
127151
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
128152
pb.RegisterReportingPluginFactoryServer(s, newReportingPluginFactoryServer(factory, m.brokerExt))
129-
}, providerRes, errorLogRes, pipelineRunnerRes)
153+
}, providerRes, errorLogRes, pipelineRunnerRes, telemetryRes)
130154
if err != nil {
131155
return nil, err
132156
}

pkg/loop/internal/telemetry.go

Lines changed: 22 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,70 +3,56 @@ package internal
33
import (
44
"context"
55
"errors"
6-
"fmt"
76

8-
"github.com/smartcontractkit/libocr/commontypes"
97
"google.golang.org/grpc"
108
"google.golang.org/protobuf/types/known/emptypb"
119

12-
"github.com/smartcontractkit/chainlink-relay/pkg/logger"
1310
"github.com/smartcontractkit/chainlink-relay/pkg/loop/internal/pb"
1411
"github.com/smartcontractkit/chainlink-relay/pkg/types"
1512
)
1613

17-
var _ types.Telemetry = (*telemetryClient)(nil)
18-
var _ types.MonitoringEndpointGenerator = (*telemetryClient)(nil)
19-
var _ commontypes.MonitoringEndpoint = (*telemetryEndpoint)(nil)
14+
var _ types.TelemetryClient = (*telemetryClient)(nil)
2015

2116
type TelemetryClient struct {
2217
*telemetryClient
2318
}
2419

2520
type telemetryClient struct {
2621
grpc pb.TelemetryClient
27-
28-
lggr logger.Logger
2922
}
3023

3124
type telemetryEndpoint struct {
32-
lggr logger.Logger
33-
3425
grpc pb.TelemetryClient
3526
relayID pb.RelayID
3627
contractID string
3728
telemetryType string
3829
}
3930

40-
func (t *telemetryEndpoint) SendLog(log []byte) {
41-
_, err := t.grpc.Send(context.Background(), &pb.TelemetryMessage{
31+
func (t *telemetryEndpoint) SendLog(ctx context.Context, log []byte) error {
32+
_, err := t.grpc.Send(ctx, &pb.TelemetryMessage{
4233
RelayID: &t.relayID,
4334
ContractID: t.contractID,
4435
TelemetryType: t.telemetryType,
4536
Payload: log,
4637
})
47-
if err != nil {
48-
t.lggr.Errorw("cannot send telemetry", "err", err)
49-
}
38+
return err
5039
}
5140

52-
// GenMonitoringEndpoint generates a new monitoring endpoint, returns nil if one cannot be generated
53-
func (t *telemetryClient) GenMonitoringEndpoint(network string, chainID string, contractID string, telemetryType string) commontypes.MonitoringEndpoint {
41+
// NewEndpoint generates a new monitoring endpoint, returns nil if one cannot be generated
42+
func (t *telemetryClient) NewEndpoint(ctx context.Context, network string, chainID string, contractID string, telemetryType string) (types.TelemetryClientEndpoint, error) {
5443
if contractID == "" {
55-
t.lggr.Errorw("cannot generate monitoring endpoint, contractID is empty", "contractID", contractID, "telemetryType", telemetryType, "network", network, "chainID", chainID)
56-
return nil
44+
return nil, errors.New("contractID cannot be empty")
5745
}
5846
if telemetryType == "" {
59-
t.lggr.Errorw("cannot generate monitoring endpoint, telemetryType is empty", "contractID", contractID, "telemetryType", telemetryType, "network", network, "chainID", chainID)
60-
return nil
47+
return nil, errors.New("telemetryType cannot be empty")
6148
}
6249
if network == "" {
63-
t.lggr.Errorw("cannot generate monitoring endpoint, network is empty", "contractID", contractID, "telemetryType", telemetryType, "network", network, "chainID", chainID)
64-
return nil
50+
return nil, errors.New("network cannot be empty")
6551
}
6652
if chainID == "" {
67-
t.lggr.Errorw("cannot generate monitoring endpoint, chainID is empty", "contractID", contractID, "telemetryType", telemetryType, "network", network, "chainID", chainID)
68-
return nil
53+
return nil, errors.New("chainId cannot be empty")
6954
}
55+
7056
return &telemetryEndpoint{
7157
grpc: t.grpc,
7258
relayID: pb.RelayID{
@@ -75,8 +61,7 @@ func (t *telemetryClient) GenMonitoringEndpoint(network string, chainID string,
7561
},
7662
contractID: contractID,
7763
telemetryType: telemetryType,
78-
lggr: t.lggr,
79-
}
64+
}, nil
8065
}
8166

8267
// Send sends payload to the desired endpoint based on network and chainID
@@ -111,58 +96,28 @@ func (t *telemetryClient) Send(ctx context.Context, network string, chainID stri
11196
return nil
11297
}
11398

114-
func NewTelemetryClient(cc grpc.ClientConnInterface, lggr logger.Logger) *telemetryClient {
115-
return &telemetryClient{grpc: pb.NewTelemetryClient(cc), lggr: lggr}
99+
func NewTelemetryClient(cc grpc.ClientConnInterface) *telemetryClient {
100+
return &telemetryClient{grpc: pb.NewTelemetryClient(cc)}
116101
}
117102

118103
var _ pb.TelemetryServer = (*telemetryServer)(nil)
119104

120105
type telemetryServer struct {
121106
pb.UnimplementedTelemetryServer
122107

123-
impl types.MonitoringEndpointGenerator
124-
endpoints map[string]commontypes.MonitoringEndpoint
108+
impl types.TelemetryService
125109
}
126110

127111
func (t *telemetryServer) Send(ctx context.Context, message *pb.TelemetryMessage) (*emptypb.Empty, error) {
128-
e, err := t.getOrCreateEndpoint(message)
129-
if err != nil {
130-
return nil, err
131-
}
132-
e.SendLog(message.Payload)
133-
134-
return nil, nil
135-
}
136-
137-
func (t *telemetryServer) getOrCreateEndpoint(m *pb.TelemetryMessage) (commontypes.MonitoringEndpoint, error) {
138-
if m.ContractID == "" {
139-
return nil, errors.New("contractID cannot be empty")
140-
}
141-
if m.TelemetryType == "" {
142-
return nil, errors.New("telemetryType cannot be empty")
143-
}
144-
if m.RelayID == nil {
145-
return nil, errors.New("RelayID cannot be nil")
112+
var network, chainID string
113+
if message.RelayID != nil {
114+
network = message.RelayID.Network
115+
chainID = message.RelayID.ChainId
146116
}
147-
if m.RelayID.Network == "" {
148-
return nil, errors.New("RelayID.Network cannot be empty")
149-
}
150-
if m.RelayID.ChainId == "" {
151-
return nil, errors.New("RelayID.ChainId cannot be empty")
152-
}
153-
154-
key := makeKey(m)
155-
e, ok := t.endpoints[key]
156-
if !ok {
157-
e = t.impl.GenMonitoringEndpoint(m.RelayID.Network, m.RelayID.ChainId, m.ContractID, m.TelemetryType)
158-
}
159-
return e, nil
160-
}
161-
162-
func makeKey(m *pb.TelemetryMessage) string {
163-
return fmt.Sprintf("%s_%s_%s_%s", m.RelayID.Network, m.RelayID.ChainId, m.ContractID, m.TelemetryType)
117+
err := t.impl.Send(ctx, network, chainID, message.ContractID, message.TelemetryType, message.Payload)
118+
return &emptypb.Empty{}, err
164119
}
165120

166-
func NewTelemetryServer(impl types.MonitoringEndpointGenerator) *telemetryServer {
121+
func NewTelemetryServer(impl types.TelemetryService) *telemetryServer {
167122
return &telemetryServer{impl: impl}
168123
}

0 commit comments

Comments
 (0)