diff --git a/CHANGELOG.md b/CHANGELOG.md index ede41c79b..6fef27b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added +* New pluggable architecture for connection selection in `ConnectionPool`: + `Store`, `Selector`, `Strategy`, `StrategyBuilder` interfaces (#509). +* New `RoundRobinBuilder` and `ActiveStandbyBuilder` implementations and + `Opts.StrategyBuilder` option for custom strategies (#509). * New types for MessagePack extensions compatible with go-option (#459). * Added `box.MustNew` wrapper for `box.New` without an error (#448). * Added missing IPROTO feature flags to greeting negotiation diff --git a/pool/active_standby_strategy.go b/pool/active_standby_strategy.go new file mode 100644 index 000000000..4d0c56a89 --- /dev/null +++ b/pool/active_standby_strategy.go @@ -0,0 +1,203 @@ +package pool + +import ( + "sync" + "sync/atomic" + + "github.com/tarantool/go-tarantool/v3" +) + +// activeStandbyStrategy implements a strategy with active/standby separation. +// Only a subset of connections (primary) are active and receive traffic. +// Standby connections are promoted when active ones are removed. +type activeStandbyStrategy struct { + primaryCount int + + // All connections + conns []*tarantool.Connection + indexById map[string]uint + + // Active connections for round-robin + activeConns []*tarantool.Connection + activeIndex map[string]uint + + mutex sync.RWMutex + current uint64 +} + +// newActiveStandbyStrategy creates a new active/standby strategy. +// primaryCount is the maximum number of active connections. +// expectedSize is used for pre-allocation. +func newActiveStandbyStrategy(primaryCount int, expectedSize int) *activeStandbyStrategy { + return &activeStandbyStrategy{ + primaryCount: primaryCount, + conns: make([]*tarantool.Connection, 0, expectedSize), + indexById: make(map[string]uint, expectedSize), + activeConns: make([]*tarantool.Connection, 0, primaryCount), + activeIndex: make(map[string]uint, primaryCount), + } +} + +// Add adds or updates a connection with the given ID (upsert). +func (s *activeStandbyStrategy) Add(id string, conn *tarantool.Connection) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if idx, exists := s.indexById[id]; exists { + s.conns[idx] = conn + if activeIdx, active := s.activeIndex[id]; active { + s.activeConns[activeIdx] = conn + } + return + } + + s.indexById[id] = uint(len(s.conns)) + s.conns = append(s.conns, conn) + + if len(s.activeConns) < s.primaryCount { + s.addToActiveLocked(id) + } +} + +// Get returns a connection by ID. +func (s *activeStandbyStrategy) Get(id string) *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + return s.conns[index] +} + +// Remove removes a connection by ID. +func (s *activeStandbyStrategy) Remove(id string) *tarantool.Connection { + s.mutex.Lock() + defer s.mutex.Unlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + conn := s.conns[index] + + if _, active := s.activeIndex[id]; active { + s.removeFromActiveLocked(id) + } + + delete(s.indexById, id) + s.conns = append(s.conns[:index], s.conns[index+1:]...) + + for id, idx := range s.indexById { + if idx > index { + s.indexById[id] = idx - 1 + } + } + + s.promoteStandbyLocked() + + return conn +} + +// Next returns the next active connection in round-robin order. +func (s *activeStandbyStrategy) Next() *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if len(s.activeConns) == 0 { + return nil + } + + return s.activeConns[s.nextIndex()] +} + +func (s *activeStandbyStrategy) nextIndex() uint64 { + next := atomic.AddUint64(&s.current, 1) + return (next - 1) % uint64(len(s.activeConns)) +} + +// Connections returns eligible connections (active only). +func (s *activeStandbyStrategy) Connections() map[string]*tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + result := make(map[string]*tarantool.Connection, len(s.activeIndex)) + for id, idx := range s.activeIndex { + result[id] = s.activeConns[idx] + } + return result +} + +func (s *activeStandbyStrategy) addToActiveLocked(id string) { + if _, active := s.activeIndex[id]; active { + return + } + + index, exists := s.indexById[id] + if !exists { + return + } + + s.activeIndex[id] = uint(len(s.activeConns)) + s.activeConns = append(s.activeConns, s.conns[index]) +} + +func (s *activeStandbyStrategy) removeFromActiveLocked(id string) { + index, active := s.activeIndex[id] + if !active { + return + } + + delete(s.activeIndex, id) + s.activeConns = append(s.activeConns[:index], s.activeConns[index+1:]...) + + for id, idx := range s.activeIndex { + if idx > index { + s.activeIndex[id] = idx - 1 + } + } +} + +func (s *activeStandbyStrategy) promoteStandbyLocked() { + for len(s.activeConns) < s.primaryCount { + promoted := false + for id := range s.indexById { + if _, active := s.activeIndex[id]; active { + continue + } + + s.addToActiveLocked(id) + promoted = true + break + } + + if !promoted { + break + } + } +} + +// StandbyCount returns the number of standby connections. +func (s *activeStandbyStrategy) StandbyCount() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.conns) - len(s.activeConns) +} + +// IsActive checks if a connection is active. +func (s *activeStandbyStrategy) IsActive(id string) bool { + s.mutex.RLock() + defer s.mutex.RUnlock() + _, active := s.activeIndex[id] + return active +} + +// Len returns the number of active connections managed by this strategy. +func (s *activeStandbyStrategy) Len() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.activeConns) +} diff --git a/pool/connection_pool.go b/pool/connection_pool.go index cb7b530ec..d47da590c 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -81,6 +81,9 @@ type Opts struct { CheckTimeout time.Duration // ConnectionHandler provides an ability to handle connection updates. ConnectionHandler ConnectionHandler + // StrategyBuilder creates strategies for connection selection. + // If nil, RoundRobinBuilder is used by default. + StrategyBuilder StrategyBuilder } /* @@ -111,9 +114,8 @@ type ConnectionPool struct { state state done chan struct{} - roPool *roundRobinStrategy - rwPool *roundRobinStrategy - anyPool *roundRobinStrategy + store *Store + selector *selector poolsMutex sync.RWMutex watcherContainer watcherContainer } @@ -167,18 +169,24 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, } size := len(instances) - rwPool := newRoundRobinStrategy(size) - roPool := newRoundRobinStrategy(size) - anyPool := newRoundRobinStrategy(size) + + store := NewStore() + + builder := opts.StrategyBuilder + if builder == nil { + builder = RoundRobinBuilder{} + } + + rwStrategy, roStrategy, anyStrategy := builder.Build(size) + selector := newSelector(store, rwStrategy, roStrategy, anyStrategy) p := &ConnectionPool{ - ends: make(map[string]*endpoint), - opts: opts, - state: connectedState, - done: make(chan struct{}), - rwPool: rwPool, - roPool: roPool, - anyPool: anyPool, + ends: make(map[string]*endpoint), + opts: opts, + state: connectedState, + done: make(chan struct{}), + store: store, + selector: selector, } fillCtx, fillCancel := context.WithCancel(ctx) @@ -242,20 +250,7 @@ func (p *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { if p.state.get() != connectedState { return false, nil } - switch mode { - case ANY: - return !p.anyPool.IsEmpty(), nil - case RW: - return !p.rwPool.IsEmpty(), nil - case RO: - return !p.roPool.IsEmpty(), nil - case PreferRW: - fallthrough - case PreferRO: - return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil - default: - return false, ErrNoHealthyInstance - } + return !p.selector.IsEmpty(mode), nil } // ConfiguredTimeout gets timeout of current connection. @@ -498,14 +493,8 @@ func (p *ConnectionPool) NewWatcher(key string, watcher.container.add(watcher) - rr := p.anyPool - if mode == RW { - rr = p.rwPool - } else if mode == RO { - rr = p.roPool - } + conns := p.selector.ConnectionsByMode(mode) - conns := rr.GetConnections() for _, conn := range conns { // Check that connection supports watchers. if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) { @@ -524,7 +513,7 @@ func (p *ConnectionPool) NewWatcher(key string, // the argument of type Mode is unused. func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) tarantool.Future { if connectedReq, ok := req.(tarantool.ConnectedRequest); ok { - conns := p.anyPool.GetConnections() + conns := p.selector.Connections() isOurConnection := false for _, conn := range conns { // Compare raw pointers. @@ -548,7 +537,7 @@ func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) tarantool.Futu // DoInstance sends the request into a target instance and returns a future. func (p *ConnectionPool) DoInstance(req tarantool.Request, name string) tarantool.Future { - conn := p.anyPool.GetConnection(name) + conn := p.selector.Get(name) if conn == nil { return tarantool.NewFutureWithErr(nil, ErrNoHealthyInstance) } @@ -611,31 +600,28 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er } func (p *ConnectionPool) getConnectionFromPool(name string) (*tarantool.Connection, Role) { - if conn := p.rwPool.GetConnection(name); conn != nil { - return conn, MasterRole - } - - if conn := p.roPool.GetConnection(name); conn != nil { - return conn, ReplicaRole + entry, ok := p.store.Get(name) + if !ok { + return nil, UnknownRole } - - return p.anyPool.GetConnection(name), UnknownRole + return entry.Conn, entry.Role } func (p *ConnectionPool) deleteConnection(name string) { - if conn := p.anyPool.DeleteConnection(name); conn != nil { - if conn := p.rwPool.DeleteConnection(name); conn == nil { - p.roPool.DeleteConnection(name) - } - // The internal connection deinitialization. - p.watcherContainer.mutex.RLock() - defer p.watcherContainer.mutex.RUnlock() - - _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { - watcher.unwatch(conn) - return nil - }) + entry, ok := p.store.Remove(name) + if !ok { + return } + conn := entry.Conn + + // The internal connection deinitialization. + p.watcherContainer.mutex.RLock() + defer p.watcherContainer.mutex.RUnlock() + + _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { + watcher.unwatch(conn) + return nil + }) } func (p *ConnectionPool) addConnection(name string, @@ -673,14 +659,8 @@ func (p *ConnectionPool) addConnection(name string, } } - p.anyPool.AddConnection(name, conn) + p.store.Upsert(name, conn, role) - switch role { - case MasterRole: - p.rwPool.AddConnection(name, conn) - case ReplicaRole: - p.roPool.AddConnection(name, conn) - } return nil } @@ -750,39 +730,26 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { if role, err := p.getConnectionRole(e.conn); err == nil { if e.role != role { - p.deleteConnection(e.name) + oldRole := e.role + p.store.UpdateRole(e.name, role) + + // Update watchers for the connection. + p.updateConnectionWatchers(e.conn, oldRole, role) + p.poolsMutex.Unlock() - p.handlerDeactivated(e.name, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, oldRole) opened := p.handlerDiscovered(e.name, e.conn, role) if !opened { _ = e.conn.Close() + p.store.Remove(e.name) e.conn = nil e.role = UnknownRole return } - p.poolsMutex.Lock() - if p.state.get() != connectedState { - p.poolsMutex.Unlock() - - _ = e.conn.Close() - p.handlerDeactivated(e.name, e.conn, role) - e.conn = nil - e.role = UnknownRole - return - } - - if p.addConnection(e.name, e.conn, role) != nil { - p.poolsMutex.Unlock() - - _ = e.conn.Close() - p.handlerDeactivated(e.name, e.conn, role) - e.conn = nil - e.role = UnknownRole - return - } e.role = role + return } p.poolsMutex.Unlock() return @@ -798,6 +765,44 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { } } +// updateConnectionWatchers updates watchers when connection role changes. +func (p *ConnectionPool) updateConnectionWatchers(conn *tarantool.Connection, + oldRole, newRole Role) { + // Check if connection supports watchers + if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) { + return + } + + p.watcherContainer.mutex.RLock() + defer p.watcherContainer.mutex.RUnlock() + + _ = p.watcherContainer.foreach(func(watcher *poolWatcher) error { + var wasWatching, shouldWatch bool + + switch watcher.mode { + case RW: + wasWatching = oldRole == MasterRole + shouldWatch = newRole == MasterRole + case RO: + wasWatching = oldRole == ReplicaRole + shouldWatch = newRole == ReplicaRole + default: // ANY, PreferRW, PreferRO + wasWatching = oldRole == MasterRole || oldRole == ReplicaRole + shouldWatch = newRole == MasterRole || newRole == ReplicaRole + } + + if wasWatching && !shouldWatch { + watcher.unwatch(conn) + } else if !wasWatching && shouldWatch { + if err := watcher.watch(conn); err != nil { + log.Printf("tarantool: failed to watch for %s after role change: %s", conn.Addr().String(), err) + } + } + + return nil + }) +} + func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { e.conn = nil e.role = UnknownRole @@ -856,26 +861,6 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { return err } -func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) { - p.poolsMutex.Lock() - - if p.state.get() != connectedState { - p.poolsMutex.Unlock() - return - } - - p.deleteConnection(e.name) - p.poolsMutex.Unlock() - - p.handlerDeactivated(e.name, e.conn, e.role) - e.conn = nil - e.role = UnknownRole - - if err := p.tryConnect(ctx, e); err != nil { - log.Printf("tarantool: reconnect to %s failed: %s\n", e.name, err) - } -} - func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { timer := time.NewTicker(p.opts.CheckTimeout) defer timer.Stop() @@ -943,33 +928,17 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { // Will be processed at an upper level. case <-e.shutdown: // Will be processed at an upper level. - case <-e.notify: - if e.conn != nil && e.conn.ClosedNow() { - p.poolsMutex.Lock() - if p.state.get() == connectedState { - p.deleteConnection(e.name) - p.poolsMutex.Unlock() - p.handlerDeactivated(e.name, e.conn, e.role) - e.conn = nil - e.role = UnknownRole - } else { - p.poolsMutex.Unlock() - } - } + case event := <-e.notify: + p.handleConnEvent(e, event) case <-timer.C: - // Reopen connection. - // Relocate connection between subpools - // if ro/rw was updated. - switch { - case e.conn == nil: + // Check for role updates and reconnect if needed. + if e.conn == nil { + // Try to reconnect if connection is nil. if err := p.tryConnect(ctx, e); err != nil { - log.Printf("tarantool: reopen connection to %s failed: %s\n", - e.name, err) + log.Printf("tarantool: reconnect to %s failed: %s\n", e.name, err) } - case !e.conn.ClosedNow(): + } else if !e.conn.ClosedNow() { p.updateConnection(e) - default: - p.reconnect(ctx, e) } } } @@ -977,38 +946,46 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { } } -func (p *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { - switch mode { - case ANY: - if next := p.anyPool.GetNextConnection(); next != nil { - return next, nil - } - case RW: - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil - } - return nil, ErrNoRwInstance - case RO: - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil - } - return nil, ErrNoRoInstance - case PreferRW: - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil +// handleConnEvent processes connection events from Notify channel. +func (p *ConnectionPool) handleConnEvent(e *endpoint, event tarantool.ConnEvent) { + switch event.Kind { + case tarantool.Disconnected: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, false) } - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.ReconnectFailed: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, false) } - case PreferRO: - if next := p.roPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.Connected: + p.poolsMutex.Lock() + if p.state.get() == connectedState { + p.store.UpdateHealth(e.name, true) } - if next := p.rwPool.GetNextConnection(); next != nil { - return next, nil + p.poolsMutex.Unlock() + + case tarantool.Closed: + p.poolsMutex.Lock() + if p.state.get() == connectedState && e.conn != nil { + p.deleteConnection(e.name) + p.poolsMutex.Unlock() + p.handlerDeactivated(e.name, e.conn, e.role) + e.conn = nil + e.role = UnknownRole + } else { + p.poolsMutex.Unlock() } } - return nil, ErrNoHealthyInstance +} + +func (p *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { + return p.selector.Select(mode) } func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool { diff --git a/pool/round_robin.go b/pool/round_robin.go deleted file mode 100644 index f3ccb014c..000000000 --- a/pool/round_robin.go +++ /dev/null @@ -1,112 +0,0 @@ -package pool - -import ( - "sync" - "sync/atomic" - - "github.com/tarantool/go-tarantool/v3" -) - -type roundRobinStrategy struct { - conns []*tarantool.Connection - indexById map[string]uint - mutex sync.RWMutex - size uint64 - current uint64 -} - -func newRoundRobinStrategy(size int) *roundRobinStrategy { - return &roundRobinStrategy{ - conns: make([]*tarantool.Connection, 0, size), - indexById: make(map[string]uint, size), - size: 0, - current: 0, - } -} - -func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - index, found := r.indexById[id] - if !found { - return nil - } - - return r.conns[index] -} - -func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { - r.mutex.Lock() - defer r.mutex.Unlock() - - if r.size == 0 { - return nil - } - - index, found := r.indexById[id] - if !found { - return nil - } - - delete(r.indexById, id) - - conn := r.conns[index] - r.conns = append(r.conns[:index], r.conns[index+1:]...) - r.size -= 1 - - for k, v := range r.indexById { - if v > index { - r.indexById[k] = v - 1 - } - } - - return conn -} - -func (r *roundRobinStrategy) IsEmpty() bool { - r.mutex.RLock() - defer r.mutex.RUnlock() - - return r.size == 0 -} - -func (r *roundRobinStrategy) GetNextConnection() *tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - if r.size == 0 { - return nil - } - return r.conns[r.nextIndex()] -} - -func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection { - r.mutex.RLock() - defer r.mutex.RUnlock() - - conns := map[string]*tarantool.Connection{} - for id, index := range r.indexById { - conns[id] = r.conns[index] - } - - return conns -} - -func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) { - r.mutex.Lock() - defer r.mutex.Unlock() - - if idx, ok := r.indexById[id]; ok { - r.conns[idx] = conn - } else { - r.conns = append(r.conns, conn) - r.indexById[id] = uint(r.size) - r.size += 1 - } -} - -func (r *roundRobinStrategy) nextIndex() uint64 { - next := atomic.AddUint64(&r.current, 1) - return (next - 1) % r.size -} diff --git a/pool/round_robin_strategy.go b/pool/round_robin_strategy.go new file mode 100644 index 000000000..e09e7cb88 --- /dev/null +++ b/pool/round_robin_strategy.go @@ -0,0 +1,113 @@ +package pool + +import ( + "sync" + "sync/atomic" + + "github.com/tarantool/go-tarantool/v3" +) + +// roundRobinStrategy implements a round-robin connection selection strategy. +// All connections are active and selected in round-robin order. +type roundRobinStrategy struct { + conns []*tarantool.Connection + indexById map[string]uint + mutex sync.RWMutex + current uint64 +} + +// newRoundRobinStrategy creates a new round-robin strategy. +func newRoundRobinStrategy(expectedSize int) *roundRobinStrategy { + return &roundRobinStrategy{ + conns: make([]*tarantool.Connection, 0, expectedSize), + indexById: make(map[string]uint, expectedSize), + current: 0, + } +} + +// Add adds or updates a connection with the given ID (upsert). +func (s *roundRobinStrategy) Add(id string, conn *tarantool.Connection) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if idx, exists := s.indexById[id]; exists { + s.conns[idx] = conn // Update existing + return + } + + s.indexById[id] = uint(len(s.conns)) + s.conns = append(s.conns, conn) +} + +// Get returns a connection by ID. +func (s *roundRobinStrategy) Get(id string) *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + return s.conns[index] +} + +// Remove removes a connection by ID. +func (s *roundRobinStrategy) Remove(id string) *tarantool.Connection { + s.mutex.Lock() + defer s.mutex.Unlock() + + index, exists := s.indexById[id] + if !exists { + return nil + } + + delete(s.indexById, id) + + conn := s.conns[index] + s.conns = append(s.conns[:index], s.conns[index+1:]...) + + for id, idx := range s.indexById { + if idx > index { + s.indexById[id] = idx - 1 + } + } + + return conn +} + +// Next returns the next connection in round-robin order. +func (s *roundRobinStrategy) Next() *tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if len(s.conns) == 0 { + return nil + } + + return s.conns[s.nextIndex()] +} + +// Connections returns a map of all connections by their ID. +func (s *roundRobinStrategy) Connections() map[string]*tarantool.Connection { + s.mutex.RLock() + defer s.mutex.RUnlock() + + result := make(map[string]*tarantool.Connection, len(s.indexById)) + for id, index := range s.indexById { + result[id] = s.conns[index] + } + return result +} + +// Len returns the number of connections managed by this strategy. +func (s *roundRobinStrategy) Len() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return len(s.conns) +} + +func (s *roundRobinStrategy) nextIndex() uint64 { + next := atomic.AddUint64(&s.current, 1) + return (next - 1) % uint64(len(s.conns)) +} diff --git a/pool/round_robin_test.go b/pool/round_robin_test.go deleted file mode 100644 index dcc219fd4..000000000 --- a/pool/round_robin_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package pool - -import ( - "testing" - - "github.com/tarantool/go-tarantool/v3" -) - -const ( - validAddr1 = "x" - validAddr2 = "y" -) - -func TestRoundRobinAddDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - for i, addr := range addrs { - if conn := rr.DeleteConnection(addr); conn != conns[i] { - t.Errorf("Unexpected connection on address %s", addr) - } - } - if !rr.IsEmpty() { - t.Errorf("RoundRobin does not empty") - } -} - -func TestRoundRobinAddDuplicateDelete(t *testing.T) { - rr := newRoundRobinStrategy(10) - - conn1 := &tarantool.Connection{} - conn2 := &tarantool.Connection{} - - rr.AddConnection(validAddr1, conn1) - rr.AddConnection(validAddr1, conn2) - - if rr.DeleteConnection(validAddr1) != conn2 { - t.Errorf("Unexpected deleted connection") - } - if !rr.IsEmpty() { - t.Errorf("RoundRobin does not empty") - } - if rr.DeleteConnection(validAddr1) != nil { - t.Errorf("Unexpected value after second deletion") - } -} - -func TestRoundRobinGetNextConnection(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]} - for i, expected := range expectedConns { - if rr.GetNextConnection() != expected { - t.Errorf("Unexpected connection on %d call", i) - } - } -} - -func TestRoundRobinStrategy_GetConnections(t *testing.T) { - rr := newRoundRobinStrategy(10) - - addrs := []string{validAddr1, validAddr2} - conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} - - for i, addr := range addrs { - rr.AddConnection(addr, conns[i]) - } - - rr.GetConnections()[validAddr2] = conns[0] // GetConnections() returns a copy. - rrConns := rr.GetConnections() - - for i, addr := range addrs { - if conns[i] != rrConns[addr] { - t.Errorf("Unexpected connection on %s addr", addr) - } - } -} diff --git a/pool/selector.go b/pool/selector.go new file mode 100644 index 000000000..ece63ebdc --- /dev/null +++ b/pool/selector.go @@ -0,0 +1,166 @@ +package pool + +import ( + "github.com/tarantool/go-tarantool/v3" +) + +// selector routes connections to appropriate strategies based on role. +// It observes the store for changes and updates strategies accordingly. +type selector struct { + rwStrategy Strategy + roStrategy Strategy + anyStrategy Strategy +} + +// newSelector creates a new selector. +func newSelector( + store *Store, + rwStrategy Strategy, + roStrategy Strategy, + anyStrategy Strategy, +) *selector { + s := &selector{ + rwStrategy: rwStrategy, + roStrategy: roStrategy, + anyStrategy: anyStrategy, + } + + store.AddObserver(s) + + return s +} + +// OnUpsert implements StoreObserver. +func (s *selector) OnUpsert(name string, curr Entry, prev Entry, existed bool) { + applyOne(s.anyStrategy, name, curr, prev, existed, eligibleAny) + applyOne(s.rwStrategy, name, curr, prev, existed, eligibleRW) + applyOne(s.roStrategy, name, curr, prev, existed, eligibleRO) +} + +// OnRemove implements StoreObserver. +func (s *selector) OnRemove(name string) { + s.anyStrategy.Remove(name) + s.rwStrategy.Remove(name) + s.roStrategy.Remove(name) +} + +// Select returns a connection based on the mode. +func (s *selector) Select(mode Mode) (*tarantool.Connection, error) { + switch mode { + case RW: + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoRwInstance + + case RO: + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoRoInstance + + case PreferRW: + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + + case PreferRO: + if conn := s.roStrategy.Next(); conn != nil { + return conn, nil + } + if conn := s.rwStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + + default: // ANY + if conn := s.anyStrategy.Next(); conn != nil { + return conn, nil + } + return nil, ErrNoHealthyInstance + } +} + +// Get returns a connection by name. +func (s *selector) Get(name string) *tarantool.Connection { + return s.anyStrategy.Get(name) +} + +// Connections returns all eligible connections. +func (s *selector) Connections() map[string]*tarantool.Connection { + return s.anyStrategy.Connections() +} + +// ConnectionsByMode returns connections filtered by mode. +func (s *selector) ConnectionsByMode(mode Mode) map[string]*tarantool.Connection { + switch mode { + case RW: + return s.rwStrategy.Connections() + case RO: + return s.roStrategy.Connections() + default: + return s.anyStrategy.Connections() + } +} + +// IsEmpty checks if there are no connections for the given mode. +func (s *selector) IsEmpty(mode Mode) bool { + switch mode { + case ANY: + return s.anyStrategy.Len() == 0 + case RW: + return s.rwStrategy.Len() == 0 + case RO: + return s.roStrategy.Len() == 0 + case PreferRW, PreferRO: + return s.rwStrategy.Len() == 0 && s.roStrategy.Len() == 0 + default: + return true + } +} + +func eligibleAny(e Entry) bool { + return e.Conn != nil && e.healthy +} + +func eligibleRW(e Entry) bool { + return e.Conn != nil && e.healthy && e.Role == MasterRole +} + +func eligibleRO(e Entry) bool { + return e.Conn != nil && e.healthy && e.Role == ReplicaRole +} + +func applyOne( + strategy Strategy, + name string, + curr, prev Entry, + existed bool, + eligible func(Entry) bool, +) { + newOk := eligible(curr) + oldOk := existed && eligible(prev) + + switch { + case !oldOk && newOk: + // became eligible => add + strategy.Add(name, curr.Conn) + + case oldOk && !newOk: + // became ineligible => remove + strategy.Remove(name) + + case oldOk && newOk: + // still eligible; if conn replaced => update pointer + if prev.Conn != curr.Conn { + strategy.Add(name, curr.Conn) + } + + default: + // still ineligible => do nothing + } +} diff --git a/pool/store.go b/pool/store.go new file mode 100644 index 000000000..c170baca0 --- /dev/null +++ b/pool/store.go @@ -0,0 +1,190 @@ +package pool + +import ( + "sync" + + "github.com/tarantool/go-tarantool/v3" +) + +// Entry represents a connection with its metadata. +type Entry struct { + Conn *tarantool.Connection + Role Role + healthy bool +} + +// StoreObserver is notified when store changes. +type StoreObserver interface { + // OnUpsert is called when a connection is added or updated. + // name is the connection name. + // curr is the current entry, prev is the previous entry (if existed). + // existed is true if the entry already existed. + OnUpsert(name string, curr Entry, prev Entry, existed bool) + + // OnRemove is called when a connection is removed. + // name is the connection name. + OnRemove(name string) +} + +// Store is a thread-safe storage for connection entries. +// It notifies observers about changes via StoreObserver interface. +type Store struct { + mu sync.RWMutex + entries map[string]Entry + observers []StoreObserver +} + +// NewStore creates a new Store. +func NewStore() *Store { + return &Store{ + entries: make(map[string]Entry), + } +} + +// AddObserver registers an observer. +// Observers are added only during initialization and never removed. +func (s *Store) AddObserver(observer StoreObserver) { + s.observers = append(s.observers, observer) +} + +// Get returns an entry by name. +func (s *Store) Get(name string) (Entry, bool) { + s.mu.RLock() + e, ok := s.entries[name] + s.mu.RUnlock() + return e, ok +} + +// Upsert adds or updates a connection entry. +// It notifies observers with the current and previous entry. +func (s *Store) Upsert(name string, conn *tarantool.Connection, role Role) (Entry, bool) { + s.mu.Lock() + + var prev Entry + var existed bool + if e, ok := s.entries[name]; ok { + prev = e + existed = true + } + + curr := Entry{ + Conn: conn, + Role: role, + healthy: true, + } + s.entries[name] = curr + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, existed) + } + + return prev, existed +} + +// UpdateHealth updates the health status of a connection. +func (s *Store) UpdateHealth(name string, healthy bool) bool { + s.mu.Lock() + + e, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return false + } + + if e.healthy == healthy { + s.mu.Unlock() + return true // No change + } + + prev := e + e.healthy = healthy + s.entries[name] = e + curr := e + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, true) + } + + return true +} + +// UpdateRole updates the role of a connection. +func (s *Store) UpdateRole(name string, role Role) bool { + s.mu.Lock() + + e, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return false + } + + if e.Role == role { + s.mu.Unlock() + return true // No change + } + + prev := e + e.Role = role + s.entries[name] = e + curr := e + + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnUpsert(name, curr, prev, true) + } + + return true +} + +// Remove removes a connection entry. +func (s *Store) Remove(name string) (Entry, bool) { + s.mu.Lock() + + old, ok := s.entries[name] + if !ok { + s.mu.Unlock() + return Entry{}, false + } + + delete(s.entries, name) + s.mu.Unlock() + + // Notify outside of lock + for _, o := range s.observers { + o.OnRemove(name) + } + + return old, true +} + +// All returns all entries. +func (s *Store) All() map[string]Entry { + s.mu.RLock() + result := make(map[string]Entry, len(s.entries)) + for k, v := range s.entries { + result[k] = v + } + s.mu.RUnlock() + return result +} + +// AllHealthy returns all entries with healthy status. +func (s *Store) AllHealthy() map[string]Entry { + s.mu.RLock() + result := make(map[string]Entry) + for k, v := range s.entries { + if v.healthy { + result[k] = v + } + } + s.mu.RUnlock() + return result +} diff --git a/pool/strategy.go b/pool/strategy.go new file mode 100644 index 000000000..efcf95da6 --- /dev/null +++ b/pool/strategy.go @@ -0,0 +1,26 @@ +package pool + +import "github.com/tarantool/go-tarantool/v3" + +// Strategy defines the interface for connection selection strategies. +// Strategies own connections directly and provide round-robin or other +// selection algorithms. +type Strategy interface { + // Add adds a connection with the given ID to the strategy. + Add(id string, conn *tarantool.Connection) + + // Remove removes a connection by ID. + Remove(id string) *tarantool.Connection + + // Get returns a connection by ID. + Get(id string) *tarantool.Connection + + // Next returns the next connection according to the strategy's algorithm. + Next() *tarantool.Connection + + // Connections returns all connections managed by this strategy. + Connections() map[string]*tarantool.Connection + + // Len returns the number of connections managed by this strategy. + Len() int +} diff --git a/pool/strategy_builder.go b/pool/strategy_builder.go new file mode 100644 index 000000000..9b41be1e8 --- /dev/null +++ b/pool/strategy_builder.go @@ -0,0 +1,43 @@ +package pool + +// StrategyBuilder creates strategies for the connection pool. +// It allows users to customize how connections are selected. +type StrategyBuilder interface { + // Build creates strategies for RW, RO, and ANY modes. + // ExpectedSize is the expected number of connections for pre-allocation. + Build(expectedSize int) (rw, ro, any Strategy) +} + +// RoundRobinBuilder creates round-robin strategies for all modes. +// This is the default strategy builder. +type RoundRobinBuilder struct{} + +// Build creates round-robin strategies. +func (b RoundRobinBuilder) Build(expectedSize int) (rw, ro, any Strategy) { + return newRoundRobinStrategy(expectedSize), + newRoundRobinStrategy(expectedSize), + newRoundRobinStrategy(expectedSize) +} + +// ActiveStandbyBuilder creates active/standby strategies for all modes. +// +// ActiveStandby maintains a limited number of "active" connections that +// receive traffic. When an active connection is removed, a standby +// connection is automatically promoted. +type ActiveStandbyBuilder struct { + // RWPrimaryCount is the maximum number of active connections for RW mode. + RWPrimaryCount int + + // ROPrimaryCount is the maximum number of active connections for RO mode. + ROPrimaryCount int +} + +// Build creates active/standby strategies for all modes. +// AnyPrimaryCount is calculated as RWPrimaryCount + ROPrimaryCount. +func (b ActiveStandbyBuilder) Build(expectedSize int) (rw, ro, any Strategy) { + anyPrimaryCount := b.RWPrimaryCount + b.ROPrimaryCount + + return newActiveStandbyStrategy(b.RWPrimaryCount, expectedSize), + newActiveStandbyStrategy(b.ROPrimaryCount, expectedSize), + newActiveStandbyStrategy(anyPrimaryCount, expectedSize) +} diff --git a/pool/strategy_test.go b/pool/strategy_test.go new file mode 100644 index 000000000..1012ce89c --- /dev/null +++ b/pool/strategy_test.go @@ -0,0 +1,383 @@ +package pool + +import ( + "testing" + + "github.com/tarantool/go-tarantool/v3" +) + +// roundRobinStrategy Tests + +func TestRoundRobinStrategy_AddRemove(t *testing.T) { + s := newRoundRobinStrategy(10) + + ids := []string{"conn1", "conn2"} + conns := []*tarantool.Connection{{}, {}} + + for i, id := range ids { + s.Add(id, conns[i]) + } + + for i, id := range ids { + removed := s.Remove(id) + if removed != conns[i] { + t.Errorf("Remove(%q) = %p, want %p", id, removed, conns[i]) + } + } + + if len(s.Connections()) != 0 { + t.Errorf("Connections() should be empty, got %d", len(s.Connections())) + } +} + +func TestRoundRobinStrategy_AddUpsert(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn1", conn2) // Should update (upsert) + + conns := s.Connections() + if len(conns) != 1 { + t.Errorf("Connections() len = %d, want 1", len(conns)) + } + if conns["conn1"] != conn2 { + t.Errorf("Connections()[conn1] = %p, want %p (updated)", conns["conn1"], conn2) + } +} + +func TestRoundRobinStrategy_RemoveNonExistent(t *testing.T) { + s := newRoundRobinStrategy(10) + + removed := s.Remove("nonexistent") + if removed != nil { + t.Errorf("Remove(nonexistent) = %p, want nil", removed) + } +} + +func TestRoundRobinStrategy_Next(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + + expected := []*tarantool.Connection{conn1, conn2, conn1, conn2} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() call %d = %p, want %p", i, got, want) + } + } +} + +func TestRoundRobinStrategy_NextEmpty(t *testing.T) { + s := newRoundRobinStrategy(10) + + got := s.Next() + if got != nil { + t.Errorf("Next() on empty strategy = %p, want nil", got) + } +} + +func TestRoundRobinStrategy_RemoveIndexUpdate(t *testing.T) { + s := newRoundRobinStrategy(10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) + + s.Remove("conn2") + + expected := []*tarantool.Connection{conn1, conn3, conn1, conn3} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() after remove, call %d = %p, want %p", i, got, want) + } + } +} + +// activeStandbyStrategy Tests + +func TestActiveStandbyStrategy_ActiveSlots(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) // Should be standby (only 2 active slots). + + if s.Len() != 2 { + t.Errorf("ActiveCount() = %d, want 2", s.Len()) + } + if s.StandbyCount() != 1 { + t.Errorf("StandbyCount() = %d, want 1", s.StandbyCount()) + } +} + +func TestActiveStandbyStrategy_PromotionOnRemove(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + conn3 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + s.Add("conn3", conn3) // Standby. + + // Remove active connection. + removed := s.Remove("conn1") + if removed != conn1 { + t.Errorf("Remove(conn1) = %p, want %p", removed, conn1) + } + + // Standby should be promoted. + if s.Len() != 2 { + t.Errorf("ActiveCount() after remove = %d, want 2", s.Len()) + } + if !s.IsActive("conn3") { + t.Errorf("conn3 should be promoted after conn1 removal") + } +} + +func TestActiveStandbyStrategy_RemoveNonExistent(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + removed := s.Remove("nonexistent") + if removed != nil { + t.Errorf("Remove(nonexistent) = %p, want nil", removed) + } +} + +func TestActiveStandbyStrategy_Next(t *testing.T) { + s := newActiveStandbyStrategy(2, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) + + expected := []*tarantool.Connection{conn1, conn2, conn1, conn2} + for i, want := range expected { + got := s.Next() + if got != want { + t.Errorf("Next() call %d = %p, want %p", i, got, want) + } + } +} + +func TestActiveStandbyStrategy_NextOnlyActive(t *testing.T) { + s := newActiveStandbyStrategy(1, 10) + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + s.Add("conn1", conn1) + s.Add("conn2", conn2) // Standby. + + // Next should only return conn1 (active). + for i := 0; i < 4; i++ { + got := s.Next() + if got != conn1 { + t.Errorf("Next() call %d = %p, want %p (conn1)", i, got, conn1) + } + } +} + +// selector Tests + +type selectorTest struct { + store *Store + rwStrategy *roundRobinStrategy + roStrategy *roundRobinStrategy + anyStrategy *roundRobinStrategy + sel *selector +} + +func setupSelectorTest() selectorTest { + store := NewStore() + rwStrategy := newRoundRobinStrategy(10) + roStrategy := newRoundRobinStrategy(10) + anyStrategy := newRoundRobinStrategy(10) + sel := newSelector(store, rwStrategy, roStrategy, anyStrategy) + return selectorTest{ + store: store, + rwStrategy: rwStrategy, + roStrategy: roStrategy, + anyStrategy: anyStrategy, + sel: sel, + } +} + +func TestSelector_Select_RW(t *testing.T) { + test := setupSelectorTest() + + masterConn := &tarantool.Connection{} + test.store.Upsert("master", masterConn, MasterRole) + + // RW mode should return master. + conn, err := test.rwStrategy.Next(), error(nil) + if err != nil { + t.Errorf("Next() error = %v", err) + } + if conn != masterConn { + t.Errorf("Next() = %p, want %p", conn, masterConn) + } +} + +func TestSelector_Select_RO(t *testing.T) { + test := setupSelectorTest() + + masterConn := &tarantool.Connection{} + replicaConn := &tarantool.Connection{} + + test.store.Upsert("master", masterConn, MasterRole) + test.store.Upsert("replica", replicaConn, ReplicaRole) + + // RO mode should return replica. + conn := test.roStrategy.Next() + if conn != replicaConn { + t.Errorf("Next() = %p, want %p", conn, replicaConn) + } +} + +func TestSelector_Select_ANY(t *testing.T) { + test := setupSelectorTest() + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + test.store.Upsert("conn1", conn1, MasterRole) + test.store.Upsert("conn2", conn2, ReplicaRole) + + // ANY mode should return any connection. + seen := make(map[*tarantool.Connection]bool) + for i := 0; i < 10; i++ { + conn := test.anyStrategy.Next() + seen[conn] = true + } + // Should have seen both connections. + if len(seen) != 2 { + t.Errorf("Next() should rotate between all connections, saw %d unique", len(seen)) + } +} + +func TestSelector_RoleChange(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + // Initially master. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection") + } + + // Change role to replica. + test.store.UpdateRole("conn", ReplicaRole) + + // Now should be in RO pool. + if test.rwStrategy.Next() != nil { + t.Errorf("rwStrategy.Next() should return nil after role change") + } + if test.roStrategy.Next() == nil { + t.Errorf("roStrategy.Next() should return connection after role change") + } +} + +func TestSelector_Remove(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + test.store.Remove("conn") + + // Should be removed from all strategies. + if !test.sel.IsEmpty(RW) { + t.Errorf("IsEmpty(RW) = false, want true") + } + if !test.sel.IsEmpty(ANY) { + t.Errorf("IsEmpty(ANY) = false, want true") + } +} + +func TestSelector_Get(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + got := test.sel.Get("conn") + if got != conn { + t.Errorf("Get(conn) = %p, want %p", got, conn) + } + + got = test.sel.Get("nonexistent") + if got != nil { + t.Errorf("Get(nonexistent) = %p, want nil", got) + } +} + +func TestSelector_HealthChange(t *testing.T) { + test := setupSelectorTest() + + conn := &tarantool.Connection{} + test.store.Upsert("conn", conn, MasterRole) + + // Initially healthy. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection") + } + + // Become unhealthy. + test.store.UpdateHealth("conn", false) + + // Should not be available. + if test.rwStrategy.Next() != nil { + t.Errorf("rwStrategy.Next() should return nil when unhealthy") + } + if !test.sel.IsEmpty(RW) { + t.Errorf("IsEmpty(RW) = false, want true when unhealthy") + } + + // Become healthy again. + test.store.UpdateHealth("conn", true) + + // Should be available again. + if test.rwStrategy.Next() == nil { + t.Errorf("rwStrategy.Next() should return connection after recovery") + } +} + +func TestSelector_ConnectionUpdate(t *testing.T) { + test := setupSelectorTest() + + conn1 := &tarantool.Connection{} + conn2 := &tarantool.Connection{} + + test.store.Upsert("conn", conn1, MasterRole) + if test.sel.Get("conn") != conn1 { + t.Errorf("Get(conn) = %p, want %p", test.sel.Get("conn"), conn1) + } + + // Update connection pointer. + test.store.Upsert("conn", conn2, MasterRole) + if test.sel.Get("conn") != conn2 { + t.Errorf("Get(conn) = %p, want %p", test.sel.Get("conn"), conn2) + } +}