Skip to content

Commit 03425ec

Browse files
committed
feat: Implement consensus data source
1 parent f67e692 commit 03425ec

15 files changed

Lines changed: 569 additions & 326 deletions

consensus/consensus.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package consensus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"strings"
8+
9+
"github.com/NethermindEth/juno/blockchain"
10+
"github.com/NethermindEth/juno/builder"
11+
consensusDB "github.com/NethermindEth/juno/consensus/db"
12+
"github.com/NethermindEth/juno/consensus/driver"
13+
"github.com/NethermindEth/juno/consensus/p2p"
14+
"github.com/NethermindEth/juno/consensus/p2p/config"
15+
"github.com/NethermindEth/juno/consensus/proposal"
16+
"github.com/NethermindEth/juno/consensus/proposer"
17+
"github.com/NethermindEth/juno/consensus/starknet"
18+
"github.com/NethermindEth/juno/consensus/tendermint"
19+
"github.com/NethermindEth/juno/consensus/types"
20+
"github.com/NethermindEth/juno/core/felt"
21+
"github.com/NethermindEth/juno/db"
22+
"github.com/NethermindEth/juno/utils"
23+
"github.com/NethermindEth/juno/vm"
24+
"github.com/libp2p/go-libp2p"
25+
"github.com/libp2p/go-libp2p/core/crypto"
26+
"github.com/libp2p/go-libp2p/core/host"
27+
"github.com/libp2p/go-libp2p/core/peer"
28+
"github.com/sourcegraph/conc/pool"
29+
)
30+
31+
type ConsensusServices struct {
32+
Host host.Host
33+
Proposer proposer.Proposer[starknet.Value, starknet.Hash]
34+
P2P p2p.P2P[starknet.Value, starknet.Hash, starknet.Address]
35+
Driver *driver.Driver[starknet.Value, starknet.Hash, starknet.Address]
36+
CommitListener driver.CommitListener[starknet.Value, starknet.Hash, starknet.Address]
37+
}
38+
39+
func Init(
40+
logger *utils.ZapLogger,
41+
database db.KeyValueStore,
42+
blockchain *blockchain.Blockchain,
43+
vm vm.VM,
44+
nodeAddress *starknet.Address,
45+
validators tendermint.Validators[starknet.Address],
46+
timeoutFn driver.TimeoutFn,
47+
hostAddress string,
48+
hostPrivateKey crypto.PrivKey,
49+
) (ConsensusServices, error) {
50+
chainHeight, err := blockchain.Height()
51+
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
52+
return ConsensusServices{}, err
53+
}
54+
currentHeight := types.Height(chainHeight + 1)
55+
56+
tendermintDB := consensusDB.NewTendermintDB[starknet.Value, starknet.Hash, starknet.Address](database, currentHeight)
57+
58+
executor := builder.NewExecutor(blockchain, vm, logger, false, true) // TODO: We're currently skipping signature validation
59+
builder := builder.New(blockchain, executor)
60+
61+
proposalStore := proposal.ProposalStore[starknet.Hash]{}
62+
proposer := proposer.New(logger, &builder, &proposalStore, *nodeAddress, toValue)
63+
stateMachine := tendermint.New(tendermintDB, logger, *nodeAddress, proposer, validators, currentHeight)
64+
65+
host, err := libp2p.New(
66+
libp2p.ListenAddrStrings(hostAddress),
67+
libp2p.Identity(hostPrivateKey),
68+
// libp2p.UserAgent(makeAgentName(version)),
69+
// // Use address factory to add the public address to the list of
70+
// // addresses that the node will advertise.
71+
// libp2p.AddrsFactory(addressFactory),
72+
// If we know the public ip, enable the relay service.
73+
libp2p.EnableRelayService(),
74+
// When listening behind NAT, enable peers to try to poke thought the
75+
// NAT in order to reach the node.
76+
libp2p.EnableHolePunching(),
77+
// Try to open a port in the NAT router to accept incoming connections.
78+
libp2p.NATPortMap(),
79+
)
80+
if err != nil {
81+
return ConsensusServices{}, err
82+
}
83+
84+
p2p := p2p.New(host, logger, &builder, &proposalStore, currentHeight, &config.DefaultBufferSizes)
85+
86+
commitListener := driver.NewCommitListener(logger, &proposalStore, proposer, p2p)
87+
driver := driver.New(logger, tendermintDB, stateMachine, commitListener, p2p, timeoutFn)
88+
89+
return ConsensusServices{
90+
Host: host,
91+
Proposer: proposer,
92+
P2P: p2p,
93+
Driver: &driver,
94+
CommitListener: commitListener,
95+
}, nil
96+
}
97+
98+
func Connect(ctx context.Context, host host.Host, peers string) error {
99+
if peers == "" {
100+
return nil
101+
}
102+
103+
pool := pool.New().WithErrors().WithFirstError()
104+
105+
for peerStr := range strings.SplitSeq(peers, ",") {
106+
pool.Go(func() error {
107+
peerAddr, err := peer.AddrInfoFromString(peerStr)
108+
if err != nil {
109+
return fmt.Errorf("unable to parse peer address %q: %w", peerStr, err)
110+
}
111+
112+
if err := host.Connect(ctx, *peerAddr); err != nil {
113+
return fmt.Errorf("unable to connect to %q: %w", peerStr, err)
114+
}
115+
116+
return nil
117+
})
118+
}
119+
120+
return pool.Wait()
121+
}
122+
123+
func toValue(value *felt.Felt) starknet.Value {
124+
return starknet.Value(*value)
125+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package datasource
2+
3+
import (
4+
"context"
5+
"errors"
6+
syncmap "sync"
7+
"sync/atomic"
8+
9+
"github.com/NethermindEth/juno/consensus/driver"
10+
"github.com/NethermindEth/juno/consensus/p2p"
11+
"github.com/NethermindEth/juno/consensus/proposer"
12+
"github.com/NethermindEth/juno/consensus/types"
13+
"github.com/NethermindEth/juno/core"
14+
"github.com/NethermindEth/juno/sync"
15+
)
16+
17+
const maxCommitHistory = 1024 // TODO: make this configurable
18+
19+
type consensusDataSource[V types.Hashable[H], H types.Hash, A types.Addr] struct {
20+
commitListener driver.CommitListener[V, H, A]
21+
proposer proposer.Proposer[V, H]
22+
cache syncmap.Map
23+
latest atomic.Uint64
24+
}
25+
26+
func New[V types.Hashable[H], H types.Hash, A types.Addr](
27+
commitListener driver.CommitListener[V, H, A],
28+
proposer proposer.Proposer[V, H],
29+
p2p p2p.P2P[V, H, A],
30+
) *consensusDataSource[V, H, A] {
31+
return &consensusDataSource[V, H, A]{
32+
commitListener: commitListener,
33+
proposer: proposer,
34+
cache: syncmap.Map{},
35+
latest: atomic.Uint64{},
36+
}
37+
}
38+
39+
func (c *consensusDataSource[V, H, A]) Run(ctx context.Context) error {
40+
for {
41+
select {
42+
case <-ctx.Done():
43+
return nil
44+
case committedBlock := <-c.commitListener.Listen():
45+
blockNumber := committedBlock.Block.Number
46+
47+
c.cache.Store(blockNumber, &committedBlock)
48+
c.latest.Store(blockNumber)
49+
c.cache.Delete(blockNumber - maxCommitHistory)
50+
}
51+
}
52+
}
53+
54+
func (c *consensusDataSource[V, H, A]) BlockByNumber(ctx context.Context, blockNumber uint64) (sync.CommittedBlock, error) {
55+
committedBlock, ok := c.cache.Load(blockNumber)
56+
if !ok {
57+
return sync.CommittedBlock{}, errors.New("block not found in cache")
58+
}
59+
60+
return *committedBlock.(*sync.CommittedBlock), nil
61+
}
62+
63+
func (c *consensusDataSource[V, H, A]) BlockLatest(ctx context.Context) (*core.Block, error) {
64+
committedBlock, err := c.BlockByNumber(ctx, c.latest.Load())
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
return committedBlock.Block, nil
70+
}
71+
72+
func (c *consensusDataSource[V, H, A]) BlockPending(ctx context.Context) (sync.Pending, error) {
73+
return sync.Pending{}, errors.New("not implemented") // TODO: Revise this
74+
}
75+
76+
func (c *consensusDataSource[V, H, A]) PreConfirmedBlockByNumber(ctx context.Context, blockNumber uint64) (core.PreConfirmed, error) {
77+
return *c.proposer.Preconfirmed(), nil
78+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package driver
2+
3+
import (
4+
"context"
5+
6+
"github.com/NethermindEth/juno/consensus/p2p"
7+
"github.com/NethermindEth/juno/consensus/proposal"
8+
"github.com/NethermindEth/juno/consensus/proposer"
9+
"github.com/NethermindEth/juno/consensus/types"
10+
"github.com/NethermindEth/juno/sync"
11+
"github.com/NethermindEth/juno/utils"
12+
)
13+
14+
// CommitListener is a component that is used to notify different components that a new committed block is available.
15+
type CommitListener[V types.Hashable[H], H types.Hash, A types.Addr] interface {
16+
// Commit is called by Tendermint when a block has been decided on and can be committed to the DB.
17+
Commit(context.Context, types.Height, V)
18+
// Listen returns a channel that will receive committed blocks.
19+
// This is supposed to be used by the component that writes the committed blocks to the database.
20+
Listen() <-chan sync.CommittedBlock
21+
}
22+
23+
type commitListener[V types.Hashable[H], H types.Hash, A types.Addr] struct {
24+
log utils.Logger
25+
proposalStore *proposal.ProposalStore[H]
26+
proposer proposer.Proposer[V, H]
27+
p2p p2p.P2P[V, H, A]
28+
commits chan sync.CommittedBlock
29+
}
30+
31+
func NewCommitListener[V types.Hashable[H], H types.Hash, A types.Addr](
32+
log utils.Logger,
33+
proposalStore *proposal.ProposalStore[H],
34+
proposer proposer.Proposer[V, H],
35+
p2p p2p.P2P[V, H, A],
36+
) CommitListener[V, H, A] {
37+
commits := make(chan sync.CommittedBlock)
38+
return &commitListener[V, H, A]{
39+
log: log,
40+
proposalStore: proposalStore,
41+
proposer: proposer,
42+
p2p: p2p,
43+
commits: commits,
44+
}
45+
}
46+
47+
func (b *commitListener[V, H, A]) Commit(ctx context.Context, height types.Height, value V) {
48+
buildResult := b.proposalStore.Get(value.Hash())
49+
if buildResult == nil {
50+
b.log.Errorw("failed to get build result", "hash", value.Hash())
51+
return
52+
}
53+
54+
committedBlock := sync.CommittedBlock{
55+
Block: buildResult.Preconfirmed.Block,
56+
StateUpdate: buildResult.Preconfirmed.StateUpdate,
57+
NewClasses: buildResult.Preconfirmed.NewClasses,
58+
Persisted: make(chan struct{}),
59+
}
60+
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case b.commits <- committedBlock:
65+
}
66+
67+
select {
68+
case <-ctx.Done():
69+
return
70+
case <-committedBlock.Persisted:
71+
}
72+
73+
b.proposer.OnCommit(ctx, height, value)
74+
b.p2p.OnCommit(ctx, height, value)
75+
}
76+
77+
func (b *commitListener[V, H, A]) Listen() <-chan sync.CommittedBlock {
78+
return b.commits
79+
}

consensus/driver/driver.go

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,21 @@ import (
66

77
"github.com/NethermindEth/juno/consensus/db"
88
"github.com/NethermindEth/juno/consensus/p2p"
9-
"github.com/NethermindEth/juno/consensus/proposer"
109
"github.com/NethermindEth/juno/consensus/tendermint"
1110
"github.com/NethermindEth/juno/consensus/types"
1211
"github.com/NethermindEth/juno/utils"
1312
)
1413

15-
type timeoutFn func(step types.Step, round types.Round) time.Duration
16-
17-
type Blockchain[V types.Hashable[H], H types.Hash] interface {
18-
// Commit is called by Tendermint when a block has been decided on and can be committed to the DB.
19-
Commit(types.Height, V)
20-
}
14+
type TimeoutFn func(step types.Step, round types.Round) time.Duration
2115

2216
type Driver[V types.Hashable[H], H types.Hash, A types.Addr] struct {
23-
log utils.Logger
24-
db db.TendermintDB[V, H, A]
25-
stateMachine tendermint.StateMachine[V, H, A]
26-
blockchain Blockchain[V, H]
27-
p2p p2p.P2P[V, H, A]
28-
proposer proposer.Proposer[V, H]
17+
log utils.Logger
18+
db db.TendermintDB[V, H, A]
19+
stateMachine tendermint.StateMachine[V, H, A]
20+
commitListener CommitListener[V, H, A]
21+
p2p p2p.P2P[V, H, A]
2922

30-
getTimeout timeoutFn
23+
getTimeout TimeoutFn
3124

3225
scheduledTms map[types.Timeout]*time.Timer
3326
timeoutsCh chan types.Timeout
@@ -37,21 +30,19 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
3730
log utils.Logger,
3831
db db.TendermintDB[V, H, A],
3932
stateMachine tendermint.StateMachine[V, H, A],
40-
blockchain Blockchain[V, H],
33+
commitListener CommitListener[V, H, A],
4134
p2p p2p.P2P[V, H, A],
42-
proposer proposer.Proposer[V, H],
43-
getTimeout timeoutFn,
35+
getTimeout TimeoutFn,
4436
) Driver[V, H, A] {
4537
return Driver[V, H, A]{
46-
log: log,
47-
db: db,
48-
stateMachine: stateMachine,
49-
blockchain: blockchain,
50-
p2p: p2p,
51-
proposer: proposer,
52-
getTimeout: getTimeout,
53-
scheduledTms: make(map[types.Timeout]*time.Timer),
54-
timeoutsCh: make(chan types.Timeout),
38+
log: log,
39+
db: db,
40+
stateMachine: stateMachine,
41+
commitListener: commitListener,
42+
p2p: p2p,
43+
getTimeout: getTimeout,
44+
scheduledTms: make(map[types.Timeout]*time.Timer),
45+
timeoutsCh: make(chan types.Timeout),
5546
}
5647
}
5748

@@ -132,9 +123,7 @@ func (d *Driver[V, H, A]) execute(
132123
}
133124

134125
d.log.Debugw("Committing", "height", action.Height, "round", action.Round)
135-
d.blockchain.Commit(action.Height, *action.Value)
136-
d.proposer.OnCommit(ctx, action.Height, *action.Value)
137-
d.p2p.OnCommit(ctx, action.Height, *action.Value)
126+
d.commitListener.Commit(ctx, action.Height, *action.Value)
138127

139128
if err := d.db.DeleteWALEntries(action.Height); err != nil {
140129
d.log.Errorw("failed to delete WAL messages during commit", "height", action.Height, "round", action.Round, "err", err)

consensus/driver/driver_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
listeners = p2p.Listeners[starknet.Value, starknet.Hash, starknet.Address]
2828
broadcasters = p2p.Broadcasters[starknet.Value, starknet.Hash, starknet.Address]
2929
tendermintDB = db.TendermintDB[starknet.Value, starknet.Hash, starknet.Address]
30-
blockchain = driver.Blockchain[starknet.Value, starknet.Hash]
30+
commitListener = driver.CommitListener[starknet.Value, starknet.Hash, starknet.Address]
3131
)
3232

3333
const (
@@ -165,9 +165,8 @@ func TestDriver(t *testing.T) {
165165
utils.NewNopZapLogger(),
166166
newTendermintDB(t),
167167
stateMachine,
168-
newMockBlockchain(t, &commitAction),
168+
newMockCommitListener(t, &commitAction),
169169
p2p,
170-
newMockProposer(),
171170
mockTimeoutFn,
172171
)
173172

0 commit comments

Comments
 (0)