diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ac046a15..425e50852 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * New `MockDoer` interface for custom `Doer` testing with builder pattern methods: `AddResponse`, `AddResponseRaw`, `AddResponseError`, `Requests`. * New `MockRequestNamed` type for verifying specific requests in tests. +* New `test_helpers.ExecuteOnAll` function to execute operations on all + instances in parallel with context support. ### Changed @@ -62,11 +64,16 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. become private, `SetError` and `SetResponse` become private (#470). * `ConnectionPool.Close()` returns a single error value, combining multiple errors using errors.Join() (#540). +* `test_helpers.CheckPoolStatuses` and `test_helpers.ProcessListenOnInstance` + now accept typed arguments (`CheckStatusesArgs` and `ListenOnInstanceArgs` + respectively) instead of `interface{}`. ### Removed * Deprecated `NewCall16Request` and `NewCall17Request` constructors. Use `NewCallRequest` instead. +* `test_helpers.Retry` function. Use `assert.Eventually` from testify instead. + `test_helpers.WaitUntilReconnected` reimplemented without `Retry`. 
### Fixed diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index 1598938a3..129f73f7e 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -95,8 +95,10 @@ var connOpts = tarantool.Opts{ Timeout: 5 * time.Second, } -var defaultCountRetry = 5 -var defaultTimeoutRetry = 500 * time.Millisecond +const defaultCountRetry = 5 + +const tick = 500 * time.Millisecond +const timeout = defaultCountRetry * tick var helpInstances []*test_helpers.TarantoolInstance @@ -437,9 +439,10 @@ func TestReconnect(t *testing.T) { server: false, }, } - - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) err = test_helpers.RestartTarantool(helpInstances[0]) @@ -454,9 +457,10 @@ func TestReconnect(t *testing.T) { server: true, }, } - - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -486,8 +490,10 @@ func TestDisconnect_withReconnect(t *testing.T) { servers[serverId]: false, }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, - args, defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) // Restart the server after success. 
@@ -503,9 +509,10 @@ func TestDisconnect_withReconnect(t *testing.T) { servers[serverId]: true, }, } - - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, - args, defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -534,9 +541,10 @@ func TestDisconnectAll(t *testing.T) { server2: false, }, } - - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) err = test_helpers.RestartTarantool(helpInstances[0]) @@ -555,9 +563,10 @@ func TestDisconnectAll(t *testing.T) { server2: true, }, } - - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -592,8 +601,10 @@ func TestAdd(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -639,8 +650,10 @@ func TestAdd_exist(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -676,8 +689,10 @@ func TestAdd_unreachable(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, 
timeout, tick) require.NoError(t, err) } @@ -781,8 +796,10 @@ func TestRemove(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -810,8 +827,10 @@ func TestRemove_double(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -838,8 +857,10 @@ func TestRemove_unknown(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -887,8 +908,10 @@ func TestRemove_concurrent(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -972,8 +995,10 @@ func TestClose(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -1035,8 +1060,10 @@ func TestCloseGraceful(t *testing.T) { }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } @@ -1397,8 +1424,10 @@ func 
TestRequestOnClosed(t *testing.T) { server2: false, }, } - err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.CheckPoolStatuses(args) + return err == nil + }, timeout, tick) require.NoError(t, err) _, err = connPool.Do(tarantool.NewPingRequest(), pool.ANY).Get() @@ -1968,8 +1997,10 @@ func TestUpdateInstancesRoles(t *testing.T) { Mode: pool.ANY, } - err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.ProcessListenOnInstance(args) + return err == nil + }, timeout, tick) require.NoError(t, err) // RW @@ -1980,8 +2011,10 @@ func TestUpdateInstancesRoles(t *testing.T) { Mode: pool.RW, } - err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.ProcessListenOnInstance(args) + return err == nil + }, timeout, tick) require.NoError(t, err) // RO @@ -1992,8 +2025,10 @@ func TestUpdateInstancesRoles(t *testing.T) { Mode: pool.RO, } - err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.ProcessListenOnInstance(args) + return err == nil + }, timeout, tick) require.NoError(t, err) // PreferRW @@ -2004,8 +2039,10 @@ func TestUpdateInstancesRoles(t *testing.T) { Mode: pool.PreferRW, } - err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, - defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.ProcessListenOnInstance(args) + return err == nil + }, timeout, tick) require.NoError(t, err) // PreferRO @@ -2016,8 +2053,10 @@ func TestUpdateInstancesRoles(t *testing.T) { Mode: pool.PreferRO, } - err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, - 
defaultCountRetry, defaultTimeoutRetry) + assert.Eventually(t, func() bool { + err = test_helpers.ProcessListenOnInstance(args) + return err == nil + }, timeout, tick) require.NoError(t, err) } diff --git a/shutdown_test.go b/shutdown_test.go index 0d4ffeb5f..45d449481 100644 --- a/shutdown_test.go +++ b/shutdown_test.go @@ -5,7 +5,6 @@ package tarantool_test import ( - "fmt" "sync" "syscall" "testing" @@ -84,24 +83,16 @@ func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.Tar require.NoError(t, inst.Signal(syscall.SIGTERM)) // Check that we can't send new requests after shutdown starts. - // Retry helps to wait a bit until server starts to shutdown + // require.Eventually helps to wait a bit until the server starts to shut down // and send us the shutdown event. - shutdownWaitRetries := 5 - shutdownWaitTimeout := 100 * time.Millisecond - - err = test_helpers.Retry(func(interface{}) error { - _, err = conn.Do(NewPingRequest()).Get() - if err == nil { - return fmt.Errorf("expected error for requests sent on shutdown") - } - - if err.Error() != "server shutdown in progress (0x4005)" { - return err - } - - return nil - }, nil, shutdownWaitRetries, shutdownWaitTimeout) - require.NoError(t, err) + const shutdownWaitRetries = 5 + tick := 100 * time.Millisecond + timeout := time.Duration(shutdownWaitRetries) * tick + + require.Eventually(t, func() bool { + _, err := conn.Do(NewPingRequest()).Get() + return err != nil && err.Error() == "server shutdown in progress (0x4005)" + }, timeout, tick, "expected shutdown error 'server shutdown in progress (0x4005)'") // Check that requests started before the shutdown finish successfully. 
data, err := fut.Get() diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index 8e134e8b4..b55944a2a 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -4,11 +4,11 @@ import ( "context" "errors" "fmt" - "reflect" "sync" "time" "github.com/tarantool/go-tarantool/v3" + "github.com/tarantool/go-tarantool/v3/box" "github.com/tarantool/go-tarantool/v3/pool" ) @@ -41,26 +41,41 @@ func compareTuples(expectedTpl []interface{}, actualTpl []interface{}) error { return nil } -func CheckPoolStatuses(args interface{}) error { - checkArgs, ok := args.(CheckStatusesArgs) - if !ok { - return fmt.Errorf("incorrect args") +func comparePortMaps(expected map[string]bool, actual map[string]bool) error { + if len(actual) != len(expected) { + return fmt.Errorf("unexpected number of ports: expected %d, actual %d", + len(expected), len(actual)) + } + + for port, expectedValue := range expected { + actualValue, ok := actual[port] + if !ok { + return fmt.Errorf("missing expected port: %s", port) + } + if actualValue != expectedValue { + return fmt.Errorf("unexpected value for port %s: expected %t, actual %t", + port, expectedValue, actualValue) + } } - connected, _ := checkArgs.ConnPool.ConnectedNow(checkArgs.Mode) - if connected != checkArgs.ExpectedPoolStatus { + return nil +} + +func CheckPoolStatuses(args CheckStatusesArgs) error { + connected, _ := args.ConnPool.ConnectedNow(args.Mode) + if connected != args.ExpectedPoolStatus { return fmt.Errorf( "incorrect connection pool status: expected status %t actual status %t", - checkArgs.ExpectedPoolStatus, connected) + args.ExpectedPoolStatus, connected) } - poolInfo := checkArgs.ConnPool.GetInfo() - for _, server := range checkArgs.Servers { + poolInfo := args.ConnPool.GetInfo() + for _, server := range args.Servers { status := poolInfo[server].ConnectedNow - if checkArgs.ExpectedStatuses[server] != status { + if args.ExpectedStatuses[server] != status { return fmt.Errorf( "incorrect conn 
status: addr %s expected status %t actual status %t", - server, checkArgs.ExpectedStatuses[server], status) + server, args.ExpectedStatuses[server], status) } } @@ -76,90 +91,64 @@ func CheckPoolStatuses(args interface{}) error { // ports or to all ports. // For PreferRW mode expected received ports equals to master ports // or to all ports. -func ProcessListenOnInstance(args interface{}) error { +func ProcessListenOnInstance(args ListenOnInstanceArgs) error { actualPorts := map[string]bool{} - listenArgs, ok := args.(ListenOnInstanceArgs) - if !ok { - return fmt.Errorf("incorrect args") - } - - for i := 0; i < listenArgs.ServersNumber; i++ { + for i := 0; i < args.ServersNumber; i++ { req := tarantool.NewEvalRequest("return box.cfg.listen") - data, err := listenArgs.ConnPool.Do(req, listenArgs.Mode).Get() + var data []string + err := args.ConnPool.Do(req, args.Mode).GetTyped(&data) if err != nil { - return fmt.Errorf("fail to Eval: %s", err.Error()) - } - if len(data) < 1 { - return fmt.Errorf("response.Data is empty after Eval") + return fmt.Errorf("failed to get response: %w", err) } - - port, ok := data[0].(string) - if !ok { - return fmt.Errorf("response.Data is incorrect after Eval") + if len(data) == 0 { + return errors.New("empty response from Eval") } - actualPorts[port] = true + actualPorts[data[0]] = true } - equal := reflect.DeepEqual(actualPorts, listenArgs.ExpectedPorts) - if !equal { - return fmt.Errorf("expected ports: %v, actual ports: %v", - listenArgs.ExpectedPorts, actualPorts) + if err := comparePortMaps(args.ExpectedPorts, actualPorts); err != nil { + return err } return nil } -func Retry(f func(interface{}) error, args interface{}, count int, timeout time.Duration) error { - var err error - - for i := 0; ; i++ { - err = f(args) - if err == nil { - return err - } - - if i >= (count - 1) { - break - } - - time.Sleep(timeout) - } - - return err -} - func InsertOnInstance(ctx context.Context, dialer tarantool.Dialer, connOpts tarantool.Opts, 
space interface{}, tuple interface{}) error { conn, err := tarantool.Connect(ctx, dialer, connOpts) - if err != nil { - return fmt.Errorf("fail to connect: %s", err.Error()) - } - if conn == nil { + + switch { + case err != nil: + return fmt.Errorf("fail to connect: %w", err) + case conn == nil: return fmt.Errorf("conn is nil after Connect") } + defer func() { _ = conn.Close() }() data, err := conn.Do(tarantool.NewInsertRequest(space).Tuple(tuple)).Get() - if err != nil { - return fmt.Errorf("failed to Insert: %s", err.Error()) - } - if len(data) != 1 { + + switch { + case err != nil: + return fmt.Errorf("failed to Insert: %w", err) + case len(data) != 1: return fmt.Errorf("response Body len != 1") } - if tpl, ok := data[0].([]interface{}); !ok { + + tpl, ok := data[0].([]interface{}) + if !ok { return fmt.Errorf("unexpected body of Insert") - } else { - expectedTpl, ok := tuple.([]interface{}) - if !ok { - return fmt.Errorf("failed to cast") - } + } - err = compareTuples(expectedTpl, tpl) - if err != nil { - return err - } + expectedTpl, ok := tuple.([]interface{}) + if !ok { + return fmt.Errorf("failed to cast") + } + + if err := compareTuples(expectedTpl, tpl); err != nil { + return err } return nil @@ -179,22 +168,12 @@ func InsertOnInstances( err := SetClusterRO(ctx, dialers, connOpts, roles) if err != nil { - return fmt.Errorf("fail to set roles for cluster: %s", err.Error()) - } - - errs := make([]error, len(dialers)) - var wg sync.WaitGroup - wg.Add(len(dialers)) - for i, dialer := range dialers { - // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). - go func(i int, dialer tarantool.Dialer) { - defer wg.Done() - errs[i] = InsertOnInstance(ctx, dialer, connOpts, space, tuple) - }(i, dialer) + return fmt.Errorf("fail to set roles for cluster: %w", err) } - wg.Wait() - return errors.Join(errs...) 
+ return ExecuteOnAll(ctx, dialers, func(_ context.Context, d tarantool.Dialer, _ int) error { + return InsertOnInstance(ctx, d, connOpts, space, tuple) + }) } func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarantool.Opts, @@ -212,73 +191,80 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant return err } - checkRole := func(conn *tarantool.Connection, isReplica bool) string { - data, err := conn.Do(tarantool.NewCallRequest("box.info")).Get() - switch { - case err != nil: - return fmt.Sprintf("failed to get box.info: %s", err) - case len(data) < 1: - return "box.info is empty" - } - - boxInfo, ok := data[0].(map[interface{}]interface{}) - if !ok { - return "unexpected type in box.info response" + checkRole := func(conn *tarantool.Connection, isReplica bool) error { + boxInfo, err := box.MustNew(conn).Info() + if err != nil { + return fmt.Errorf("failed to get box.info: %s", err) } - status, statusFound := boxInfo["status"] - readonly, readonlyFound := boxInfo["ro"] switch { - case !statusFound: - return "box.info.status is missing" - case status != "running": - return fmt.Sprintf("box.info.status='%s' (waiting for 'running')", status) - case !readonlyFound: - return "box.info.ro is missing" - case readonly != isReplica: - return fmt.Sprintf("box.info.ro='%v' (waiting for '%v')", readonly, isReplica) + case boxInfo.Status != "running": + return fmt.Errorf("box.info.status='%s' (waiting for 'running')", boxInfo.Status) + case boxInfo.RO != isReplica: + return fmt.Errorf("box.info.ro='%v' (waiting for '%v')", boxInfo.RO, isReplica) default: - return "" + return nil } } - problem := "not checked yet" + err = errors.New("not checked yet") // Wait for the role to be applied. 
- for len(problem) != 0 { + for err != nil { select { case <-time.After(10 * time.Millisecond): case <-ctx.Done(): - return fmt.Errorf("%w: failed to apply role, the last problem: %s", - ctx.Err(), problem) + return fmt.Errorf("%w: failed to apply role, the last error: %s", + ctx.Err(), err) } - problem = checkRole(conn, isReplica) + err = checkRole(conn, isReplica) } return nil } -func SetClusterRO(ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, - roles []bool) error { - if len(dialers) != len(roles) { - return fmt.Errorf("number of servers should be equal to number of roles") - } - - // Apply roles in parallel. - errs := make([]error, len(dialers)) +func ExecuteOnAll(ctx context.Context, dialers []tarantool.Dialer, + fn func(context.Context, tarantool.Dialer, int) error) error { var wg sync.WaitGroup + var errs []error + var mu sync.Mutex + wg.Add(len(dialers)) for i, dialer := range dialers { - // Pass loop variable(s) to avoid its capturing by reference (not needed since Go 1.22). - go func(i int, dialer tarantool.Dialer) { + go func(idx int, d tarantool.Dialer) { defer wg.Done() - errs[i] = SetInstanceRO(ctx, dialer, connOpts, roles[i]) + select { + case <-ctx.Done(): + mu.Lock() + errs = append(errs, fmt.Errorf("instance %d: %w", idx, ctx.Err())) + mu.Unlock() + default: + if err := fn(ctx, d, idx); err != nil { + mu.Lock() + errs = append(errs, fmt.Errorf("instance %d: %w", idx, err)) + mu.Unlock() + } + } }(i, dialer) } + wg.Wait() + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} + +func SetClusterRO(ctx context.Context, dialers []tarantool.Dialer, connOpts tarantool.Opts, + roles []bool) error { + if len(dialers) != len(roles) { + return fmt.Errorf("number of servers should be equal to number of roles") + } - return errors.Join(errs...) 
+ return ExecuteOnAll(ctx, dialers, func(_ context.Context, d tarantool.Dialer, idx int) error { + return SetInstanceRO(ctx, d, connOpts, roles[idx]) + }) } func StartTarantoolInstances(instsOpts []StartOpts) ([]*TarantoolInstance, error) { diff --git a/test_helpers/utils.go b/test_helpers/utils.go index f3d6dd78d..8faaacf6e 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -49,16 +49,15 @@ func DeleteRecordByKey(t T, conn tarantool.Connector, // Returns false in case of connection is not in the connected state // after specified retries count, true otherwise. func WaitUntilReconnected(conn *tarantool.Connection, retries uint, timeout time.Duration) bool { - err := Retry(func(arg interface{}) error { - conn := arg.(*tarantool.Connection) - connected := conn.ConnectedNow() - if !connected { - return fmt.Errorf("not connected") + for i := 0; i < int(retries); i++ { + if conn.ConnectedNow() { + return true } - return nil - }, conn, int(retries), timeout) - - return err == nil + if i < int(retries)-1 { + time.Sleep(timeout) + } + } + return conn.ConnectedNow() } func SkipIfSQLUnsupported(t T) {