Skip to content

Commit f911005

Browse files
authored
fix: correct cache client status reporting (#5793)
* fix: wait for cache client readiness before binding dataset Signed-off-by: CAICAIIs <3360776475@qq.com> * fix: require full cache client readiness before binding Signed-off-by: CAICAIIs <3360776475@qq.com> * fix: simplify cache client readiness check Signed-off-by: CAICAIIs <3360776475@qq.com> * fix: restore cache runtime readiness semantics Signed-off-by: CAICAIIs <3360776475@qq.com> * fix: treat zero desired cache client as not ready Signed-off-by: CAICAIIs <3360776475@qq.com> * fix(cache): clarify client readiness logging Signed-off-by: CAICAIIs <3360776475@qq.com> * chore: address cache client review feedback Signed-off-by: CAICAIIs <3360776475@qq.com> * test: wait for curvine client readiness in e2e Signed-off-by: CAICAIIs <3360776475@qq.com> * test: merge curvine e2e conflict resolution Signed-off-by: CAICAIIs <3360776475@qq.com> * test: wait for curvine client pod readiness Signed-off-by: CAICAIIs <3360776475@qq.com> * test: fix curvine client selector Signed-off-by: CAICAIIs <3360776475@qq.com> * test: harden alluxio e2e wait Signed-off-by: CAICAIIs <3360776475@qq.com> --------- Signed-off-by: CAICAIIs <3360776475@qq.com>
1 parent 113aa3f commit f911005

4 files changed

Lines changed: 407 additions & 31 deletions

File tree

pkg/ddc/cache/engine/status.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,33 @@ func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRunti
7979
status.CacheAffinity = affinity
8080
return ready, err
8181
}
82-
func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) {
82+
func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (fullyReady bool, err error) {
8383
manager := component.NewComponentHelper(componentValue.WorkloadType, e.Client)
8484

8585
clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue)
8686
if err != nil {
87-
return err
87+
return false, err
8888
}
89-
if clientStatus.DesiredReplicas > 0 {
90-
if clientStatus.DesiredReplicas == clientStatus.ReadyReplicas {
91-
clientStatus.Phase = fluidapi.RuntimePhaseReady
92-
} else if clientStatus.ReadyReplicas >= 1 {
93-
clientStatus.Phase = fluidapi.RuntimePhasePartialReady
94-
}
89+
if clientStatus.DesiredReplicas > 0 && clientStatus.ReadyReplicas >= clientStatus.DesiredReplicas {
90+
clientStatus.Phase = fluidapi.RuntimePhaseReady
91+
fullyReady = true
92+
} else if clientStatus.ReadyReplicas > 0 {
93+
clientStatus.Phase = fluidapi.RuntimePhasePartialReady
94+
} else {
95+
clientStatus.Phase = fluidapi.RuntimePhaseNotReady
9596
}
9697
status.Client = clientStatus
9798

98-
return nil
99+
return fullyReady, nil
99100
}
100101
func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) {
101-
var masterReady, workerReady, runtimeReady = true, true, false
102+
runtimeReady := false
102103

103104
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
105+
// Reset readiness on each retry to avoid stale state after conflicts.
106+
masterReady, workerReady, clientFullyReady := true, true, false
107+
runtimeReady = false
108+
104109
runtime, err := e.getRuntime()
105110
if err != nil {
106111
return err
@@ -122,16 +127,21 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu
122127
}
123128

124129
if value.Client.Enabled {
125-
err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status)
130+
clientFullyReady, err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status)
126131
if err != nil {
127132
return err
128133
}
129134
}
130135

