Skip to content

Commit a8720e9

Browse files
authored
[+] add leader checker tests (#380)
* add leader checker tests * add etcd leader checker integration tests * add consul leader checker integration tests * add coverage output to .gitignore
1 parent aa647da commit a8720e9

6 files changed

Lines changed: 666 additions & 8 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
*.deb
22
*.rpm
33
*.exe
4+
*.out
45
tmp/
56
.vscode/

checker/consul_leader_checker_test.go

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
package checker
22

33
import (
4+
"context"
5+
"errors"
46
"strings"
57
"testing"
8+
"time"
69

710
"github.com/cybertec-postgresql/vip-manager/vipconfig"
11+
capi "github.com/hashicorp/consul/api"
12+
"github.com/testcontainers/testcontainers-go"
13+
tcconsul "github.com/testcontainers/testcontainers-go/modules/consul"
814
"go.uber.org/zap"
915
)
1016

@@ -55,3 +61,221 @@ func TestNewConsulLeaderChecker_ValidURL(t *testing.T) {
5561
t.Fatal("expected non-nil checker")
5662
}
5763
}
64+
65+
// ---------------------------------------------------------------------------
66+
// Integration tests – require a running Docker daemon
67+
// ---------------------------------------------------------------------------
68+
69+
const consulImage = "hashicorp/consul:1.15"
70+
const consulTestKey = "service/batman/leader"
71+
72+
// startConsulContainer starts a real Consul container and returns the HTTP API
73+
// endpoint (with scheme) and a seed client for writing test data.
74+
// The test is skipped when Docker is not available.
75+
func startConsulContainer(t *testing.T) (endpoint string, seed *capi.Client) {
76+
t.Helper()
77+
ctx := context.Background()
78+
ctr, err := tcconsul.Run(ctx, consulImage)
79+
if err != nil {
80+
t.Skipf("cannot start consul container (Docker may be unavailable): %v", err)
81+
}
82+
testcontainers.CleanupContainer(t, ctr)
83+
84+
host, err := ctr.ApiEndpoint(ctx)
85+
if err != nil {
86+
t.Fatalf("ApiEndpoint: %v", err)
87+
}
88+
89+
cfg := capi.DefaultConfig()
90+
cfg.Address = host
91+
seed, err = capi.NewClient(cfg)
92+
if err != nil {
93+
t.Fatalf("seed api.NewClient: %v", err)
94+
}
95+
return "http://" + host, seed
96+
}
97+
98+
// consulCheckerFor builds a ConsulLeaderChecker with a 1 ms poll interval for
99+
// fast test iteration.
100+
func consulCheckerFor(t *testing.T, endpoint, key, value string) *ConsulLeaderChecker {
101+
t.Helper()
102+
conf := &vipconfig.Config{
103+
Endpoints: []string{endpoint},
104+
TriggerKey: key,
105+
TriggerValue: value,
106+
Interval: 1,
107+
Logger: zap.NewNop(),
108+
}
109+
checker, err := NewConsulLeaderChecker(conf)
110+
if err != nil {
111+
t.Fatalf("NewConsulLeaderChecker: %v", err)
112+
}
113+
return checker
114+
}
115+
116+
// runConsulStream starts GetChangeNotificationStream in a goroutine and
117+
// returns the output channel and a done channel carrying the final error.
118+
// The channel is unbuffered so that, once the test stops reading, `out <-
119+
// state` always blocks and ctx.Done() is the guaranteed winner in the
120+
// production select – preventing spurious extra long-poll cycles after cancel.
121+
func runConsulStream(ctx context.Context, c *ConsulLeaderChecker) (out chan bool, done chan error) {
122+
out = make(chan bool) // unbuffered
123+
done = make(chan error, 1)
124+
go func() { done <- c.GetChangeNotificationStream(ctx, out) }()
125+
return
126+
}
127+
128+
// receiveOne reads one value from out within 3 s or fails the test.
129+
func receiveOne(t *testing.T, out <-chan bool) bool {
130+
t.Helper()
131+
select {
132+
case v := <-out:
133+
return v
134+
case <-time.After(3 * time.Second):
135+
t.Fatal("timed out waiting for stream value")
136+
return false
137+
}
138+
}
139+
140+
// waitDone waits for the stream goroutine to exit within 5 s (must allow for
141+
// the consul WaitTime=1 s long-poll to expire).
142+
func waitDone(t *testing.T, done <-chan error) error {
143+
t.Helper()
144+
select {
145+
case err := <-done:
146+
return err
147+
case <-time.After(5 * time.Second):
148+
t.Fatal("timed out waiting for GetChangeNotificationStream to return")
149+
return nil
150+
}
151+
}
152+
153+
// TestConsulLeaderChecker_GetChangeNotificationStream_KeyAbsent verifies that
154+
// the stream emits false when the watched key is not present in Consul.
155+
// After cancelling, a key is injected so the goroutine can advance past the
156+
// nil-response path (which has no inline ctx check) and exit via the select.
157+
func TestConsulLeaderChecker_GetChangeNotificationStream_KeyAbsent(t *testing.T) {
158+
endpoint, seed := startConsulContainer(t)
159+
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")
160+
161+
ctx, cancel := context.WithCancel(context.Background())
162+
out, done := runConsulStream(ctx, checker)
163+
164+
if receiveOne(t, out) {
165+
t.Error("expected false for absent key, got true")
166+
}
167+
168+
cancel()
169+
// The nil-response branch has no inline ctx check and writes to `out`
170+
// directly (not in a select). With an unbuffered channel the goroutine may
171+
// therefore be stuck on that write. One extra read unblocks it so it can
172+
// proceed. Injecting a key lets it advance to the resp-not-nil select where
173+
// ctx.Done() fires cleanly.
174+
_, _ = seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("any")}, nil)
175+
select {
176+
case <-out: // unblock the goroutine if stuck on a nil-response write
177+
case <-time.After(time.Second): // goroutine already past that point – fine
178+
}
179+
180+
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
181+
t.Errorf("expected context.Canceled, got %v", err)
182+
}
183+
}
184+
185+
// TestConsulLeaderChecker_GetChangeNotificationStream_MatchingValue verifies
186+
// that the stream emits true when the key value equals TriggerValue.
187+
func TestConsulLeaderChecker_GetChangeNotificationStream_MatchingValue(t *testing.T) {
188+
endpoint, seed := startConsulContainer(t)
189+
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("primary")}, nil); err != nil {
190+
t.Fatalf("seed Put: %v", err)
191+
}
192+
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")
193+
194+
ctx, cancel := context.WithCancel(context.Background())
195+
out, done := runConsulStream(ctx, checker)
196+
197+
if !receiveOne(t, out) {
198+
t.Error("expected true for matching value, got false")
199+
}
200+
201+
cancel()
202+
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
203+
t.Errorf("expected context.Canceled, got %v", err)
204+
}
205+
}
206+
207+
// TestConsulLeaderChecker_GetChangeNotificationStream_NonMatchingValue verifies
208+
// that the stream emits false when the key value differs from TriggerValue.
209+
func TestConsulLeaderChecker_GetChangeNotificationStream_NonMatchingValue(t *testing.T) {
210+
endpoint, seed := startConsulContainer(t)
211+
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("secondary")}, nil); err != nil {
212+
t.Fatalf("seed Put: %v", err)
213+
}
214+
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")
215+
216+
ctx, cancel := context.WithCancel(context.Background())
217+
out, done := runConsulStream(ctx, checker)
218+
219+
if receiveOne(t, out) {
220+
t.Error("expected false for non-matching value, got true")
221+
}
222+
223+
cancel()
224+
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
225+
t.Errorf("expected context.Canceled, got %v", err)
226+
}
227+
}
228+
229+
// TestConsulLeaderChecker_GetChangeNotificationStream_KeyChanges verifies that
230+
// the stream picks up a value change via the blocking long-poll.
231+
func TestConsulLeaderChecker_GetChangeNotificationStream_KeyChanges(t *testing.T) {
232+
endpoint, seed := startConsulContainer(t)
233+
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("primary")}, nil); err != nil {
234+
t.Fatalf("seed Put: %v", err)
235+
}
236+
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")
237+
238+
ctx, cancel := context.WithCancel(context.Background())
239+
defer cancel()
240+
out, done := runConsulStream(ctx, checker)
241+
242+
// Initial value: matching → true.
243+
if !receiveOne(t, out) {
244+
t.Error("expected true for initial matching value, got false")
245+
}
246+
247+
// Change the value while the stream is long-polling; the poll unblocks immediately.
248+
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("secondary")}, nil); err != nil {
249+
t.Fatalf("update Put: %v", err)
250+
}
251+
252+
// Updated value: non-matching → false.
253+
if receiveOne(t, out) {
254+
t.Error("expected false after key change to non-matching value, got true")
255+
}
256+
257+
cancel()
258+
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
259+
t.Errorf("expected context.Canceled, got %v", err)
260+
}
261+
}
262+
263+
// TestConsulLeaderChecker_GetChangeNotificationStream_ErrorPath verifies that
264+
// a KV error (unreachable server) causes the stream to emit false and that
265+
// cancelling the context stops it cleanly.
266+
func TestConsulLeaderChecker_GetChangeNotificationStream_ErrorPath(t *testing.T) {
267+
// Port 1 is closed on loopback; the TCP dial fails immediately.
268+
checker := consulCheckerFor(t, "http://127.0.0.1:1", consulTestKey, "primary")
269+
270+
ctx, cancel := context.WithCancel(context.Background())
271+
out, done := runConsulStream(ctx, checker)
272+
273+
if receiveOne(t, out) {
274+
t.Error("expected false on error path, got true")
275+
}
276+
277+
cancel()
278+
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
279+
t.Errorf("expected context.Canceled, got %v", err)
280+
}
281+
}

0 commit comments

Comments
 (0)