Skip to content

Commit 5c5f8f6

Browse files
authored
refactor(event): parse events after querying block results (#1978)
1 parent 32f2443 commit 5c5f8f6

2 files changed

Lines changed: 38 additions & 38 deletions

File tree

events/publish.go

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,46 +22,34 @@ import (
2222

2323
// Publish events using tm buses to clients. Waits on context
2424
// shutdown signals to exit.
25-
func Publish(ctx context.Context, tmbus tmclient.EventsClient, name string, bus pubsub.Bus) (err error) {
26-
25+
func Publish(ctx context.Context, client tmclient.Client, name string, bus pubsub.Bus) (err error) {
2726
const (
28-
queuesz = 100
27+
queuesz = 1000
2928
)
3029
var (
31-
txname = name + "-tx"
32-
blkname = name + "-blk"
30+
blkHeaderName = name + "-blk-hdr"
3331
)
3432

35-
txch, err := tmbus.Subscribe(ctx, txname, txQuery().String(), queuesz)
36-
if err != nil {
37-
return err
38-
}
39-
defer func() {
40-
err = tmbus.UnsubscribeAll(ctx, txname)
41-
}()
33+
tmbus := client.(tmclient.EventsClient)
4234

43-
blkch, err := tmbus.Subscribe(ctx, blkname, blkQuery().String(), queuesz)
35+
blkch, err := tmbus.Subscribe(ctx, blkHeaderName, blkHeaderQuery().String(), queuesz)
4436
if err != nil {
4537
return err
4638
}
4739
defer func() {
48-
err = tmbus.UnsubscribeAll(ctx, blkname)
40+
err = tmbus.UnsubscribeAll(ctx, blkHeaderName)
4941
}()
5042

5143
g, ctx := errgroup.WithContext(ctx)
5244

5345
g.Go(func() error {
54-
return publishEvents(ctx, txch, bus)
55-
})
56-
57-
g.Go(func() error {
58-
return publishEvents(ctx, blkch, bus)
46+
return publishEvents(ctx, client, blkch, bus)
5947
})
6048

6149
return g.Wait()
6250
}
6351

64-
func publishEvents(ctx context.Context, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
52+
func publishEvents(ctx context.Context, client tmclient.Client, ch <-chan ctypes.ResultEvent, bus pubsub.Bus) error {
6553
var err error
6654

6755
loop:
@@ -70,30 +58,37 @@ loop:
7058
case <-ctx.Done():
7159
break loop
7260
case ed := <-ch:
61+
// nolint: gocritic
7362
switch evt := ed.Data.(type) {
74-
case tmtmtypes.EventDataTx:
75-
if !evt.Result.IsOK() {
76-
continue
77-
}
78-
processEvents(bus, evt.Result.GetEvents())
7963
case tmtmtypes.EventDataNewBlockHeader:
80-
processEvents(bus, evt.ResultEndBlock.GetEvents())
64+
processBlock(ctx, bus, client, evt.Header.Height)
8165
}
8266
}
8367
}
8468

8569
return err
8670
}
8771

88-
func processEvents(bus pubsub.Bus, events []abci.Event) {
89-
for _, ev := range events {
90-
if mev, ok := processEvent(ev); ok {
91-
if err := bus.Publish(mev); err != nil {
92-
bus.Close()
93-
return
94-
}
72+
func processBlock(ctx context.Context, bus pubsub.Bus, client tmclient.Client, height int64) {
73+
blkResults, err := client.BlockResults(ctx, &height)
74+
if err != nil {
75+
return
76+
}
77+
78+
for _, tx := range blkResults.TxsResults {
79+
if tx == nil {
9580
continue
9681
}
82+
83+
for _, ev := range tx.Events {
84+
if mev, ok := processEvent(ev); ok {
85+
if err := bus.Publish(mev); err != nil {
86+
bus.Close()
87+
return
88+
}
89+
continue
90+
}
91+
}
9792
}
9893
}
9994

events/query.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ import (
88
tmtypes "github.com/tendermint/tendermint/types"
99
)
1010

11-
func txQuery() pubsub.Query {
12-
return tmquery.MustParse(
13-
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
14-
}
11+
// func txQuery() pubsub.Query {
12+
// return tmquery.MustParse(
13+
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx))
14+
// }
15+
//
16+
// func blkQuery() pubsub.Query {
17+
// return tmquery.MustParse(
18+
// fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlock))
19+
// }
1520

16-
func blkQuery() pubsub.Query {
21+
func blkHeaderQuery() pubsub.Query {
1722
return tmquery.MustParse(
1823
fmt.Sprintf("%s='%s'", tmtypes.EventTypeKey, tmtypes.EventNewBlockHeader))
1924
}

0 commit comments

Comments
 (0)