Skip to content

Commit ecc9149

Browse files
committed
test(replication,gossip,ci): expand replication scenarios and add smoke/stress scheduling
Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
1 parent 6a59f75 commit ecc9149

6 files changed

Lines changed: 315 additions & 11 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
name: Tests and Build
22

33
on:
4+
workflow_dispatch:
45
push:
56
branches: [ "*" ]
67
tags: [ "v*" ]
78
pull_request:
89
branches: [ main ]
10+
schedule:
11+
- cron: "0 3 * * *"
912
jobs:
1013
build:
1114
runs-on: ubuntu-latest
@@ -89,6 +92,7 @@ jobs:
8992

9093
integration-tests:
9194
name: Integration tests
95+
if: github.event_name != 'schedule'
9296
runs-on: ubuntu-latest
9397

9498
strategy:
@@ -98,8 +102,6 @@ jobs:
98102
- "TestServer"
99103
- "TestServerReplicated/sync"
100104
- "TestServerReplicated/async"
101-
- "TestReplicationModes/sync"
102-
- "TestReplicationModes/async"
103105

104106
steps:
105107
- name: Checkout
@@ -130,3 +132,71 @@ jobs:
130132
131133
- name: Run integration tests
132134
run: make ci-integration TEST="${{ matrix.test }}"
135+
136+
# Replication smoke job: checks out the repository, installs Go 1.24,
# rewrites https://github.com URLs to carry credentials for private modules,
# restores the Go module cache, then runs `make ci-replication-smoke`.
replication-smoke:
137+
name: Replication smoke
138+
# Skipped when the workflow is started by the cron schedule; runs for every
# other trigger of this workflow.
if: github.event_name != 'schedule'
139+
runs-on: ubuntu-latest
140+
141+
steps:
142+
- name: Checkout
143+
uses: actions/checkout@v3
144+
with:
145+
fetch-depth: 0
146+
147+
- name: Set up Go
148+
uses: actions/setup-go@v2
149+
with:
150+
go-version: "1.24"
151+
152+
- name: Configure git for private modules
153+
env:
154+
TOKEN: ${{ secrets.REPOSITORIES_ACCESS_TOKEN }}
155+
USER: ${{ secrets.REPOSITORIES_ACCESS_USER }}
156+
run: git config --global url."https://${USER}:${TOKEN}@github.com".insteadOf "https://github.com"
157+
158+
- name: Share cache with other actions
159+
uses: actions/cache@v3
160+
with:
161+
path: |
162+
~/go/pkg/mod
163+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
164+
restore-keys: |
165+
${{ runner.os }}-go-
166+
167+
- name: Run replication smoke tests
168+
run: make ci-replication-smoke
169+
170+
# Replication stress job: same checkout / Go 1.24 / private-module git config /
# module-cache setup as the smoke job, but runs `make ci-replication-stress`.
replication-stress-nightly:
171+
name: Replication stress nightly
172+
# Runs only when the workflow is started by the cron schedule
# ("0 3 * * *" in this workflow's `on.schedule`).
if: github.event_name == 'schedule'
173+
runs-on: ubuntu-latest
174+
175+
steps:
176+
- name: Checkout
177+
uses: actions/checkout@v3
178+
with:
179+
fetch-depth: 0
180+
181+
- name: Set up Go
182+
uses: actions/setup-go@v2
183+
with:
184+
go-version: "1.24"
185+
186+
- name: Configure git for private modules
187+
env:
188+
TOKEN: ${{ secrets.REPOSITORIES_ACCESS_TOKEN }}
189+
USER: ${{ secrets.REPOSITORIES_ACCESS_USER }}
190+
run: git config --global url."https://${USER}:${TOKEN}@github.com".insteadOf "https://github.com"
191+
192+
- name: Share cache with other actions
193+
uses: actions/cache@v3
194+
with:
195+
path: |
196+
~/go/pkg/mod
197+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
198+
restore-keys: |
199+
${{ runner.os }}-go-
200+
201+
- name: Run replication stress tests
202+
run: make ci-replication-stress

Makefile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ ci-fuzz-smoke:
156156
@go test -fuzz=FuzzTokenDecodeNoPanic -fuzztime=5s ./internal/token
157157
@go test -fuzz=FuzzTokenRoundTrip -fuzztime=5s ./internal/token
158158

