Skip to content

Commit 8651b60

Browse files
TriggerEventStore LOOP gRPC Server (#1888)
* Add protos * Create event_store_test.go * Fix panic on closed channel * Update base_trigger.go
1 parent d2349cf commit 8651b60

11 files changed

Lines changed: 1465 additions & 34 deletions

File tree

pkg/capabilities/base_trigger.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,27 @@ func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
343343
Id: event.EventId,
344344
}
345345

346+
if !safeSend(sendCh, wrapped) {
347+
b.metrics.IncInboxFull(event.TriggerId)
348+
b.lggr.Warnf("inbox full or closed for trigger %s", event.TriggerId)
349+
return
350+
}
351+
352+
b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
353+
b.capabilityId, event.TriggerId, event.EventId, attempts)
354+
}
355+
356+
func safeSend[T any](ch chan<- T, val T) (sent bool) {
357+
defer func() {
358+
if recover() != nil {
359+
sent = false
360+
}
361+
}()
362+
346363
select {
347-
case sendCh <- wrapped:
348-
b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
349-
b.capabilityId, event.TriggerId, event.EventId, attempts)
364+
case ch <- val:
365+
return true
350366
default:
351-
b.metrics.IncInboxFull(event.TriggerId)
352-
b.lggr.Warnf("inbox full for trigger %s", event.TriggerId)
367+
return false
353368
}
354369
}

pkg/capabilities/pb/capabilities.pb.go

Lines changed: 26 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/capabilities/pb/capabilities.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ message InitialiseRequest {
182182
uint32 keystore_id = 10;
183183
uint32 org_resolver_id = 11;
184184
uint32 cre_settings_id = 12;
185+
uint32 trigger_event_store_id = 13;
185186
}
186187

