Skip to content

Commit e18383d

Browse files
committed
feat: support promotable proposer rotation
1 parent 31d3993 commit e18383d

9 files changed

Lines changed: 540 additions & 72 deletions

File tree

node/full.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,15 @@ func newFullNode(
118118
// Initialize raft node if enabled (for both aggregator and sync nodes)
119119
var leaderElection leaderElection
120120
switch {
121+
case nodeConfig.Node.Promotable && !nodeConfig.Raft.Enable:
122+
if signer == nil {
123+
return nil, fmt.Errorf("promotable mode requires a signer")
124+
}
125+
localProposer, err := signer.GetAddress()
126+
if err != nil {
127+
return nil, fmt.Errorf("get promotable signer address: %w", err)
128+
}
129+
leaderElection = newDynamicProposerElection(logger, localProposer, genesis.ProposerAddress, evstore, leaderFactory, followerFactory, 300*time.Millisecond)
121130
case nodeConfig.Node.Aggregator && nodeConfig.Raft.Enable:
122131
leaderElection = raftpkg.NewDynamicLeaderElection(logger, leaderFactory, followerFactory, raftNode)
123132
case nodeConfig.Node.Aggregator && !nodeConfig.Raft.Enable:

node/proposer_election.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package node
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"sync"
9+
"sync/atomic"
10+
"time"
11+
12+
"github.com/rs/zerolog"
13+
14+
"github.com/evstack/ev-node/pkg/raft"
15+
"github.com/evstack/ev-node/pkg/store"
16+
"github.com/evstack/ev-node/types"
17+
)
18+
19+
type proposerStateReader interface {
20+
GetState(ctx context.Context) (types.State, error)
21+
}
22+
23+
// proposerRole identifies which kind of worker the election is running.
type proposerRole uint8

const (
	// proposerRoleFollower is the zero value; Run always starts in this role.
	proposerRoleFollower proposerRole = iota
	// proposerRoleLeader is selected while the local signer is the expected
	// proposer.
	proposerRoleLeader
)
29+
30+
type dynamicProposerElection struct {
31+
logger zerolog.Logger
32+
localProposer []byte
33+
initialProposer []byte
34+
stateReader proposerStateReader
35+
leaderFactory, followerFactory func() (raft.Runnable, error)
36+
pollInterval time.Duration
37+
running atomic.Bool
38+
}
39+
40+
func newDynamicProposerElection(
41+
logger zerolog.Logger,
42+
localProposer []byte,
43+
initialProposer []byte,
44+
stateReader proposerStateReader,
45+
leaderFactory func() (raft.Runnable, error),
46+
followerFactory func() (raft.Runnable, error),
47+
pollInterval time.Duration,
48+
) *dynamicProposerElection {
49+
if pollInterval <= 0 {
50+
pollInterval = 300 * time.Millisecond
51+
}
52+
return &dynamicProposerElection{
53+
logger: logger,
54+
localProposer: append([]byte(nil), localProposer...),
55+
initialProposer: append([]byte(nil), initialProposer...),
56+
stateReader: stateReader,
57+
leaderFactory: leaderFactory,
58+
followerFactory: followerFactory,
59+
pollInterval: pollInterval,
60+
}
61+
}
62+
63+
func (d *dynamicProposerElection) Run(ctx context.Context) error {
64+
var wg sync.WaitGroup
65+
var workerCancel context.CancelFunc = func() {}
66+
errCh := make(chan error, 1)
67+
currentRole := proposerRoleFollower
68+
69+
defer func() {
70+
workerCancel()
71+
wg.Wait()
72+
close(errCh)
73+
}()
74+
d.running.Store(true)
75+
defer d.running.Store(false)
76+
77+
startRole := func(role proposerRole) error {
78+
workerCancel()
79+
wg.Wait()
80+
81+
var (
82+
runnable raft.Runnable
83+
err error
84+
name string
85+
)
86+
switch role {
87+
case proposerRoleLeader:
88+
name = "leader"
89+
runnable, err = d.leaderFactory()
90+
case proposerRoleFollower:
91+
name = "follower"
92+
runnable, err = d.followerFactory()
93+
default:
94+
return fmt.Errorf("unknown proposer role: %d", role)
95+
}
96+
if err != nil {
97+
return err
98+
}
99+
100+
workerCtx, cancel := context.WithCancel(ctx)
101+
workerCancel = cancel
102+
currentRole = role
103+
wg.Add(1)
104+
go func() {
105+
defer wg.Done()
106+
if err := runnable.Run(workerCtx); err != nil && !errors.Is(err, context.Canceled) {
107+
select {
108+
case errCh <- fmt.Errorf("%s worker exited unexpectedly: %w", name, err):
109+
default:
110+
}
111+
}
112+
}()
113+
return nil
114+
}
115+
116+
if err := startRole(proposerRoleFollower); err != nil {
117+
return err
118+
}
119+
120+
ticker := time.NewTicker(d.pollInterval)
121+
defer ticker.Stop()
122+
123+
for {
124+
select {
125+
case <-ticker.C:
126+
shouldLead, err := d.shouldLead(ctx)
127+
if err != nil {
128+
d.logger.Debug().Err(err).Msg("could not read local proposer state")
129+
continue
130+
}
131+
if shouldLead && currentRole != proposerRoleLeader {
132+
d.logger.Info().Msg("local signer is next proposer, promoting to aggregator")
133+
if err := startRole(proposerRoleLeader); err != nil {
134+
return err
135+
}
136+
}
137+
if !shouldLead && currentRole != proposerRoleFollower {
138+
d.logger.Info().Msg("local signer is not next proposer, returning to sync mode")
139+
if err := startRole(proposerRoleFollower); err != nil {
140+
return err
141+
}
142+
}
143+
case err := <-errCh:
144+
return err
145+
case <-ctx.Done():
146+
return ctx.Err()
147+
}
148+
}
149+
}
150+
151+
func (d *dynamicProposerElection) IsRunning() bool {
152+
return d.running.Load()
153+
}
154+
155+
func (d *dynamicProposerElection) shouldLead(ctx context.Context) (bool, error) {
156+
state, err := d.stateReader.GetState(ctx)
157+
if err != nil {
158+
if store.IsNotFound(err) {
159+
return len(d.initialProposer) > 0 && bytes.Equal(d.initialProposer, d.localProposer), nil
160+
}
161+
return false, err
162+
}
163+
expectedProposer := state.NextProposerAddress
164+
if len(expectedProposer) == 0 {
165+
expectedProposer = d.initialProposer
166+
}
167+
return len(expectedProposer) > 0 && bytes.Equal(expectedProposer, d.localProposer), nil
168+
}

node/proposer_election_test.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
package node
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/rs/zerolog"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/evstack/ev-node/pkg/raft"
14+
"github.com/evstack/ev-node/pkg/store"
15+
"github.com/evstack/ev-node/types"
16+
)
17+
18+
type proposerElectionStateReader struct {
19+
mu sync.Mutex
20+
state types.State
21+
err error
22+
}
23+
24+
func (r *proposerElectionStateReader) setNextProposer(addr []byte) {
25+
r.mu.Lock()
26+
defer r.mu.Unlock()
27+
r.state.NextProposerAddress = append([]byte(nil), addr...)
28+
r.err = nil
29+
}
30+
31+
func (r *proposerElectionStateReader) setErr(err error) {
32+
r.mu.Lock()
33+
defer r.mu.Unlock()
34+
r.err = err
35+
}
36+
37+
func (r *proposerElectionStateReader) GetState(context.Context) (types.State, error) {
38+
r.mu.Lock()
39+
defer r.mu.Unlock()
40+
if r.err != nil {
41+
return types.State{}, r.err
42+
}
43+
return r.state, nil
44+
}
45+
46+
// proposerElectionRunnable is a test worker that announces its own start and
// stop on the provided channels.
type proposerElectionRunnable struct {
	// name is the value sent on started and stopped.
	name string

	// started receives name when Run begins.
	started chan<- string
	// stopped receives name once Run's context is done.
	stopped chan<- string
}
51+
52+
func (r proposerElectionRunnable) Run(ctx context.Context) error {
53+
r.started <- r.name
54+
<-ctx.Done()
55+
r.stopped <- r.name
56+
return ctx.Err()
57+
}
58+
59+
func (r proposerElectionRunnable) IsSynced(*raft.RaftBlockState) (int, error) {
60+
return 0, nil
61+
}
62+
63+
func (r proposerElectionRunnable) Recover(context.Context, *raft.RaftBlockState) error {
64+
return nil
65+
}
66+
67+
func TestDynamicProposerElectionPromotesAndDemotesFromLocalState(t *testing.T) {
68+
localProposer := []byte{1, 2, 3}
69+
otherProposer := []byte{9, 8, 7}
70+
stateReader := &proposerElectionStateReader{}
71+
stateReader.setNextProposer(otherProposer)
72+
73+
started := make(chan string, 8)
74+
stopped := make(chan string, 8)
75+
election := newDynamicProposerElection(
76+
zerolog.Nop(),
77+
localProposer,
78+
localProposer,
79+
stateReader,
80+
func() (raft.Runnable, error) {
81+
return proposerElectionRunnable{name: "leader", started: started, stopped: stopped}, nil
82+
},
83+
func() (raft.Runnable, error) {
84+
return proposerElectionRunnable{name: "follower", started: started, stopped: stopped}, nil
85+
},
86+
time.Millisecond,
87+
)
88+
89+
ctx, cancel := context.WithCancel(context.Background())
90+
defer cancel()
91+
errCh := make(chan error, 1)
92+
go func() {
93+
errCh <- election.Run(ctx)
94+
}()
95+
96+
require.Equal(t, "follower", receiveProposerElectionEvent(t, started))
97+
require.True(t, election.IsRunning())
98+
99+
stateReader.setNextProposer(localProposer)
100+
require.Equal(t, "follower", receiveProposerElectionEvent(t, stopped))
101+
require.Equal(t, "leader", receiveProposerElectionEvent(t, started))
102+
103+
stateReader.setNextProposer(otherProposer)
104+
require.Equal(t, "leader", receiveProposerElectionEvent(t, stopped))
105+
require.Equal(t, "follower", receiveProposerElectionEvent(t, started))
106+
107+
cancel()
108+
require.Equal(t, "follower", receiveProposerElectionEvent(t, stopped))
109+
require.ErrorIs(t, receiveProposerElectionError(t, errCh), context.Canceled)
110+
}
111+
112+
func TestDynamicProposerElectionUsesInitialProposerBeforeStateExists(t *testing.T) {
113+
localProposer := []byte{1, 2, 3}
114+
otherProposer := []byte{9, 8, 7}
115+
stateReader := &proposerElectionStateReader{}
116+
stateReader.setErr(store.ErrNotFound)
117+
118+
started := make(chan string, 8)
119+
stopped := make(chan string, 8)
120+
election := newDynamicProposerElection(
121+
zerolog.Nop(),
122+
localProposer,
123+
localProposer,
124+
stateReader,
125+
func() (raft.Runnable, error) {
126+
return proposerElectionRunnable{name: "leader", started: started, stopped: stopped}, nil
127+
},
128+
func() (raft.Runnable, error) {
129+
return proposerElectionRunnable{name: "follower", started: started, stopped: stopped}, nil
130+
},
131+
time.Millisecond,
132+
)
133+
134+
ctx, cancel := context.WithCancel(context.Background())
135+
defer cancel()
136+
errCh := make(chan error, 1)
137+
go func() {
138+
errCh <- election.Run(ctx)
139+
}()
140+
141+
require.Equal(t, "follower", receiveProposerElectionEvent(t, started))
142+
require.Equal(t, "follower", receiveProposerElectionEvent(t, stopped))
143+
require.Equal(t, "leader", receiveProposerElectionEvent(t, started))
144+
145+
stateReader.setNextProposer(otherProposer)
146+
require.Equal(t, "leader", receiveProposerElectionEvent(t, stopped))
147+
require.Equal(t, "follower", receiveProposerElectionEvent(t, started))
148+
149+
cancel()
150+
require.Equal(t, "follower", receiveProposerElectionEvent(t, stopped))
151+
require.ErrorIs(t, receiveProposerElectionError(t, errCh), context.Canceled)
152+
}
153+
154+
func TestDynamicProposerElectionDoesNotUseInitialProposerForUnexpectedStateErrors(t *testing.T) {
155+
localProposer := []byte{1, 2, 3}
156+
stateReader := &proposerElectionStateReader{}
157+
stateReader.setErr(errors.New("state read failed"))
158+
159+
election := newDynamicProposerElection(
160+
zerolog.Nop(),
161+
localProposer,
162+
localProposer,
163+
stateReader,
164+
func() (raft.Runnable, error) {
165+
t.Fatal("leader factory should not be called on unexpected state read errors")
166+
return nil, nil
167+
},
168+
func() (raft.Runnable, error) {
169+
return proposerElectionRunnable{}, nil
170+
},
171+
time.Millisecond,
172+
)
173+
174+
shouldLead, err := election.shouldLead(context.Background())
175+
require.ErrorContains(t, err, "state read failed")
176+
require.False(t, shouldLead)
177+
}
178+
179+
// receiveProposerElectionEvent returns the next value from ch, failing the
// test if nothing arrives within one second.
func receiveProposerElectionEvent(t *testing.T, ch <-chan string) string {
	t.Helper()

	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case name := <-ch:
		return name
	case <-deadline.C:
		t.Fatal("timed out waiting for proposer election event")
	}
	return ""
}
189+
190+
func receiveProposerElectionError(t *testing.T, ch <-chan error) error {
191+
t.Helper()
192+
select {
193+
case err := <-ch:
194+
return err
195+
case <-time.After(time.Second):
196+
t.Fatal("timed out waiting for proposer election error")
197+
return nil
198+
}
199+
}

pkg/cmd/run_node.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ func StartNode(
108108
// Validate and load pkgsigner first (before attempting DA connection, which may fail
109109
// eagerly over WebSocket if no DA server is running).
110110
var signer pkgsigner.Signer
111-
if nodeConfig.Node.Aggregator && !nodeConfig.Node.BasedSequencer {
111+
needsSigner := nodeConfig.Node.Aggregator || nodeConfig.Node.Promotable
112+
if needsSigner && !nodeConfig.Node.BasedSequencer {
112113
passphrase := ""
113114
if nodeConfig.Signer.SignerType == "file" {
114115
passphraseFile, err := cmd.Flags().GetString(rollconf.FlagSignerPassphraseFile)

0 commit comments

Comments
 (0)