159+
# ci-replication-smoke runs, once and in shuffled order, the
# TestReplicationModes subtests leader_churn_under_writes and
# follower_offline_catchup_under_load for both the async and sync modes,
# from ./tests, with -p 1 and a 20 minute timeout.
# The trailing `$$` is Make's escape for a literal `$` (regex end anchor).
.PHONY: ci-replication-smoke
160+
ci-replication-smoke:
161+
@go test -v -count 1 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/(leader_churn_under_writes|follower_offline_catchup_under_load)$$' ./tests
162+
163+
# ci-replication-stress runs four TestReplicationModes scenarios from ./tests
# as separate `go test` invocations, each covering the async and sync modes
# with -shuffle=on, -p 1 and a 20 minute timeout per invocation:
#   - leader_churn_under_writes             (-count 2)
#   - follower_offline_catchup_under_load   (-count 2)
#   - rolling_restart_no_data_loss          (-count 1)
#   - delete_propagation_no_resurrection    (-count 1)
# The trailing `$$` is Make's escape for a literal `$` (regex end anchor).
.PHONY: ci-replication-stress
164+
ci-replication-stress:
165+
@go test -v -count 2 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/leader_churn_under_writes$$' ./tests
166+
@go test -v -count 2 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/follower_offline_catchup_under_load$$' ./tests
167+
@go test -v -count 1 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/rolling_restart_no_data_loss$$' ./tests
168+
@go test -v -count 1 -shuffle=on -p 1 -timeout 20m -run '^TestReplicationModes/(async|sync)/delete_propagation_no_resurrection$$' ./tests
169+
159170
.PHONY: ci-integration
160171
ci-integration:
161172
@set -eu; \

internal/badgerd/replication/gossip/nodes_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,26 @@ func TestOnNewLeaderFollowerPath(t *testing.T) {
5555
t.Fatal("expected ready channel to be closed")
5656
}
5757
}
58+
59+
// TestOnNewLeaderEmptyIdentityIgnored checks that calling onNewLeader with an
// empty leader name leaves the Gossip state untouched: a non-leading instance
// named "self" that currently records "peer" as leader must still report
// "peer" afterwards, must not become leader, and must not close its ready
// channel.
func TestOnNewLeaderEmptyIdentityIgnored(t *testing.T) {
60+
r := &Gossip{
61+
ctx: context.Background(),
62+
name: "self",
63+
leading: NewAtomic(false),
64+
leaderName: NewAtomic("peer"),
65+
ready: make(chan struct{}),
66+
pub: pubsub.NewPublisher[string](time.Second, 2),
67+
}
68+
r.meta.Store(&pb.Meta{LocalVersion: 3})
69+
70+
// Empty identity: the call under test.
r.onNewLeader(context.Background(), "")
71+
72+
assert.Equal(t, "peer", r.CurrentLeader())
73+
assert.False(t, r.IsLeader())
74+
assert.True(t, r.HasLeader())
75+
// Non-blocking receive: the default branch is taken while ready is still open.
select {
76+
case <-r.ready:
77+
t.Fatal("ready must not close on empty leader")
78+
default:
79+
}
80+
}

