Skip to content

Commit 4736118

Browse files
committed
Sync store; minor test updates
1 parent ab0f35a commit 4736118

8 files changed

Lines changed: 141 additions & 12 deletions

File tree

pkg/store/cached_store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,8 @@ func (cs *CachedStore) Close() error {
174174
cs.ClearCache()
175175
return cs.Store.Close()
176176
}
177+
178+
// Sync flushes the underlying store to durable storage.
179+
func (cs *CachedStore) Sync(ctx context.Context) error {
180+
return cs.Store.Sync(ctx)
181+
}

pkg/store/store.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/binary"
77
"errors"
88
"fmt"
9+
"time"
910

1011
ds "github.com/ipfs/go-datastore"
1112
"google.golang.org/protobuf/proto"
@@ -30,7 +31,21 @@ func New(ds ds.Batching) Store {
3031

3132
// Close safely closes underlying data storage, to ensure that data is actually saved.
3233
func (s *DefaultStore) Close() error {
33-
return s.db.Close()
34+
done := make(chan error, 1)
35+
go func() {
36+
syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
37+
defer cancel()
38+
39+
_ = s.Sync(syncCtx)
40+
done <- s.db.Close()
41+
}()
42+
43+
select {
44+
case err := <-done:
45+
return err
46+
case <-time.After(4 * time.Second):
47+
return nil
48+
}
3449
}
3550

3651
// Height returns height of the highest block saved in the Store.

pkg/store/store_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,22 @@ type mockBatch struct {
3535
commitError error
3636
}
3737

38+
type syncingBatchingDatastore struct {
39+
ds.Batching
40+
syncCalled bool
41+
closeCalled bool
42+
}
43+
44+
func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
45+
m.syncCalled = true
46+
return m.Batching.Sync(ctx, key)
47+
}
48+
49+
func (m *syncingBatchingDatastore) Close() error {
50+
m.closeCalled = true
51+
return m.Batching.Close()
52+
}
53+
3854
func (m *mockBatchingDatastore) Put(ctx context.Context, key ds.Key, value []byte) error {
3955
if m.putError != nil {
4056
return m.putError
@@ -141,6 +157,20 @@ func TestStoreHeight(t *testing.T) {
141157
}
142158
}
143159

160+
func TestStoreCloseSyncsBeforeClose(t *testing.T) {
161+
t.Parallel()
162+
163+
kv, err := NewTestInMemoryKVStore()
164+
require.NoError(t, err)
165+
166+
mock := &syncingBatchingDatastore{Batching: kv}
167+
s := New(mock)
168+
169+
require.NoError(t, s.Close())
170+
require.True(t, mock.syncCalled)
171+
require.True(t, mock.closeCalled)
172+
}
173+
144174
func TestStoreLoad(t *testing.T) {
145175
t.Parallel()
146176
chainID := "TestStoreLoad"

pkg/store/tracing.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,19 @@ func (t *tracedStore) Close() error {
265265
return t.inner.Close()
266266
}
267267

268+
func (t *tracedStore) Sync(ctx context.Context) error {
269+
ctx, span := t.tracer.Start(ctx, "Store.Sync")
270+
defer span.End()
271+
272+
if err := t.inner.Sync(ctx); err != nil {
273+
span.RecordError(err)
274+
span.SetStatus(codes.Error, err.Error())
275+
return err
276+
}
277+
278+
return nil
279+
}
280+
268281
func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) {
269282
ctx, span := t.tracer.Start(ctx, "Store.NewBatch")
270283
defer span.End()

pkg/store/tracing_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type tracingMockStore struct {
3232
rollbackFn func(ctx context.Context, height uint64, aggregator bool) error
3333
pruneBlocksFn func(ctx context.Context, height uint64) error
3434
deleteStateAtHeightFn func(ctx context.Context, height uint64) error
35+
syncFn func(ctx context.Context) error
3536
newBatchFn func(ctx context.Context) (Batch, error)
3637
}
3738

@@ -137,6 +138,13 @@ func (m *tracingMockStore) Close() error {
137138
return nil
138139
}
139140

141+
func (m *tracingMockStore) Sync(ctx context.Context) error {
142+
if m.syncFn != nil {
143+
return m.syncFn(ctx)
144+
}
145+
return nil
146+
}
147+
140148
func (m *tracingMockStore) NewBatch(ctx context.Context) (Batch, error) {
141149
if m.newBatchFn != nil {
142150
return m.newBatchFn(ctx)

pkg/store/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type Store interface {
4646
Metadata
4747
Rollback
4848
Pruner
49+
Syncer
4950

5051
// Close safely closes underlying data storage, to ensure that data is actually saved.
5152
Close() error
@@ -104,3 +105,8 @@ type Pruner interface {
104105
// It does not affect the current state or any states at other heights, allowing for targeted pruning of historical state snapshots.
105106
DeleteStateAtHeight(ctx context.Context, height uint64) error
106107
}
108+
109+
// Syncer flushes buffered store state to durable storage.
110+
type Syncer interface {
111+
Sync(ctx context.Context) error
112+
}

test/e2e/failover_e2e_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
9797
proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
9898
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
9999
env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
100-
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
100+
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
101101
t.Log("Node1 is up")
102102
}()
103103

@@ -106,7 +106,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
106106
t.Log("Starting Node2")
107107
p2pPeers := node1P2PAddr + "," + node3P2PAddr
108108
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
109-
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
109+
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
110110
t.Log("Node2 is up")
111111
}()
112112

@@ -118,7 +118,7 @@ func TestLeaseFailoverE2E(t *testing.T) {
118118
node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t))
119119
ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort)
120120
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile)
121-
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, ethEngineURL, node3EthAddr)
121+
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr)
122122
t.Log("Node3 is up")
123123
}()
124124

