diff --git a/cmd/manager/main.go b/cmd/manager/main.go index ff64dcf1f..65f18751b 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -79,6 +79,10 @@ func main() { opts := zap.Options{ Encoder: getLogEncoder(setupLog), Level: getLogLevel(setupLog), + // We rely on github.com/pkg/errors to capture stacktrace on the error itself. + // Zap's stacktrace on log.Error only adds the logger call site from controller-runtime, + // which is redundant and obscures the real failure stack. + StacktraceLevel: zapcore.PanicLevel, } opts.BindFlags(flag.CommandLine) flag.Parse() diff --git a/pkg/controller/ps/controller.go b/pkg/controller/ps/controller.go index 61433ada0..74cbe0be9 100644 --- a/pkg/controller/ps/controller.go +++ b/pkg/controller/ps/controller.go @@ -115,7 +115,7 @@ func (r *PerconaServerMySQLReconciler) Reconcile( var err error defer func() { - if err := r.reconcileCRStatus(ctx, cr, err); err != nil { + if err := util.WrapWithDeepestStack(r.reconcileCRStatus(ctx, cr, err), ""); err != nil { log.Error(err, "failed to update status") } }() @@ -137,8 +137,8 @@ func (r *PerconaServerMySQLReconciler) Reconcile( return rr, nil } - if err = r.doReconcile(ctx, cr); err != nil { - return ctrl.Result{}, errors.Wrap(err, "reconcile") + if err = util.WrapWithDeepestStack(r.doReconcile(ctx, cr), "reconcile"); err != nil { + return ctrl.Result{}, err } return rr, nil @@ -916,6 +916,12 @@ func (r *PerconaServerMySQLReconciler) reconcileReplication(ctx context.Context, case errors.Is(err, orchestrator.ErrNoSuchHost): log.Info("mysql is not ready, host not found. skip") return nil + case errors.Is(err, orchestrator.ErrTimeout): + log.Info("mysql is not ready, connection timeout. skip") + return nil + case errors.Is(err, orchestrator.ErrContainerNotFound): + log.Info("orchestrator is not ready, container not found. skip") + return nil } return errors.Wrap(err, "failed to discover cluster") } diff --git a/pkg/orchestrator/client.go b/pkg/orchestrator/client.go index 12787514d..582bda602 100644 --- a/pkg/orchestrator/client.go +++ b/pkg/orchestrator/client.go @@ -6,6 +6,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "io" "strings" "time" @@ -37,6 +38,9 @@ func (r *orcResponse) Error() error { if strings.Contains(r.Message, "no such host") { return ErrNoSuchHost } + if strings.Contains(r.Message, "i/o timeout") { + return ErrTimeout + } return errors.New(r.Message) } @@ -65,18 +69,31 @@ var ( ErrUnauthorized = errors.New("unauthorized") ErrBadConn = errors.New("bad connection") ErrNoSuchHost = errors.New("mysql host not found") + ErrTimeout = errors.New("timeout") + ErrContainerNotFound = errors.New("orchestrator container not found") ) func exec(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, endpoint string, outb, errb *bytes.Buffer) error { c := []string{"curl", fmt.Sprintf("localhost:%d/%s", defaultWebPort, endpoint)} err := cliCmd.Exec(ctx, pod, AppName, c, nil, outb, errb, false) if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "unable to upgrade connection: container not found") { + return ErrContainerNotFound + } return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", c, outb, errb) } return nil } +func isTruncatedJSONErr(err error) bool { + if err == nil { + return false + } + + return errors.Is(err, io.ErrUnexpectedEOF) || err.Error() == "unexpected end of JSON input" +} + func ClusterPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clusterHint string) (*Instance, error) { url := fmt.Sprintf("api/master/%s", clusterHint) @@ -95,6 +112,9 @@ func ClusterPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Po orcResp := &orcResponse{} if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return nil, ErrEmptyResponse + } return nil, errors.Wrap(err, "json decode") } @@ -116,6 +136,9 @@ func StopReplication(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.P orcResp := &orcResponse{} if err := json.Unmarshal(res.Bytes(), &orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrap(err, "json decode") } @@ -133,6 +156,9 @@ func StartReplication(ctx context.Context, cliCmd clientcmd.Client, pod *corev1. orcResp := &orcResponse{} if err := json.Unmarshal(res.Bytes(), &orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrap(err, "json decode") } @@ -158,6 +184,9 @@ func AddPeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, peer orcResp := &orcResponse{} if err := json.Unmarshal(body, &orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrap(err, "json decode") } @@ -183,6 +212,9 @@ func RemovePeer(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, p orcResp := &orcResponse{} if err := json.Unmarshal(body, &orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrap(err, "json decode") } @@ -211,6 +243,9 @@ func EnsureNodeIsPrimary(ctx context.Context, cliCmd clientcmd.Client, pod *core orcResp := &orcResponse{} if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } @@ -234,6 +269,9 @@ func Discover(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, hos } if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } @@ -257,6 +295,9 @@ func SetWriteable(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, } if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } @@ -284,6 +325,9 @@ func Cluster(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, clus orcResp := &orcResponse{} if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return nil, ErrEmptyResponse + } return nil, errors.Wrap(err, "json decode") } @@ -311,6 +355,9 @@ func ForgetInstance(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Po } if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } @@ -343,6 +390,9 @@ func BeginDowntime( } if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } @@ -366,6 +416,9 @@ func EndDowntime(ctx context.Context, cliCmd clientcmd.Client, pod *corev1.Pod, } if err := json.Unmarshal(body, orcResp); err != nil { + if isTruncatedJSONErr(err) { + return ErrEmptyResponse + } return errors.Wrapf(err, "json decode \"%s\"", string(body)) } diff --git a/pkg/util/errors.go b/pkg/util/errors.go new file mode 100644 index 000000000..ee7b1b650 --- /dev/null +++ b/pkg/util/errors.go @@ -0,0 +1,68 @@ +package util + +import ( + "fmt" + "io" + + "github.com/pkg/errors" +) + +// WrapWithDeepestStack preserves the full error message chain, but limits +// verbose logging to the deepest pkg/errors stack. This avoids the default +// github.com/pkg/errors behavior where %+v prints stack traces from all wraps. +func WrapWithDeepestStack(err error, msg string) error { + if err == nil { + return nil + } + + stackErr := err + for next := errors.Unwrap(err); next != nil; next = errors.Unwrap(next) { + if _, ok := next.(stackTracer); ok { + stackErr = next + } + } + + text := err.Error() + if msg != "" { + text = msg + ": " + text + } + + return &deepStackErr{ + msg: text, + cause: stackErr, + } +} + +type deepStackErr struct { + msg string + cause error +} + +func (e *deepStackErr) Error() string { return e.msg } + +func (e *deepStackErr) Cause() error { return e.cause } + +func (e *deepStackErr) Unwrap() error { return e.cause } + +func (e *deepStackErr) Format(s fmt.State, verb rune) { + switch verb { + case 'v': + if s.Flag('+') { + _, _ = io.WriteString(s, e.msg) + if e.cause != nil { + _, _ = io.WriteString(s, "\n") + _, _ = fmt.Fprintf(s, "%+v", e.cause) + } + return + } + fallthrough + case 's': + _, _ = io.WriteString(s, e.msg) + case 'q': + _, _ = fmt.Fprintf(s, "%q", e.msg) + } +} + +type stackTracer interface { + StackTrace() errors.StackTrace +}