internal/badgerd/replication/gossip/service_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net"
88
"testing"
9+
"time"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
@@ -69,6 +70,33 @@ func TestReplicatePreconditions(t *testing.T) {
6970
r = &Gossip{leading: NewAtomic(false), name: "self", db: &fakeDB{path: t.TempDir(), maxBatchCount: 10, maxBatchSize: 1 << 20, valueThr: 1024}}
7071
err = r.Replicate(&fakeReplicateSrv{ctx: peerCtx("127.0.0.1", 7000), recvErr: errors.New("recv")})
7172
require.EqualError(t, err, "recv")
73+
74+
r = &Gossip{leading: NewAtomic(false), name: "self", db: &fakeDB{path: t.TempDir(), maxBatchCount: 10, maxBatchSize: 1 << 20, valueThr: 1024}}
75+
err = r.Replicate(&fakeReplicateSrv{ctx: peerCtx("127.0.0.1", 7000), msgs: []*pb.ReplicateRequest{{Ops: []*pb.Op{{ID: 2, Action: &pb.Op_Commit{Commit: &pb.Commit{At: 7}}}}}}})
76+
require.Equal(t, io.EOF, err)
77+
}
78+
79+
// TestInitWaitsForNodeToAppear calls Init on a leading Gossip whose node map
// starts empty, while a background goroutine registers a node named "late"
// (address 127.0.0.1) roughly 15ms later. It expects Init to return no error
// and the fake DB's stream function to have been invoked with at == 5
// (matching the fake's maxVersion) and since == 1 (matching the request).
//
// NOTE(review): presumably Init blocks until the calling peer shows up in
// r.nodes; the test depends on a fixed sleep, so confirm it is not
// timing-sensitive on slow CI runners.
func TestInitWaitsForNodeToAppear(t *testing.T) {
80+
called := false
81+
r := &Gossip{
82+
leading: NewAtomic(true),
83+
db: &fakeDB{maxVersion: 5, streamFn: func(_ context.Context, at, since uint64, _ io.Writer) error {
84+
called = true
85+
assert.EqualValues(t, 5, at)
86+
assert.EqualValues(t, 1, since)
87+
return nil
88+
}},
89+
nodes: Map[*node]{},
90+
}
91+
92+
// Register the node only after a short delay, while Init is already running.
go func() {
93+
time.Sleep(15 * time.Millisecond)
94+
r.nodes.Store("late", &node{name: "late", addr: net.ParseIP("127.0.0.1")})
95+
}()
96+
97+
err := r.Init(&pb.InitRequest{Since: 1}, &fakeInitSrv{ctx: peerCtx("127.0.0.1", 7000)})
98+
require.NoError(t, err)
99+
assert.True(t, called)
72100
}
73101

74102
func TestAliveTable(t *testing.T) {

internal/badgerd/replication/gossip/tx_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,24 @@ func TestTxCommitReplaySetDeleteAndFlush(t *testing.T) {
113113
assert.Equal(t, []byte("k2"), batch.dels[0].key)
114114
assert.EqualValues(t, 42, batch.dels[0].ts)
115115
}
116+
117+
// TestTxSendSyncRemovesFailingPeerAndKeepsHealthy sends one request through a
// sync-mode tx holding two streams: "bad", whose fake stream is configured
// with a send error, and "good", whose fake stream has one ack queued. It
// expects send to return an error containing "bad: send", the bad stream to
// be gone from the tx, and the good stream to remain.
func TestTxSendSyncRemovesFailingPeerAndKeepsHealthy(t *testing.T) {
118+
bad := &stream{n: "bad", s: &fakeReplicateStream{sendErr: errors.New("send")}}
119+
good := &stream{n: "good", s: &fakeReplicateStream{acks: []*pb.Ack{{}}}}
120+
r := &tx{mode: replication.ModeSync, cs: []*stream{bad, good}}
121+
122+
err := r.send(context.Background(), &pb.ReplicateRequest{Ops: []*pb.Op{{Action: &pb.Op_New{New: &pb.New{At: 1}}}}})
123+
require.ErrorContains(t, err, "bad: send")
124+
assert.False(t, r.hasStreams(bad))
125+
assert.True(t, r.hasStreams(good))
126+
}
127+
128+
// TestTxSendAsyncRemovesPeerOnSendFailure sends one request through an
// async-mode tx whose only stream wraps a fake configured with a send error
// in an async queue. It expects send itself to return no error and, once
// r.swg has been waited on, the failing stream to be gone from the tx.
func TestTxSendAsyncRemovesPeerOnSendFailure(t *testing.T) {
129+
badStream := &fakeReplicateStream{sendErr: errors.New("send")}
130+
bad := &stream{n: "bad", q: async.NewQueue[*pb.ReplicateRequest, *pb.Ack](badStream)}
131+
r := &tx{mode: replication.ModeAsync, cs: []*stream{bad}}
132+
133+
require.NoError(t, r.send(context.Background(), &pb.ReplicateRequest{Ops: []*pb.Op{{Action: &pb.Op_New{New: &pb.New{At: 1}}}}}))
134+
// NOTE(review): swg presumably tracks the in-flight async sends; waiting here
// lets the failure be processed before the assertion — confirm against tx.
r.swg.Wait()
135+
assert.False(t, r.hasStreams(bad))
136+
}

0 commit comments

Comments
 (0)