@@ -177,11 +177,11 @@ func TestLeaseFailoverE2E(t *testing.T) {
177177
}
178178
}
179179
oldDetails := clusterNodes.Details(oldLeader)
180-
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
180+
restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
181181
t.Log("Restarted old leader to sync with cluster: " + oldLeader)
182182

183183
if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) {
184-
clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.engineURL, oldDetails.ethAddr)
184+
clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.p2pPeerAddr, oldDetails.engineURL, oldDetails.ethAddr)
185185
} else {
186186
t.Log("+++ old leader did not recover on restart. Skipping node verification")
187187
}
@@ -294,7 +294,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
294294
proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
295295
bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
296296
env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
297-
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
297+
clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
298298
t.Log("Node1 is up")
299299
}()
300300

@@ -303,7 +303,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
303303
t.Log("Starting Node2")
304304
p2pPeers := node1P2PAddr + "," + node3P2PAddr
305305
proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
306-
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
306+
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
307307
t.Log("Node2 is up")
308308
}()
309309

@@ -315,7 +315,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
315315
node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t))
316316
ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort)
317317
proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile)
318-
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, ethEngineURL, node3EthAddr)
318+
clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr)
319319
t.Log("Node3 is up")
320320
}()
321321

@@ -401,7 +401,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
401401
nodeDetails.engineURL, nodeDetails.ethAddr, false, passphraseFile)
402402

403403
clusterNodes.Set(nodeName, nodeDetails.rpcAddr, restartedProc, nodeDetails.ethAddr,
404-
nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
404+
nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.p2pPeerAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
405405
}
406406

407407
// Initial restart of all nodes
@@ -861,6 +861,7 @@ type nodeDetails struct {
861861
xRPCClient atomic.Pointer[rpcclient.Client]
862862
running atomic.Bool
863863
p2pAddr string
864+
p2pPeerAddr string
864865
engineURL string
865866
ethURL string
866867
}
@@ -913,10 +914,10 @@ type raftClusterNodes struct {
913914
nodes map[string]*nodeDetails
914915
}
915916

916-
func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, engineURL string, ethURL string) {
917+
func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, p2pPeerAddr string, engineURL string, ethURL string) {
917918
c.mx.Lock()
918919
defer c.mx.Unlock()
919-
d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, engineURL: engineURL, ethURL: ethURL}
920+
d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, p2pPeerAddr: p2pPeerAddr, engineURL: engineURL, ethURL: ethURL}
920921
d.running.Store(true)
921922
c.nodes[node] = d
922923
}

test/mocks/store.go

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)