Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions percona/controller/pgcluster/standby.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ func (r *PGClusterReconciler) reconcileStandbyLag(ctx context.Context, cr *v2.Pe

lagBytes, err := r.getStandbyLag(ctx, cr)
if err != nil {
if errors.Is(err, ErrPrimaryPodNotFound) && cr.Status.State != v2.AppStateReady {
Copy link
Copy Markdown
Contributor

@gkech gkech Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to have app state ready while the primary cannot be found?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It refers to the primary pod of the main site, not the standby

meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: postgrescluster.ConditionStandbyLagging,
Status: metav1.ConditionUnknown,
Reason: "PrimaryNotFound",
Message: "Cannot find primary for replication lag calculation",
})
return nil
}
if errors.Is(err, ErrInvalidLagQueryOutput) {
meta.SetStatusCondition(&cr.Status.Conditions, metav1.Condition{
Type: postgrescluster.ConditionStandbyLagging,
Status: metav1.ConditionUnknown,
Reason: "InvalidLagQueryOutput",
Message: "Invalid output from lag query. The WAL receiver is probably not active",
})
return nil
}
Comment thread
pooknull marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should unconditionally add metav1.ConditionUnknown for this condition if we have a non-nil error, regardless of what the error actually is..

We can have a standard reason ErrorGettingLag, and the message can be the error string.. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return errors.Wrap(err, "calculate replication lag bytes")
}

Expand Down Expand Up @@ -146,10 +164,15 @@ func (r *PGClusterReconciler) getStandbyLag(ctx context.Context, standby *v2.Per
return r.getLagFromMainSite(ctx, standby)
}

var (
ErrPrimaryPodNotFound = errors.New("primary pod not found")
ErrInvalidLagQueryOutput = errors.New("invalid lag query output")
)
Comment on lines +167 to +170
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem: ErrPrimaryPodNotFound / ErrInvalidLagQueryOutput are exported but (currently) only used within this file.
Why it matters: Exporting expands the package API surface and makes future refactors harder.
Fix: Make these errors unexported (e.g., errPrimaryPodNotFound, errInvalidLagQueryOutput) unless they are intended to be referenced from other packages.

Copilot uses AI. Check for mistakes.

