Skip to content

Commit 00855a8

Browse files
committed
go/consensus/cometbft: Serve service descriptor queries immediately
1 parent 41b5a04 commit 00855a8

11 files changed

Lines changed: 31 additions & 36 deletions

File tree

.changelog/6209.trivial.md

Whitespace-only changes.

go/consensus/cometbft/api/api.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -310,29 +310,18 @@ type ServiceEvent struct {
310310
type ServiceDescriptor struct {
311311
name string
312312
eventType string
313-
queryCh <-chan cmtpubsub.Query
313+
queryCh chan cmtpubsub.Query
314314
}
315315

316316
// NewServiceDescriptor creates a new consensus service descriptor.
317-
func NewServiceDescriptor(name, eventType string, queryCh <-chan cmtpubsub.Query) *ServiceDescriptor {
317+
func NewServiceDescriptor(name, eventType string, capacity int) *ServiceDescriptor {
318318
return &ServiceDescriptor{
319319
name: name,
320320
eventType: eventType,
321-
queryCh: queryCh,
321+
queryCh: make(chan cmtpubsub.Query, capacity),
322322
}
323323
}
324324

325-
// NewStaticServiceDescriptor creates a new static consensus service descriptor.
326-
func NewStaticServiceDescriptor(name, eventType string, queries []cmtpubsub.Query) *ServiceDescriptor {
327-
ch := make(chan cmtpubsub.Query)
328-
go func() {
329-
for _, q := range queries {
330-
ch <- q
331-
}
332-
}()
333-
return NewServiceDescriptor(name, eventType, ch)
334-
}
335-
336325
// Name returns the name of this service.
337326
func (sd *ServiceDescriptor) Name() string {
338327
return sd.name
@@ -348,6 +337,14 @@ func (sd *ServiceDescriptor) Queries() <-chan cmtpubsub.Query {
348337
return sd.queryCh
349338
}
350339

340+
// AddQuery enqueues query that need to be subscribed to.
341+
//
342+
// This method blocks if the query channel is full, which can occur when the
343+
// number of pending queries exceeds the channel's capacity.
344+
func (sd *ServiceDescriptor) AddQuery(query cmtpubsub.Query) {
345+
sd.queryCh <- query
346+
}
347+
351348
// ServiceClient is a consensus service client.
352349
type ServiceClient interface {
353350
// ServiceDescriptor returns the consensus service descriptor.

go/consensus/cometbft/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55

66
"github.com/stretchr/testify/require"
77

8-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
98
cmtquery "github.com/cometbft/cometbft/libs/pubsub/query"
109
)
1110

@@ -14,7 +13,8 @@ func TestServiceDescriptor(t *testing.T) {
1413

1514
q1 := cmtquery.MustParse("a='b'")
1615

17-
sd := NewStaticServiceDescriptor("test", "test_type", []cmtpubsub.Query{q1})
16+
sd := NewServiceDescriptor("test", "test_type", 1)
17+
sd.AddQuery(q1)
1818
require.Equal("test", sd.Name())
1919
require.Equal("test_type", sd.EventType())
2020
recvQ1 := <-sd.Queries()

go/consensus/cometbft/beacon/beacon.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sync"
1010

1111
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
12-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
1312
cmttypes "github.com/cometbft/cometbft/types"
1413
"github.com/eapache/channels"
1514

@@ -56,7 +55,8 @@ type ServiceClient struct {
5655

5756
// New constructs a new CometBFT backed beacon service client.
5857
func New(baseEpoch api.EpochTime, baseBlock int64, consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
59-
descriptor := cmtapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
58+
descriptor := cmtapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
59+
descriptor.AddQuery(app.QueryApp)
6060

6161
return &ServiceClient{
6262
logger: logging.GetLogger("cometbft/beacon"),

go/consensus/cometbft/governance/governance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77

88
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
9-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
109
cmttypes "github.com/cometbft/cometbft/types"
1110

1211
"github.com/oasisprotocol/oasis-core/go/common/logging"
@@ -33,7 +32,8 @@ type ServiceClient struct {
3332

3433
// New constructs a new CometBFT backed governance service client.
3534
func New(consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
36-
descriptor := tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
35+
descriptor := tmapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
36+
descriptor.AddQuery(app.QueryApp)
3737

3838
return &ServiceClient{
3939
logger: logging.GetLogger("cometbft/staking"),

go/consensus/cometbft/keymanager/keymanager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"context"
77

88
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
9-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
109
cmttypes "github.com/cometbft/cometbft/types"
1110

1211
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
@@ -30,7 +29,8 @@ type ServiceClient struct {
3029

3130
// New constructs a new CometBFT backed key manager service client.
3231
func New(querier *app.QueryFactory) *ServiceClient {
33-
descriptor := tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
32+
descriptor := tmapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
33+
descriptor.AddQuery(app.QueryApp)
3434

3535
return &ServiceClient{
3636
descriptor: descriptor,

go/consensus/cometbft/registry/registry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77

88
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
9-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
109
cmttypes "github.com/cometbft/cometbft/types"
1110
"github.com/eapache/channels"
1211

@@ -39,7 +38,8 @@ type ServiceClient struct {
3938

4039
// New constructs a new CometBFT backed registry service client.
4140
func New(consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
42-
descriptor := tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
41+
descriptor := tmapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
42+
descriptor.AddQuery(app.QueryApp)
4343

4444
return &ServiceClient{
4545
logger: logging.GetLogger("cometbft/registry"),

go/consensus/cometbft/roothash/roothash.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"sync"
1010

1111
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
12-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
1312
cmttypes "github.com/cometbft/cometbft/types"
1413

1514
"github.com/oasisprotocol/oasis-core/go/common"
@@ -23,7 +22,7 @@ import (
2322
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
2423
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
2524
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
26-
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
25+
"github.com/oasisprotocol/oasis-core/go/runtime/registry"
2726
)
2827

2928
const crashPointBlockBeforeIndex = "roothash.before_index"
@@ -56,14 +55,12 @@ type ServiceClient struct {
5655
runtimeNotifiers map[common.Namespace]*runtimeBrokers
5756
genesisBlocks map[common.Namespace]*block.Block
5857

59-
queryCh chan cmtpubsub.Query
6058
trackedRuntimes map[common.Namespace]*trackedRuntime
6159
}
6260

6361
// New constructs a new CometBFT-based roothash service client.
6462
func New(consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
65-
queryCh := make(chan cmtpubsub.Query, runtimeRegistry.MaxRuntimeCount)
66-
descriptor := cmtapi.NewServiceDescriptor(api.ModuleName, app.EventType, queryCh)
63+
descriptor := cmtapi.NewServiceDescriptor(api.ModuleName, app.EventType, registry.MaxRuntimeCount)
6764

6865
return &ServiceClient{
6966
logger: logging.GetLogger("cometbft/roothash"),
@@ -237,7 +234,8 @@ func (sc *ServiceClient) trackRuntime(id common.Namespace) {
237234
}
238235

239236
// Request subscription to events for this runtime.
240-
sc.queryCh <- app.QueryForRuntime(id)
237+
query := app.QueryForRuntime(id)
238+
sc.descriptor.AddQuery(query)
241239
}
242240

243241
// StateToGenesis implements api.Backend.

go/consensus/cometbft/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77

88
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
9-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
109
cmttypes "github.com/cometbft/cometbft/types"
1110
"github.com/eapache/channels"
1211

@@ -32,7 +31,8 @@ type ServiceClient struct {
3231

3332
// New constructs a new CometBFT-based scheduler service client.
3433
func New(querier *app.QueryFactory) *ServiceClient {
35-
descriptor := tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
34+
descriptor := tmapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
35+
descriptor.AddQuery(app.QueryApp)
3636

3737
sc := &ServiceClient{
3838
logger: logging.GetLogger("cometbft/scheduler"),

go/consensus/cometbft/staking/staking.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77

88
cmtabcitypes "github.com/cometbft/cometbft/abci/types"
9-
cmtpubsub "github.com/cometbft/cometbft/libs/pubsub"
109
cmttypes "github.com/cometbft/cometbft/types"
1110

1211
"github.com/oasisprotocol/oasis-core/go/common/logging"
@@ -33,7 +32,8 @@ type ServiceClient struct {
3332

3433
// New constructs a new CometBFT backed staking service client.
3534
func New(consensus consensus.Backend, querier *app.QueryFactory) *ServiceClient {
36-
descriptor := tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp})
35+
descriptor := tmapi.NewServiceDescriptor(api.ModuleName, app.EventType, 1)
36+
descriptor.AddQuery(app.QueryApp)
3737

3838
return &ServiceClient{
3939
logger: logging.GetLogger("cometbft/staking"),

0 commit comments

Comments
 (0)