131-
if masterReady && workerReady {
132-
runtimeReady = true
133-
} else {
134-
e.Log.Info(fmt.Sprintf("MasterReady: %v, workerReady: %v", masterReady, workerReady))
136+
runtimeReady = masterReady && workerReady
137+
if !runtimeReady {
138+
e.Log.Info(fmt.Sprintf(
139+
"MasterReady: %v, workerReady: %v, clientFullyReady: %v, clientPhase: %s",
140+
masterReady,
141+
workerReady,
142+
clientFullyReady,
143+
runtimeToUpdate.Status.Client.Phase,
144+
))
135145
}
136146

137147
// Update the setup time
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
/*
2+
Copyright 2026 The Fluid Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package engine
18+
19+
import (
20+
"context"
21+
"errors"
22+
"testing"
23+
"time"
24+
25+
appsv1 "k8s.io/api/apps/v1"
26+
apierrors "k8s.io/apimachinery/pkg/api/errors"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
29+
"k8s.io/apimachinery/pkg/types"
30+
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
31+
32+
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
33+
"github.com/fluid-cloudnative/fluid/pkg/common"
34+
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
35+
)
36+
37+
// Shared fixture identifiers for the runtime status tests in this file.
const (
	// Namespace and name of the CacheRuntime object under test.
	testStatusNamespace = "default"
	testStatusRuntime   = "curvine-demo"

	// Names of the workloads backing each runtime component.
	testStatusMaster = "curvine-demo-master"
	testStatusWorker = "curvine-demo-worker"
	testStatusClient = "curvine-demo-client"

	// API group and resource used when building the injected conflict error.
	testCacheRuntimeGV = "data.fluid.io"
	testCacheRuntimeGR = "cacheruntimes"

	// APIVersion assigned to every component's workload TypeMeta.
	testStatusWorkloadAP = "apps/v1"
)
47+
48+
func TestCheckAndUpdateRuntimeStatusClientNotReadyDoesNotBlockRuntimeReady(t *testing.T) {
49+
engine, client := newStatusTestEngineWithClient(
50+
t,
51+
fake.NewFakeClientWithScheme(
52+
datav1alpha1.UnitTestScheme,
53+
newStatusTestRuntime(),
54+
newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
55+
newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
56+
newDaemonSetComponent(testStatusClient, testStatusNamespace, 1, 0),
57+
),
58+
)
59+
60+
ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
61+
if err != nil {
62+
t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
63+
}
64+
if !ready {
65+
t.Fatalf("expected runtime to become ready once master and worker are ready")
66+
}
67+
68+
updatedRuntime := getUpdatedRuntime(t, client)
69+
if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady {
70+
t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase)
71+
}
72+
if updatedRuntime.Status.SetupDuration == "" {
73+
t.Fatalf("expected setup duration to be recorded once runtime is ready")
74+
}
75+
}
76+
77+
func TestCheckAndUpdateRuntimeStatusClientPartialReadyDoesNotBlockRuntimeReady(t *testing.T) {
78+
engine, client := newStatusTestEngineWithClient(
79+
t,
80+
fake.NewFakeClientWithScheme(
81+
datav1alpha1.UnitTestScheme,
82+
newStatusTestRuntime(),
83+
newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
84+
newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
85+
newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 1),
86+
),
87+
)
88+
89+
ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
90+
if err != nil {
91+
t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
92+
}
93+
if !ready {
94+
t.Fatalf("expected runtime to become ready once master and worker are ready")
95+
}
96+
97+
updatedRuntime := getUpdatedRuntime(t, client)
98+
if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhasePartialReady {
99+
t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhasePartialReady, updatedRuntime.Status.Client.Phase)
100+
}
101+
if updatedRuntime.Status.SetupDuration == "" {
102+
t.Fatalf("expected setup duration to be recorded once runtime is ready")
103+
}
104+
}
105+
106+
func TestCheckAndUpdateRuntimeStatusClientZeroDesiredReplicasReportsNotReady(t *testing.T) {
107+
engine, client := newStatusTestEngineWithClient(
108+
t,
109+
fake.NewFakeClientWithScheme(
110+
datav1alpha1.UnitTestScheme,
111+
newStatusTestRuntime(),
112+
newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
113+
newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
114+
newDaemonSetComponent(testStatusClient, testStatusNamespace, 0, 0),
115+
),
116+
)
117+
118+
ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
119+
if err != nil {
120+
t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
121+
}
122+
if !ready {
123+
t.Fatalf("expected runtime to stay ready when client desires zero replicas")
124+
}
125+
126+
updatedRuntime := getUpdatedRuntime(t, client)
127+
if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseNotReady {
128+
t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Client.Phase)
129+
}
130+
if updatedRuntime.Status.Client.DesiredReplicas != 0 {
131+
t.Fatalf("expected desired replicas to stay 0, got %d", updatedRuntime.Status.Client.DesiredReplicas)
132+
}
133+
}
134+
135+
func TestCheckAndUpdateRuntimeStatusClientFullyReadyReportsReady(t *testing.T) {
136+
engine, client := newStatusTestEngineWithClient(
137+
t,
138+
fake.NewFakeClientWithScheme(
139+
datav1alpha1.UnitTestScheme,
140+
newStatusTestRuntime(),
141+
newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
142+
newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
143+
newDaemonSetComponent(testStatusClient, testStatusNamespace, 2, 2),
144+
),
145+
)
146+
147+
ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(true))
148+
if err != nil {
149+
t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
150+
}
151+
if !ready {
152+
t.Fatalf("expected runtime to stay ready when client is fully ready")
153+
}
154+
155+
updatedRuntime := getUpdatedRuntime(t, client)
156+
if updatedRuntime.Status.Client.Phase != datav1alpha1.RuntimePhaseReady {
157+
t.Fatalf("expected client phase %q, got %q", datav1alpha1.RuntimePhaseReady, updatedRuntime.Status.Client.Phase)
158+
}
159+
if updatedRuntime.Status.Client.ReadyReplicas != updatedRuntime.Status.Client.DesiredReplicas {
160+
t.Fatalf("expected ready replicas to match desired replicas, got %d/%d", updatedRuntime.Status.Client.ReadyReplicas, updatedRuntime.Status.Client.DesiredReplicas)
161+
}
162+
}
163+
164+
func TestCheckAndUpdateRuntimeStatusRecomputesRuntimeReadyOnRetry(t *testing.T) {
165+
baseClient := fake.NewFakeClientWithScheme(
166+
datav1alpha1.UnitTestScheme,
167+
newStatusTestRuntime(),
168+
newStatefulSetComponent(testStatusMaster, testStatusNamespace, 1, 1),
169+
newStatefulSetComponent(testStatusWorker, testStatusNamespace, 1, 1),
170+
)
171+
172+
client := &conflictOnceClient{
173+
Client: baseClient,
174+
statusWriter: &conflictOnceStatusWriter{
175+
StatusWriter: baseClient.Status(),
176+
beforeConflict: func(ctx context.Context) error {
177+
worker := &appsv1.StatefulSet{}
178+
if err := baseClient.Get(ctx, types.NamespacedName{Name: testStatusWorker, Namespace: testStatusNamespace}, worker); err != nil {
179+
return err
180+
}
181+
182+
worker.Status.ReadyReplicas = 0
183+
worker.Status.AvailableReplicas = 0
184+
return baseClient.Status().Update(ctx, worker)
185+
},
186+
},
187+
}
188+
189+
engine, _ := newStatusTestEngineWithClient(t, client)
190+
ready, err := engine.CheckAndUpdateRuntimeStatus(newStatusTestRuntimeValue(false))
191+
if err != nil {
192+
t.Fatalf("CheckAndUpdateRuntimeStatus() unexpected error = %v", err)
193+
}
194+
if ready {
195+
t.Fatalf("expected runtime to be not ready after retry sees worker become not ready")
196+
}
197+
198+
updatedRuntime := getUpdatedRuntime(t, client)
199+
if updatedRuntime.Status.Worker.Phase != datav1alpha1.RuntimePhaseNotReady {
200+
t.Fatalf("expected worker phase %q, got %q", datav1alpha1.RuntimePhaseNotReady, updatedRuntime.Status.Worker.Phase)
201+
}
202+
if updatedRuntime.Status.SetupDuration != "" {
203+
t.Fatalf("expected setup duration to stay empty when final runtime status is not ready, got %q", updatedRuntime.Status.SetupDuration)
204+
}
205+
}
206+
207+
func newStatusTestEngineWithClient(t *testing.T, client ctrlclient.Client) (*CacheEngine, ctrlclient.Client) {
208+
t.Helper()
209+
210+
return &CacheEngine{
211+
Client: client,
212+
name: testStatusRuntime,
213+
namespace: testStatusNamespace,
214+
Log: fake.NullLogger(),
215+
}, client
216+
}
217+
218+
func newStatusTestRuntimeValue(enableClient bool) *common.CacheRuntimeValue {
219+
value := &common.CacheRuntimeValue{
220+
Master: newStatusTestComponentValue(testStatusMaster, "StatefulSet"),
221+
Worker: newStatusTestComponentValue(testStatusWorker, "StatefulSet"),
222+
Client: newStatusTestComponentValue(testStatusClient, "DaemonSet"),
223+
}
224+
value.Client.Enabled = enableClient
225+
226+
return value
227+
}
228+
229+
func newStatusTestComponentValue(name, kind string) *common.CacheRuntimeComponentValue {
230+
return &common.CacheRuntimeComponentValue{
231+
Enabled: true,
232+
Name: name,
233+
Namespace: testStatusNamespace,
234+
WorkloadType: metav1.TypeMeta{APIVersion: testStatusWorkloadAP, Kind: kind},
235+
}
236+
}
237+
238+
func newStatusTestRuntime() *datav1alpha1.CacheRuntime {
239+
return &datav1alpha1.CacheRuntime{
240+
ObjectMeta: metav1.ObjectMeta{
241+
Name: testStatusRuntime,
242+
Namespace: testStatusNamespace,
243+
CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Minute)),
244+
},
245+
}
246+
}
247+
248+
func getUpdatedRuntime(t *testing.T, client ctrlclient.Client) *datav1alpha1.CacheRuntime {
249+
t.Helper()
250+
251+
updatedRuntime := &datav1alpha1.CacheRuntime{}
252+
if err := client.Get(context.TODO(), types.NamespacedName{Name: testStatusRuntime, Namespace: testStatusNamespace}, updatedRuntime); err != nil {
253+
t.Fatalf("failed to get updated runtime: %v", err)
254+
}
255+
256+
return updatedRuntime
257+
}
258+
259+
func newStatefulSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.StatefulSet {
260+
replicas := desiredReplicas
261+
return &appsv1.StatefulSet{
262+
ObjectMeta: metav1.ObjectMeta{
263+
Name: name,
264+
Namespace: namespace,
265+
},
266+
Spec: appsv1.StatefulSetSpec{
267+
Replicas: &replicas,
268+
},
269+
Status: appsv1.StatefulSetStatus{
270+
CurrentReplicas: desiredReplicas,
271+
AvailableReplicas: readyReplicas,
272+
ReadyReplicas: readyReplicas,
273+
},
274+
}
275+
}
276+
277+
func newDaemonSetComponent(name, namespace string, desiredReplicas, readyReplicas int32) *appsv1.DaemonSet {
278+
return &appsv1.DaemonSet{
279+
ObjectMeta: metav1.ObjectMeta{
280+
Name: name,
281+
Namespace: namespace,
282+
},
283+
Status: appsv1.DaemonSetStatus{
284+
CurrentNumberScheduled: desiredReplicas,
285+
DesiredNumberScheduled: desiredReplicas,
286+
NumberAvailable: readyReplicas,
287+
NumberReady: readyReplicas,
288+
NumberUnavailable: desiredReplicas - readyReplicas,
289+
},
290+
}
291+
}
292+
293+
type conflictOnceClient struct {
294+
ctrlclient.Client
295+
statusWriter ctrlclient.StatusWriter
296+
}
297+
298+
func (c *conflictOnceClient) Status() ctrlclient.StatusWriter {
299+
return c.statusWriter
300+
}
301+
302+
type conflictOnceStatusWriter struct {
303+
ctrlclient.StatusWriter
304+
beforeConflict func(ctx context.Context) error
305+
conflicted bool
306+
}
307+
308+
func (w *conflictOnceStatusWriter) Update(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.SubResourceUpdateOption) error {
309+
if !w.conflicted {
310+
w.conflicted = true
311+
if w.beforeConflict != nil {
312+
if err := w.beforeConflict(ctx); err != nil {
313+
return err
314+
}
315+
}
316+
317+
return apierrors.NewConflict(
318+
schema.GroupResource{Group: testCacheRuntimeGV, Resource: testCacheRuntimeGR},
319+
obj.GetName(),
320+
errors.New("injected conflict"),
321+
)
322+
}
323+
324+
return w.StatusWriter.Update(ctx, obj, opts...)
325+
}

0 commit comments

Comments
 (0)