diff --git a/pkg/cnservice/server.go b/pkg/cnservice/server.go index 11dbc0f619b6b..29bf9a024301c 100644 --- a/pkg/cnservice/server.go +++ b/pkg/cnservice/server.go @@ -19,6 +19,7 @@ import ( "compress/gzip" "context" "encoding/hex" + "errors" "fmt" "sync" "time" @@ -942,7 +943,7 @@ func (s *service) bootstrap() error { // bootstrap cannot fail. We panic here to make sure the service can not start. // If bootstrap failed, need clean all data to retry. if err := s.bootstrapService.Bootstrap(ctx); err != nil { - panic(moerr.AttachCause(ctx, err)) + return handleBootstrapErr(ctx, err) } trace.GetService(s.cfg.UUID).EnableFlush() @@ -963,6 +964,18 @@ func (s *service) bootstrap() error { return nil } +// handleBootstrapErr decides whether a bootstrap error should be returned +// gracefully (for context cancellation during shutdown) or trigger a panic +// (for real bootstrap failures). Only context.Canceled is treated as a +// graceful shutdown signal; DeadlineExceeded from the 5-minute bootstrap +// timeout is a legitimate failure that should still panic. +func handleBootstrapErr(ctx context.Context, err error) error { + if errors.Is(err, context.Canceled) { + return err + } + panic(moerr.AttachCause(ctx, err)) +} + func (s *service) initTxnTraceService() { rt := runtime.ServiceRuntime(s.cfg.UUID) ts, err := trace.NewService( diff --git a/pkg/cnservice/server_bootstrap_test.go b/pkg/cnservice/server_bootstrap_test.go new file mode 100644 index 0000000000000..4093f660bceac --- /dev/null +++ b/pkg/cnservice/server_bootstrap_test.go @@ -0,0 +1,80 @@ +// Copyright 2021-2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cnservice + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" +) + +func TestHandleBootstrapErr(t *testing.T) { + t.Run("context.Canceled returns error", func(t *testing.T) { + ctx := context.Background() + err := handleBootstrapErr(ctx, context.Canceled) + require.Error(t, err) + assert.True(t, err == context.Canceled) + }) + + t.Run("wrapped context.Canceled returns error", func(t *testing.T) { + ctx := context.Background() + wrappedErr := fmt.Errorf("bootstrap failed: %w", context.Canceled) + err := handleBootstrapErr(ctx, wrappedErr) + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) + }) + + t.Run("context.DeadlineExceeded panics", func(t *testing.T) { + ctx := context.Background() + assert.Panics(t, func() { + handleBootstrapErr(ctx, context.DeadlineExceeded) + }) + }) + + t.Run("bootstrap timeout with cause panics", func(t *testing.T) { + // Simulate the real bootstrap context: WithTimeoutCause sets a + // custom cause, but the 5-minute timeout is a legitimate failure + // that must still panic. + ctx, cancel := context.WithTimeoutCause( + context.Background(), 0, moerr.CauseBootstrap, + ) + defer cancel() + // Wait for the timeout to fire. + <-ctx.Done() + + assert.Panics(t, func() { + handleBootstrapErr(ctx, ctx.Err()) + }) + }) + + t.Run("other error panics", func(t *testing.T) { + ctx := context.Background() + assert.Panics(t, func() { + handleBootstrapErr(ctx, fmt.Errorf("SQL execution failed")) + }) + }) + + t.Run("moerr wrapped error panics", func(t *testing.T) { + ctx := context.Background() + assert.Panics(t, func() { + handleBootstrapErr(ctx, moerr.NewInternalErrorNoCtx("bootstrap init failed")) + }) + }) +} diff --git a/pkg/embed/cluster.go b/pkg/embed/cluster.go index bd061d4d56043..540e8b4d66969 100644 --- a/pkg/embed/cluster.go +++ b/pkg/embed/cluster.go @@ -104,19 +104,20 @@ func (c *cluster) Start() error { return moerr.NewInvalidStateNoCtx("embed mo cluster already started") } - c.doStartLocked(0) + if err := c.doStartLocked(0); err != nil { + return err + } c.state = started return nil } -func (c *cluster) doStartLocked(from int) { +func (c *cluster) doStartLocked(from int) error { var wg sync.WaitGroup - errC := make(chan error, 1) - defer close(errC) + var startErr atomic.Value for _, s := range c.services[from:] { if s.serviceType != metadata.ServiceType_CN { if err := s.Start(); err != nil { - panic(err) + return err } continue } @@ -125,12 +126,19 @@ func (c *cluster) doStartLocked(from int) { go func(s *operator) { defer wg.Done() if err := s.Start(); err != nil { - panic(err) + // Only the first error is captured; concurrent failures + // from other services are discarded since knowing that + // any service failed is sufficient to abort startup. + startErr.CompareAndSwap(nil, err) } }(s) } wg.Wait() + if v := startErr.Load(); v != nil { + return v.(error) + } + return nil } func (c *cluster) Close() error { @@ -227,8 +235,7 @@ func (c *cluster) StartNewCNService(n int) error { return err } - c.doStartLocked(serviceFrom) - return nil + return c.doStartLocked(serviceFrom) } func (c *cluster) adjust() { diff --git a/pkg/embed/cluster_test.go b/pkg/embed/cluster_test.go index b519aa7c470cb..acfae54470094 100644 --- a/pkg/embed/cluster_test.go +++ b/pkg/embed/cluster_test.go @@ -28,6 +28,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/metadata" "github.com/matrixorigin/matrixone/pkg/util/executor" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -286,3 +287,70 @@ func TestCreateDB(t *testing.T) { }, ) } + +// TestDoStartLockedErrorPaths exercises the error-handling branches in +// doStartLocked that are not reached by normal cluster startup tests. +func TestDoStartLockedErrorPaths(t *testing.T) { + t.Run("non-CN service error returns immediately", func(t *testing.T) { + // A non-CN operator whose state is already 'started' will return + // an error from Start(), exercising the direct-return path at + // cluster.go line 119-121. + op := &operator{ + serviceType: metadata.ServiceType_LOG, + state: started, // forces Start() to return error + } + c := &cluster{ + services: []*operator{op}, + } + err := c.doStartLocked(0) + assert.Error(t, err) + assert.Contains(t, err.Error(), "already started") + }) + + t.Run("CN service error captured via atomic.Value", func(t *testing.T) { + // A CN operator whose state is already 'started' will return an + // error from Start(), exercising the goroutine error-capture path + // at cluster.go lines 128-133 and the error-return at 138-140. + op := &operator{ + serviceType: metadata.ServiceType_CN, + state: started, + } + c := &cluster{ + services: []*operator{op}, + } + err := c.doStartLocked(0) + assert.Error(t, err) + assert.Contains(t, err.Error(), "already started") + }) + + t.Run("Start propagates doStartLocked error", func(t *testing.T) { + // Exercises the error propagation in Start() at line 107-109. + op := &operator{ + serviceType: metadata.ServiceType_LOG, + state: started, + } + c := &cluster{ + state: stopped, + services: []*operator{op}, + } + err := c.Start() + assert.Error(t, err) + assert.Contains(t, err.Error(), "already started") + }) + + t.Run("Start rejects double start", func(t *testing.T) { + c := &cluster{state: started} + err := c.Start() + assert.Error(t, err) + assert.Contains(t, err.Error(), "embed mo cluster already started") + }) + + t.Run("happy path with no services", func(t *testing.T) { + c := &cluster{ + state: stopped, + services: []*operator{}, + } + err := c.doStartLocked(0) + assert.NoError(t, err) + }) +}