func (r *PGClusterReconciler) getLagFromStreamingHost(ctx context.Context, standby *v2.PerconaPGCluster) (int64, error) {
primary, err := perconaPG.GetPrimaryPod(ctx, r.Client, standby)
if err != nil {
return 0, errors.Wrap(err, "get primary pod")
return 0, errors.Wrapf(ErrPrimaryPodNotFound, "get primary pod: %s", err.Error())
Comment thread
pooknull marked this conversation as resolved.
}
Comment thread
pooknull marked this conversation as resolved.

podExecutor := postgres.Executor(func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
Expand All @@ -166,6 +189,9 @@ func (r *PGClusterReconciler) getLagFromStreamingHost(ctx context.Context, stand
}

lagStr := strings.TrimSpace(stdout)
if lagStr == "" {
return 0, errors.Wrapf(ErrInvalidLagQueryOutput, "failed to get lag from streaming host: invalid value: %q", lagStr)
}
lagBytes, err := strconv.ParseInt(lagStr, 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "parse lag bytes: %s", lagStr)
Expand Down Expand Up @@ -213,7 +239,7 @@ func (r *PGClusterReconciler) getWALLagBytes(
) (int64, error) {
primary, err := perconaPG.GetPrimaryPod(ctx, r.Client, standby)
if err != nil {
return 0, errors.Wrap(err, "get primary pod")
return 0, errors.Wrap(ErrPrimaryPodNotFound, "get primary pod")
Comment thread
pooknull marked this conversation as resolved.
Outdated
}

podExecutor := postgres.Executor(func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
Expand All @@ -239,7 +265,7 @@ func (r *PGClusterReconciler) getWALLagBytes(
func (r *PGClusterReconciler) getCurrentWALLSN(ctx context.Context, cr *v2.PerconaPGCluster) (string, error) {
primary, err := perconaPG.GetPrimaryPod(ctx, r.Client, cr)
if err != nil {
return "", errors.Wrap(err, "get primary pod")
return "", errors.Wrapf(ErrPrimaryPodNotFound, "get primary pod: %s", err.Error())
Comment thread
pooknull marked this conversation as resolved.
}

podExecutor := postgres.Executor(func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
Expand Down Expand Up @@ -374,7 +400,7 @@ func pollAndRequeueStandbys(
if !cluster.ShouldCheckStandbyLag() || status.Standby == nil || shouldSkipLagCheck(&cluster) {
continue
}
log.Info("Requeuing standby cluster for lag check", "cluster", cluster.Name)
log.V(1).Info("Requeuing standby cluster for lag check", "cluster", cluster.Name)
events <- event.GenericEvent{Object: cluster.DeepCopy()}
}
case <-ctx.Done():
Expand Down
57 changes: 51 additions & 6 deletions percona/controller/pgcluster/standby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestReconcileStandbyLag(t *testing.T) {

mockPodExec := func(wantLagBytes int64) func(context.Context, string, string, string, io.Reader, io.Writer, io.Writer, ...string) error {
return func(ctx context.Context, namespace, pod, container string,
stdin io.Reader, stdout, stderr io.Writer, command ...string) error {

stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
if stdin == nil {
return nil
}
Expand All @@ -68,11 +68,9 @@ func TestReconcileStandbyLag(t *testing.T) {

newReconciler := func(
sourceCluster *v2.PerconaPGCluster,
standbyCluster *v2.PerconaPGCluster,
sourcePrimary *corev1.Pod,
standbyPrimary *corev1.Pod,
obj ...client.Object,
) *PGClusterReconciler {
cl, err := buildFakeClient(t.Context(), sourceCluster, standbyCluster, sourcePrimary, standbyPrimary)
cl, err := buildFakeClient(t.Context(), sourceCluster, obj...)
require.NoError(t, err)

return &PGClusterReconciler{
Expand Down Expand Up @@ -141,6 +139,53 @@ func TestReconcileStandbyLag(t *testing.T) {
assert.Equal(t, "MainSiteNotFound", cond.Reason)
})

t.Run("primary not found", func(t *testing.T) {
cluster := standbyCluster.DeepCopy()
cluster.Status.State = v2.AppStateInit
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: postgrescluster.ConditionStandbyLagging,
})

r := newReconciler(
sourceCluster,
cluster,
)
r.PodExec = mockPodExec(0)

err = r.reconcileStandbyLag(t.Context(), cluster)
require.NoError(t, err)

cond := meta.FindStatusCondition(cluster.Status.Conditions, postgrescluster.ConditionStandbyLagging)
assert.NotNil(t, cond)
assert.Equal(t, metav1.ConditionUnknown, cond.Status)
assert.Equal(t, "PrimaryNotFound", cond.Reason)
})

t.Run("invalid lag query output", func(t *testing.T) {
cluster := standbyCluster.DeepCopy()
cluster.Spec.Standby.Host = "streaming-primary-host"

r := newReconciler(
sourceCluster,
cluster,
primaryPodForCluster(sourceCluster),
primaryPodForCluster(cluster),
)
r.PodExec = func(ctx context.Context, namespace, pod, container string,
stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
return nil
}

err = r.reconcileStandbyLag(t.Context(), cluster)
require.NoError(t, err)

cond := meta.FindStatusCondition(cluster.Status.Conditions, postgrescluster.ConditionStandbyLagging)
assert.NotNil(t, cond)
assert.Equal(t, metav1.ConditionUnknown, cond.Status)
assert.Equal(t, "InvalidLagQueryOutput", cond.Reason)
})

t.Run("lag not detected", func(t *testing.T) {
cluster := standbyCluster.DeepCopy()
r := newReconciler(
Expand Down
Loading