187188
message CapabilityInfosReply {

pkg/loop/internal/core/services/capability/standard/standard_capabilities.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1414
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability"
1515
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/errorlog"
16+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/eventstore"
1617
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/gateway"
1718
keystoreservice "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore"
1819
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keyvalue"
@@ -196,19 +197,34 @@ func (c *StandardCapabilitiesClient) Initialise(ctx context.Context, dependencie
196197
resources = append(resources, creSettingsRes)
197198
}
198199

200+
triggerEventStore := dependencies.TriggerEventStore
201+
var triggerEventStoreID uint32
202+
if triggerEventStore != nil {
203+
var triggerEventStoreRes net.Resource
204+
triggerEventStoreID, triggerEventStoreRes, err = c.ServeNew("TriggerEventStore", func(s *grpc.Server) {
205+
pb.RegisterEventStoreServer(s, eventstore.NewServer(triggerEventStore))
206+
})
207+
if err != nil {
208+
c.CloseAll(resources...)
209+
return fmt.Errorf("failed to serve trigger event store: %w", err)
210+
}
211+
resources = append(resources, triggerEventStoreRes)
212+
}
213+
199214
_, err = c.StandardCapabilitiesClient.Initialise(ctx, &capabilitiespb.InitialiseRequest{
200-
Config: config,
201-
ErrorLogId: errorLogID,
202-
PipelineRunnerId: pipelineRunnerID,
203-
TelemetryId: telemetryID,
204-
CapRegistryId: capabilitiesRegistryID,
205-
KeyValueStoreId: keyValueStoreID,
206-
RelayerSetId: relayerSetID,
207-
OracleFactoryId: oracleFactoryID,
208-
GatewayConnectorId: gatewayConnectorID,
209-
KeystoreId: keyStoreID,
210-
OrgResolverId: orgResolverID,
211-
CreSettingsId: creSettingsID,
215+
Config: config,
216+
ErrorLogId: errorLogID,
217+
PipelineRunnerId: pipelineRunnerID,
218+
TelemetryId: telemetryID,
219+
CapRegistryId: capabilitiesRegistryID,
220+
KeyValueStoreId: keyValueStoreID,
221+
RelayerSetId: relayerSetID,
222+
OracleFactoryId: oracleFactoryID,
223+
GatewayConnectorId: gatewayConnectorID,
224+
KeystoreId: keyStoreID,
225+
OrgResolverId: orgResolverID,
226+
CreSettingsId: creSettingsID,
227+
TriggerEventStoreId: triggerEventStoreID,
212228
})
213229

214230
if err != nil {
@@ -375,6 +391,17 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca
375391
creSettings = settings.NewClient(s.Logger, creSettingsConn)
376392
}
377393

394+
var triggerEventStoreClient capabilities.EventStore
395+
if request.TriggerEventStoreId > 0 {
396+
triggerEventStoreConn, err := s.Dial(request.TriggerEventStoreId)
397+
if err != nil {
398+
s.CloseAll(resources...)
399+
return nil, net.ErrConnDial{Name: "TriggerEventStore", ID: request.TriggerEventStoreId, Err: err}
400+
}
401+
resources = append(resources, net.Resource{Closer: triggerEventStoreConn, Name: "TriggerEventStore"})
402+
triggerEventStoreClient = eventstore.NewClient(triggerEventStoreConn)
403+
}
404+
378405
dependencies := core.StandardCapabilitiesDependencies{
379406
Config: request.Config,
380407
TelemetryService: telemetry,
@@ -388,6 +415,7 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca
388415
P2PKeystore: keyStore,
389416
OrgResolver: orgResolver,
390417
CRESettings: creSettings,
418+
TriggerEventStore: triggerEventStoreClient,
391419
}
392420

393421
if err = s.impl.Initialise(ctx, dependencies); err != nil {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package eventstore
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"google.golang.org/grpc"
8+
"google.golang.org/protobuf/types/known/emptypb"
9+
"google.golang.org/protobuf/types/known/timestamppb"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
12+
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb"
13+
)
14+
15+
var _ capabilities.EventStore = (*Client)(nil)
16+
17+
type Client struct {
18+
grpc pb.EventStoreClient
19+
}
20+
21+
func NewClient(cc grpc.ClientConnInterface) *Client {
22+
return &Client{grpc: pb.NewEventStoreClient(cc)}
23+
}
24+
25+
func (c *Client) Insert(ctx context.Context, rec capabilities.PendingEvent) error {
26+
ev := &pb.PendingEventProto{
27+
TriggerId: rec.TriggerId,
28+
EventId: rec.EventId,
29+
Payload: rec.Payload,
30+
FirstAt: timestamppb.New(rec.FirstAt),
31+
Attempts: int32(rec.Attempts),
32+
}
33+
if !rec.LastSentAt.IsZero() {
34+
ev.LastSentAt = timestamppb.New(rec.LastSentAt)
35+
}
36+
_, err := c.grpc.Insert(ctx, &pb.InsertEventRequest{Event: ev})
37+
return err
38+
}
39+
40+
func (c *Client) UpdateDelivery(ctx context.Context, triggerId string, eventId string, lastSentAt time.Time, attempts int) error {
41+
_, err := c.grpc.UpdateDelivery(ctx, &pb.UpdateDeliveryRequest{
42+
TriggerId: triggerId,
43+
EventId: eventId,
44+
LastSentAt: timestamppb.New(lastSentAt),
45+
Attempts: int32(attempts),
46+
})
47+
return err
48+
}
49+
50+
func (c *Client) List(ctx context.Context) ([]capabilities.PendingEvent, error) {
51+
resp, err := c.grpc.List(ctx, &emptypb.Empty{})
52+
if err != nil {
53+
return nil, err
54+
}
55+
events := make([]capabilities.PendingEvent, 0, len(resp.GetEvents()))
56+
for _, ev := range resp.GetEvents() {
57+
rec := capabilities.PendingEvent{
58+
TriggerId: ev.GetTriggerId(),
59+
EventId: ev.GetEventId(),
60+
Payload: ev.GetPayload(),
61+
Attempts: int(ev.GetAttempts()),
62+
}
63+
if t := ev.GetFirstAt(); t != nil {
64+
rec.FirstAt = t.AsTime()
65+
}
66+
if t := ev.GetLastSentAt(); t != nil {
67+
rec.LastSentAt = t.AsTime()
68+
}
69+
events = append(events, rec)
70+
}
71+
return events, nil
72+
}
73+
74+
func (c *Client) DeleteEvent(ctx context.Context, triggerId string, eventId string) error {
75+
_, err := c.grpc.DeleteEvent(ctx, &pb.DeleteEventRequest{
76+
TriggerId: triggerId,
77+
EventId: eventId,
78+
})
79+
return err
80+
}
81+
82+
func (c *Client) DeleteEventsForTrigger(ctx context.Context, triggerID string) error {
83+
_, err := c.grpc.DeleteEventsForTrigger(ctx, &pb.DeleteEventsForTriggerRequest{
84+
TriggerId: triggerID,
85+
})
86+
return err
87+
}

0 commit comments

Comments
 (0)