Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.deb
*.rpm
*.exe
*.out
tmp/
.vscode/
224 changes: 224 additions & 0 deletions checker/consul_leader_checker_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package checker

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/cybertec-postgresql/vip-manager/vipconfig"
capi "github.com/hashicorp/consul/api"
"github.com/testcontainers/testcontainers-go"
tcconsul "github.com/testcontainers/testcontainers-go/modules/consul"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -55,3 +61,221 @@ func TestNewConsulLeaderChecker_ValidURL(t *testing.T) {
t.Fatal("expected non-nil checker")
}
}

// ---------------------------------------------------------------------------
// Integration tests – require a running Docker daemon
// ---------------------------------------------------------------------------

const consulImage = "hashicorp/consul:1.15"
const consulTestKey = "service/batman/leader"

// startConsulContainer starts a real Consul container and returns the HTTP API
// endpoint (with scheme) and a seed client for writing test data.
// The test is skipped when Docker is not available.
func startConsulContainer(t *testing.T) (endpoint string, seed *capi.Client) {
t.Helper()
ctx := context.Background()
ctr, err := tcconsul.Run(ctx, consulImage)
if err != nil {
t.Skipf("cannot start consul container (Docker may be unavailable): %v", err)
}
testcontainers.CleanupContainer(t, ctr)

host, err := ctr.ApiEndpoint(ctx)
if err != nil {
t.Fatalf("ApiEndpoint: %v", err)
}

cfg := capi.DefaultConfig()
cfg.Address = host
seed, err = capi.NewClient(cfg)
if err != nil {
t.Fatalf("seed api.NewClient: %v", err)
}
return "http://" + host, seed
}

// consulCheckerFor builds a ConsulLeaderChecker with a 1 ms poll interval for
// fast test iteration.
func consulCheckerFor(t *testing.T, endpoint, key, value string) *ConsulLeaderChecker {
t.Helper()
conf := &vipconfig.Config{
Endpoints: []string{endpoint},
TriggerKey: key,
TriggerValue: value,
Interval: 1,
Logger: zap.NewNop(),
}
checker, err := NewConsulLeaderChecker(conf)
if err != nil {
t.Fatalf("NewConsulLeaderChecker: %v", err)
}
return checker
}

// runConsulStream starts GetChangeNotificationStream in a goroutine and
// returns the output channel and a done channel carrying the final error.
// The channel is unbuffered so that, once the test stops reading, `out <-
// state` always blocks and ctx.Done() is the guaranteed winner in the
// production select – preventing spurious extra long-poll cycles after cancel.
func runConsulStream(ctx context.Context, c *ConsulLeaderChecker) (out chan bool, done chan error) {
out = make(chan bool) // unbuffered
done = make(chan error, 1)
go func() { done <- c.GetChangeNotificationStream(ctx, out) }()
return
}

// receiveOne reads one value from out within 3 s or fails the test.
func receiveOne(t *testing.T, out <-chan bool) bool {
t.Helper()
select {
case v := <-out:
return v
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for stream value")
return false
}
}

// waitDone waits for the stream goroutine to exit within 5 s (must allow for
// the consul WaitTime=1 s long-poll to expire).
func waitDone(t *testing.T, done <-chan error) error {
t.Helper()
select {
case err := <-done:
return err
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for GetChangeNotificationStream to return")
return nil
}
}

// TestConsulLeaderChecker_GetChangeNotificationStream_KeyAbsent verifies that
// the stream emits false when the watched key is not present in Consul.
// After cancelling, a key is injected so the goroutine can advance past the
// nil-response path (which has no inline ctx check) and exit via the select.
func TestConsulLeaderChecker_GetChangeNotificationStream_KeyAbsent(t *testing.T) {
endpoint, seed := startConsulContainer(t)
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")

ctx, cancel := context.WithCancel(context.Background())
out, done := runConsulStream(ctx, checker)

if receiveOne(t, out) {
t.Error("expected false for absent key, got true")
}

cancel()
// The nil-response branch has no inline ctx check and writes to `out`
// directly (not in a select). With an unbuffered channel the goroutine may
// therefore be stuck on that write. One extra read unblocks it so it can
// proceed. Injecting a key lets it advance to the resp-not-nil select where
// ctx.Done() fires cleanly.
_, _ = seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("any")}, nil)
select {
case <-out: // unblock the goroutine if stuck on a nil-response write
case <-time.After(time.Second): // goroutine already past that point – fine
}

if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
}

// TestConsulLeaderChecker_GetChangeNotificationStream_MatchingValue verifies
// that the stream emits true when the key value equals TriggerValue.
func TestConsulLeaderChecker_GetChangeNotificationStream_MatchingValue(t *testing.T) {
endpoint, seed := startConsulContainer(t)
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("primary")}, nil); err != nil {
t.Fatalf("seed Put: %v", err)
}
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")

ctx, cancel := context.WithCancel(context.Background())
out, done := runConsulStream(ctx, checker)

if !receiveOne(t, out) {
t.Error("expected true for matching value, got false")
}

cancel()
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
}

// TestConsulLeaderChecker_GetChangeNotificationStream_NonMatchingValue verifies
// that the stream emits false when the key value differs from TriggerValue.
func TestConsulLeaderChecker_GetChangeNotificationStream_NonMatchingValue(t *testing.T) {
endpoint, seed := startConsulContainer(t)
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("secondary")}, nil); err != nil {
t.Fatalf("seed Put: %v", err)
}
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")

ctx, cancel := context.WithCancel(context.Background())
out, done := runConsulStream(ctx, checker)

if receiveOne(t, out) {
t.Error("expected false for non-matching value, got true")
}

cancel()
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
}

// TestConsulLeaderChecker_GetChangeNotificationStream_KeyChanges verifies that
// the stream picks up a value change via the blocking long-poll.
func TestConsulLeaderChecker_GetChangeNotificationStream_KeyChanges(t *testing.T) {
endpoint, seed := startConsulContainer(t)
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("primary")}, nil); err != nil {
t.Fatalf("seed Put: %v", err)
}
checker := consulCheckerFor(t, endpoint, consulTestKey, "primary")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
out, done := runConsulStream(ctx, checker)

// Initial value: matching → true.
if !receiveOne(t, out) {
t.Error("expected true for initial matching value, got false")
}

// Change the value while the stream is long-polling; the poll unblocks immediately.
if _, err := seed.KV().Put(&capi.KVPair{Key: consulTestKey, Value: []byte("secondary")}, nil); err != nil {
t.Fatalf("update Put: %v", err)
}

// Updated value: non-matching → false.
if receiveOne(t, out) {
t.Error("expected false after key change to non-matching value, got true")
}

cancel()
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
}

// TestConsulLeaderChecker_GetChangeNotificationStream_ErrorPath verifies that
// a KV error (unreachable server) causes the stream to emit false and that
// cancelling the context stops it cleanly.
func TestConsulLeaderChecker_GetChangeNotificationStream_ErrorPath(t *testing.T) {
// Port 1 is closed on loopback; the TCP dial fails immediately.
checker := consulCheckerFor(t, "http://127.0.0.1:1", consulTestKey, "primary")

ctx, cancel := context.WithCancel(context.Background())
out, done := runConsulStream(ctx, checker)

if receiveOne(t, out) {
t.Error("expected false on error path, got true")
}

cancel()
if err := waitDone(t, done); !errors.Is(err, context.Canceled) {
t.Errorf("expected context.Canceled, got %v", err)
}
}
Loading
Loading