Skip to content

Commit 0857a45

Browse files
authored
Merge pull request #6209 from oasisprotocol/peternose/trivial/refactor-service-descriptor
go/consensus/cometbft: Refactor service descriptor
2 parents 5911b1d + 00855a8 commit 0857a45

11 files changed

Lines changed: 139 additions & 119 deletions

File tree

.changelog/6209.trivial.md

Whitespace-only changes.

go/consensus/cometbft/api/api.go

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -307,59 +307,48 @@ type ServiceEvent struct {
307307
}
308308

309309
// ServiceDescriptor is a CometBFT consensus service descriptor.
310-
type ServiceDescriptor interface {
311-
// Name returns the name of this service.
312-
Name() string
313-
314-
// EventType returns the event type associated with the consensus service.
315-
EventType() string
316-
317-
// Queries returns a channel that emits queries that need to be subscribed to.
318-
Queries() <-chan cmtpubsub.Query
319-
}
320-
321-
type serviceDescriptor struct {
310+
type ServiceDescriptor struct {
322311
name string
323312
eventType string
324-
queryCh <-chan cmtpubsub.Query
313+
queryCh chan cmtpubsub.Query
325314
}
326315

327-
func (sd *serviceDescriptor) Name() string {
316+
// NewServiceDescriptor creates a new consensus service descriptor.
317+
func NewServiceDescriptor(name, eventType string, capacity int) *ServiceDescriptor {
318+
return &ServiceDescriptor{
319+
name: name,
320+
eventType: eventType,
321+
queryCh: make(chan cmtpubsub.Query, capacity),
322+
}
323+
}
324+
325+
// Name returns the name of this service.
326+
func (sd *ServiceDescriptor) Name() string {
328327
return sd.name
329328
}
330329

331-
func (sd *serviceDescriptor) EventType() string {
330+
// EventType returns the event type associated with the consensus service.
331+
func (sd *ServiceDescriptor) EventType() string {
332332
return sd.eventType
333333
}
334334

335-
func (sd *serviceDescriptor) Queries() <-chan cmtpubsub.Query {
335+
// Queries returns a channel that emits queries that need to be subscribed to.
336+
func (sd *ServiceDescriptor) Queries() <-chan cmtpubsub.Query {
336337
return sd.queryCh
337338
}
338339

339-
// NewServiceDescriptor creates a new consensus service descriptor.
340-
func NewServiceDescriptor(name, eventType string, queryCh <-chan cmtpubsub.Query) ServiceDescriptor {
341-
return &serviceDescriptor{
342-
name: name,
343-
eventType: eventType,
344-
queryCh: queryCh,
345-
}
346-
}
347-
348-
// NewStaticServiceDescriptor creates a new static consensus service descriptor.
349-
func NewStaticServiceDescriptor(name, eventType string, queries []cmtpubsub.Query) ServiceDescriptor {
350-
ch := make(chan cmtpubsub.Query)
351-
go func() {
352-
for _, q := range queries {
353-
ch <- q
354-
}
355-
}()
356-
return NewServiceDescriptor(name, eventType, ch)
340+
// AddQuery enqueues query that need to be subscribed to.
341+
//
342+
// This method blocks if the query channel is full, which can occur when the
343+
// number of pending queries exceeds the channel's capacity.
344+
func (sd *ServiceDescriptor) AddQuery(query cmtpubsub.Query) {
345+
sd.queryCh <- query
357346
}
358347

359348
// ServiceClient is a consensus service client.
360349
type ServiceClient interface {
361350
// ServiceDescriptor returns the consensus service descriptor.
362-
ServiceDescriptor() ServiceDescriptor
351+
ServiceDescriptor() *ServiceDescriptor
363352

364353
// DeliverHeight delivers a new block height.
365354
DeliverHeight(ctx context.Context, height int64) error

go/consensus/cometbft/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/stretchr/testify/require"
77

8-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
98
cmtquery "github.com/cometbft/cometbft/libs/pubsub/query"
109
)
1110

@@ -14,7 +13,8 @@ func TestServiceDescriptor(t *testing.T) {
1413

1514
q1 := cmtquery.MustParse("a='b'")
1615

17-
sd := NewStaticServiceDescriptor("test", "test_type", []cmtpubsub.Query{q1})
16+
sd := NewServiceDescriptor("test", "test_type", 1)
17+
sd.AddQuery(q1)
1818
require.Equal("test", sd.Name())
1919
require.Equal("test_type", sd.EventType())
2020
recvQ1 := <-sd.Queries()

go/consensus/cometbft/beacon/beacon.go

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,17 @@ import (
99
"sync"
1010

1111
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
12-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
1312
cmttypes "github.com/cometbft/cometbft/types"
1413
"github.com/eapache/channels"
1514

16-
beaconAPI "github.com/oasisprotocol/oasis-core/go/beacon/api"
15+
"github.com/oasisprotocol/oasis-core/go/beacon/api"
1716
"github.com/oasisprotocol/oasis-core/go/common/cache/lru"
1817
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
1918
"github.com/oasisprotocol/oasis-core/go/common/logging"
2019
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
2120
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
2221
"github.com/oasisprotocol/oasis-core/go/consensus/api/events"
23-
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
22+
cmtapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
2423
app "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/beacon"
2524
)
2625

@@ -30,45 +29,50 @@ const epochCacheCapacity = 128
3029
// ServiceClient is the beacon service client.
3130
type ServiceClient struct {
3231
sync.RWMutex
33-
tmapi.BaseServiceClient
32+
cmtapi.BaseServiceClient
3433

3534
logger *logging.Logger
3635

37-
consensus consensus.Backend
38-
querier *app.QueryFactory
36+
consensus consensus.Backend
37+
querier *app.QueryFactory
38+
descriptor *cmtapi.ServiceDescriptor
3939

4040
epochNotifier *pubsub.Broker
41-
epochLastNotified beaconAPI.EpochTime
42-
epoch beaconAPI.EpochTime
41+
epochLastNotified api.EpochTime
42+
epoch api.EpochTime
4343
epochCurrentBlock int64
4444
epochCache *lru.Cache
4545

4646
vrfNotifier *pubsub.Broker
4747
vrfLastNotified hash.Hash
48-
vrfEvent *beaconAPI.VRFEvent
48+
vrfEvent *api.VRFEvent
4949

5050
initialNotify bool
5151

52-
baseEpoch beaconAPI.EpochTime
52+
baseEpoch api.EpochTime
5353
baseBlock int64
5454
}
5555

5656
// New constructs a new CometBFT backed beacon service client.
57-
func New(baseEpoch beaconAPI.EpochTime, baseBlock int64, consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
57+
func New(baseEpoch api.EpochTime, baseBlock int64, consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
58+
descriptor := cmtapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
59+
descriptor.AddQuery(app.QueryApp)
60+
5861
return &ServiceClient{
5962
logger: logging.GetLogger("cometbft/beacon"),
6063
consensus: consensus,
6164
querier: querier,
65+
descriptor: descriptor,
6266
epochNotifier: pubsub.NewBroker(false),
63-
epochLastNotified: beaconAPI.EpochInvalid,
67+
epochLastNotified: api.EpochInvalid,
6468
epochCache: lru.New(lru.Capacity(epochCacheCapacity, false)),
6569
vrfNotifier: pubsub.NewBroker(false),
6670
baseEpoch: baseEpoch,
6771
baseBlock: baseBlock,
6872
}
6973
}
7074

71-
func (sc *ServiceClient) StateToGenesis(ctx context.Context, height int64) (*beaconAPI.Genesis, error) {
75+
func (sc *ServiceClient) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
7276
q, err := sc.querier.QueryAt(ctx, height)
7377
if err != nil {
7478
return nil, err
@@ -77,7 +81,7 @@ func (sc *ServiceClient) StateToGenesis(ctx context.Context, height int64) (*bea
7781
return q.Genesis(ctx)
7882
}
7983

80-
func (sc *ServiceClient) ConsensusParameters(ctx context.Context, height int64) (*beaconAPI.ConsensusParameters, error) {
84+
func (sc *ServiceClient) ConsensusParameters(ctx context.Context, height int64) (*api.ConsensusParameters, error) {
8185
q, err := sc.querier.QueryAt(ctx, height)
8286
if err != nil {
8387
return nil, fmt.Errorf("beacon: genesis query failed: %w", err)
@@ -86,21 +90,21 @@ func (sc *ServiceClient) ConsensusParameters(ctx context.Context, height int64)
8690
return q.ConsensusParameters(ctx)
8791
}
8892

89-
func (sc *ServiceClient) GetBaseEpoch(context.Context) (beaconAPI.EpochTime, error) {
93+
func (sc *ServiceClient) GetBaseEpoch(context.Context) (api.EpochTime, error) {
9094
return sc.baseEpoch, nil
9195
}
9296

93-
func (sc *ServiceClient) GetEpoch(ctx context.Context, height int64) (beaconAPI.EpochTime, error) {
97+
func (sc *ServiceClient) GetEpoch(ctx context.Context, height int64) (api.EpochTime, error) {
9498
q, err := sc.querier.QueryAt(ctx, height)
9599
if err != nil {
96-
return beaconAPI.EpochInvalid, err
100+
return api.EpochInvalid, err
97101
}
98102

99103
epoch, _, err := q.Epoch(ctx)
100104
return epoch, err
101105
}
102106

103-
func (sc *ServiceClient) GetFutureEpoch(ctx context.Context, height int64) (*beaconAPI.EpochTimeState, error) {
107+
func (sc *ServiceClient) GetFutureEpoch(ctx context.Context, height int64) (*api.EpochTimeState, error) {
104108
q, err := sc.querier.QueryAt(ctx, height)
105109
if err != nil {
106110
return nil, err
@@ -109,7 +113,7 @@ func (sc *ServiceClient) GetFutureEpoch(ctx context.Context, height int64) (*bea
109113
return q.FutureEpoch(ctx)
110114
}
111115

112-
func (sc *ServiceClient) GetEpochBlock(ctx context.Context, epoch beaconAPI.EpochTime) (int64, error) {
116+
func (sc *ServiceClient) GetEpochBlock(ctx context.Context, epoch api.EpochTime) (int64, error) {
113117
now, currentBlk := sc.currentEpochBlock()
114118
switch {
115119
case epoch == now:
@@ -140,15 +144,15 @@ func (sc *ServiceClient) GetEpochBlock(ctx context.Context, epoch beaconAPI.Epoc
140144

141145
// Find historic epoch with bounded bisection.
142146
const maxIterations = 20 // Should be good enough for most use cases.
143-
var prevEpoch beaconAPI.EpochTime
147+
var prevEpoch api.EpochTime
144148
for range maxIterations {
145149
q, err := sc.querier.QueryAt(ctx, height)
146150
if err != nil {
147151
return 0, fmt.Errorf("failed to query epoch: %w", err)
148152
}
149153

150154
var (
151-
curEpoch beaconAPI.EpochTime
155+
curEpoch api.EpochTime
152156
epochHeight int64
153157
)
154158
curEpoch, epochHeight, err = q.Epoch(ctx)
@@ -176,7 +180,7 @@ func (sc *ServiceClient) GetEpochBlock(ctx context.Context, epoch beaconAPI.Epoc
176180
return 0, fmt.Errorf("failed to find historic epoch")
177181
}
178182

179-
func (sc *ServiceClient) WaitEpoch(ctx context.Context, epoch beaconAPI.EpochTime) error {
183+
func (sc *ServiceClient) WaitEpoch(ctx context.Context, epoch api.EpochTime) error {
180184
ch, sub, err := sc.WatchEpochs(ctx)
181185
if err != nil {
182186
return err
@@ -198,18 +202,18 @@ func (sc *ServiceClient) WaitEpoch(ctx context.Context, epoch beaconAPI.EpochTim
198202
}
199203
}
200204

201-
func (sc *ServiceClient) WatchEpochs(context.Context) (<-chan beaconAPI.EpochTime, pubsub.ClosableSubscription, error) {
205+
func (sc *ServiceClient) WatchEpochs(context.Context) (<-chan api.EpochTime, pubsub.ClosableSubscription, error) {
202206
hook := sc.epochNotifierHook()
203-
ch := make(chan beaconAPI.EpochTime)
207+
ch := make(chan api.EpochTime)
204208
sub := sc.epochNotifier.SubscribeEx(hook)
205209
sub.Unwrap(ch)
206210

207211
return ch, sub, nil
208212
}
209213

210-
func (sc *ServiceClient) WatchLatestEpoch(context.Context) (<-chan beaconAPI.EpochTime, pubsub.ClosableSubscription, error) {
214+
func (sc *ServiceClient) WatchLatestEpoch(context.Context) (<-chan api.EpochTime, pubsub.ClosableSubscription, error) {
211215
hook := sc.epochNotifierHook()
212-
ch := make(chan beaconAPI.EpochTime)
216+
ch := make(chan api.EpochTime)
213217
sub := sc.epochNotifier.SubscribeBufferedEx(1, hook)
214218
sub.Unwrap(ch)
215219

@@ -225,7 +229,7 @@ func (sc *ServiceClient) GetBeacon(ctx context.Context, height int64) ([]byte, e
225229
return q.Beacon(ctx)
226230
}
227231

228-
func (sc *ServiceClient) GetVRFState(ctx context.Context, height int64) (*beaconAPI.VRFState, error) {
232+
func (sc *ServiceClient) GetVRFState(ctx context.Context, height int64) (*api.VRFState, error) {
229233
q, err := sc.querier.QueryAt(ctx, height)
230234
if err != nil {
231235
return nil, err
@@ -234,17 +238,17 @@ func (sc *ServiceClient) GetVRFState(ctx context.Context, height int64) (*beacon
234238
return q.VRFState(ctx)
235239
}
236240

237-
func (sc *ServiceClient) WatchLatestVRFEvent(context.Context) (<-chan *beaconAPI.VRFEvent, *pubsub.Subscription, error) {
241+
func (sc *ServiceClient) WatchLatestVRFEvent(context.Context) (<-chan *api.VRFEvent, *pubsub.Subscription, error) {
238242
hook := sc.vrfNotifierHook()
239-
ch := make(chan *beaconAPI.VRFEvent)
243+
ch := make(chan *api.VRFEvent)
240244
sub := sc.vrfNotifier.SubscribeEx(hook)
241245
sub.Unwrap(ch)
242246

243247
return ch, sub, nil
244248
}
245249

246-
func (sc *ServiceClient) ServiceDescriptor() tmapi.ServiceDescriptor {
247-
return tmapi.NewStaticServiceDescriptor("beacon", app.EventType, []cmtpubsub.Query{app.QueryApp})
250+
func (sc *ServiceClient) ServiceDescriptor() *cmtapi.ServiceDescriptor {
251+
return sc.descriptor
248252
}
249253

250254
func (sc *ServiceClient) DeliverHeight(ctx context.Context, height int64) error {
@@ -266,13 +270,13 @@ func (sc *ServiceClient) DeliverHeight(ctx context.Context, height int64) error
266270
sc.epochNotifier.Broadcast(epoch)
267271
}
268272

269-
var vrfState *beaconAPI.VRFState
273+
var vrfState *api.VRFState
270274
vrfState, err = q.VRFState(ctx)
271275
if err != nil {
272276
return fmt.Errorf("beacon: failed to query VRF state: %w", err)
273277
}
274278
if vrfState != nil {
275-
var event beaconAPI.VRFEvent
279+
var event api.VRFEvent
276280
event.FromState(vrfState)
277281

278282
if sc.updateCachedVRFEvent(&event) {
@@ -289,8 +293,8 @@ func (sc *ServiceClient) DeliverEvent(_ context.Context, height int64, _ cmttype
289293
key := pair.GetKey()
290294
val := pair.GetValue()
291295

292-
if events.IsAttributeKind(key, &beaconAPI.EpochEvent{}) {
293-
var event beaconAPI.EpochEvent
296+
if events.IsAttributeKind(key, &api.EpochEvent{}) {
297+
var event api.EpochEvent
294298
if err := events.DecodeValue(val, &event); err != nil {
295299
sc.logger.Error("epochtime: malformed epoch event value",
296300
"err", err,
@@ -302,8 +306,8 @@ func (sc *ServiceClient) DeliverEvent(_ context.Context, height int64, _ cmttype
302306
sc.epochNotifier.Broadcast(event.Epoch)
303307
}
304308
}
305-
if events.IsAttributeKind(key, &beaconAPI.VRFEvent{}) {
306-
var event beaconAPI.VRFEvent
309+
if events.IsAttributeKind(key, &api.VRFEvent{}) {
310+
var event api.VRFEvent
307311
if err := events.DecodeValue(val, &event); err != nil {
308312
sc.logger.Error("beacon: malformed VRF event",
309313
"err", err,
@@ -318,7 +322,7 @@ func (sc *ServiceClient) DeliverEvent(_ context.Context, height int64, _ cmttype
318322
return nil
319323
}
320324

321-
func (sc *ServiceClient) updateCachedEpoch(height int64, epoch beaconAPI.EpochTime) bool {
325+
func (sc *ServiceClient) updateCachedEpoch(height int64, epoch api.EpochTime) bool {
322326
sc.Lock()
323327
defer sc.Unlock()
324328

@@ -338,7 +342,7 @@ func (sc *ServiceClient) updateCachedEpoch(height int64, epoch beaconAPI.EpochTi
338342
return false
339343
}
340344

341-
func (sc *ServiceClient) updateCachedVRFEvent(event *beaconAPI.VRFEvent) bool {
345+
func (sc *ServiceClient) updateCachedVRFEvent(event *api.VRFEvent) bool {
342346
sc.Lock()
343347
defer sc.Unlock()
344348

@@ -358,7 +362,7 @@ func (sc *ServiceClient) updateCachedVRFEvent(event *beaconAPI.VRFEvent) bool {
358362
return false
359363
}
360364

361-
func (sc *ServiceClient) currentEpochBlock() (beaconAPI.EpochTime, int64) {
365+
func (sc *ServiceClient) currentEpochBlock() (api.EpochTime, int64) {
362366
sc.RLock()
363367
defer sc.RUnlock()
364368

0 commit comments

Comments
 (0)