Skip to content

Commit bdb2bd3

Browse files
committed
Add periodic requeue for Running phase
1 parent bfa2b6c commit bdb2bd3

5 files changed

Lines changed: 78 additions & 37 deletions

pkg/controller/bearer_from_chunk_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,5 +208,7 @@ func (c *BearerFromChunkController) handler(ctx context.Context, name string) {
208208
klog.Errorf("failed to update bearer status: %v", err)
209209
return
210210
}
211+
case v1alpha1.ChunkPhaseRunning:
212+
c.workqueue.AddAfter(name, 10*time.Second)
211213
}
212214
}

pkg/controller/bearer_to_chunk_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (c *BearerToChunkController) handler(ctx context.Context, name string) {
163163
return
164164
}
165165

166+
c.workqueue.AddAfter(name, 10*time.Second)
166167
case v1alpha1.BearerPhaseSucceeded:
167168
c.cleanupBearer(bearer)
168169
}

pkg/controller/blob_from_chunk_controller.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ func (c *BlobFromChunkController) handler(ctx context.Context, name string) {
167167
klog.Errorf("failed to update blob status: %v", err)
168168
return
169169
}
170+
171+
if blob.Status.Phase == v1alpha1.BlobPhaseRunning {
172+
c.workqueue.AddAfter(name, 10*time.Second)
173+
}
170174
return
171175
}
172176

@@ -187,6 +191,10 @@ func (c *BlobFromChunkController) handler(ctx context.Context, name string) {
187191
c.workqueue.AddAfter(name, 5*time.Second)
188192
return
189193
}
194+
195+
if blob.Status.Phase == v1alpha1.BlobPhaseRunning {
196+
c.workqueue.AddAfter(name, 10*time.Second)
197+
}
190198
return
191199
}
192200

@@ -212,6 +220,10 @@ func (c *BlobFromChunkController) handler(ctx context.Context, name string) {
212220
klog.Errorf("failed to update blob status: %v", err)
213221
return
214222
}
223+
224+
if blob.Status.Phase == v1alpha1.BlobPhaseRunning {
225+
c.workqueue.AddAfter(name, 10*time.Second)
226+
}
215227
return
216228
}
217229

@@ -232,6 +244,10 @@ func (c *BlobFromChunkController) handler(ctx context.Context, name string) {
232244
klog.Errorf("failed to update blob status: %v", err)
233245
return
234246
}
247+
248+
if blob.Status.Phase == v1alpha1.BlobPhaseRunning {
249+
c.workqueue.AddAfter(name, 10*time.Second)
250+
}
235251
}
236252

237253
func (c *BlobFromChunkController) fromHeadChunk(ctx context.Context, blob *v1alpha1.Blob) error {

pkg/controller/blob_to_chunk_controller.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ func (c *BlobToChunkController) handler(ctx context.Context, name string) {
185185
klog.Errorf("failed to create chunk for blob %s: %v", blob.Name, err)
186186
return
187187
}
188+
189+
c.workqueue.AddAfter(name, 10*time.Second)
188190
return
189191
}
190192

@@ -195,6 +197,8 @@ func (c *BlobToChunkController) handler(ctx context.Context, name string) {
195197
klog.Errorf("failed to create head chunk for blob %s: %v", blob.Name, err)
196198
return
197199
}
200+
201+
c.workqueue.AddAfter(name, 10*time.Second)
198202
return
199203
}
200204

@@ -205,6 +209,8 @@ func (c *BlobToChunkController) handler(ctx context.Context, name string) {
205209
klog.Errorf("failed to create chunks for blob %s: %v", blob.Name, err)
206210
return
207211
}
212+
213+
c.workqueue.AddAfter(name, 10*time.Second)
208214
return
209215
}
210216

@@ -215,6 +221,7 @@ func (c *BlobToChunkController) handler(ctx context.Context, name string) {
215221
return
216222
}
217223

224+
c.workqueue.AddAfter(name, 10*time.Second)
218225
case v1alpha1.BlobPhaseSucceeded:
219226
c.cleanupBlob(blob)
220227
}

pkg/controller/chunk_from_bearer_controller.go

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
informers "github.com/OpenCIDN/cidn/pkg/informers/externalversions/task/v1alpha1"
2727
"github.com/OpenCIDN/cidn/pkg/internal/utils"
2828
apierrors "k8s.io/apimachinery/pkg/api/errors"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/labels"
3031
"k8s.io/client-go/tools/cache"
3132
"k8s.io/client-go/util/workqueue"
@@ -54,28 +55,18 @@ func NewChunkFromBearerController(
5455
bearerInformer: sharedInformerFactory.Task().V1alpha1().Bearers(),
5556
client: client,
5657
workqueue: workqueue.NewTypedDelayingQueue[string](),
57-
concurrency: 6,
58+
concurrency: 5,
5859
}
5960

6061
c.bearerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
6162
AddFunc: func(obj interface{}) {
6263
bearer := obj.(*v1alpha1.Bearer)
6364
key := bearer.Name
64-
65-
if bearer.Status.Phase != v1alpha1.BearerPhaseSucceeded {
66-
return
67-
}
68-
6965
c.workqueue.Add(key)
7066
},
7167
UpdateFunc: func(oldObj, newObj interface{}) {
7268
bearer := newObj.(*v1alpha1.Bearer)
7369
key := bearer.Name
74-
75-
if bearer.Status.Phase != v1alpha1.BearerPhaseSucceeded {
76-
return
77-
}
78-
7970
c.workqueue.Add(key)
8071
},
8172
})
@@ -108,40 +99,64 @@ func (c *ChunkFromBearerController) processNextItem(ctx context.Context) bool {
10899
}
109100

110101
func (c *ChunkFromBearerController) handler(ctx context.Context, name string) {
111-
chunkList, err := c.chunkInformer.Lister().List(labels.Everything())
102+
bearer, err := c.bearerInformer.Lister().Get(name)
112103
if err != nil {
113-
c.workqueue.AddAfter(name, 5*time.Second)
114-
klog.Errorf("failed to list chunks: %v", err)
115-
return
104+
if !apierrors.IsNotFound(err) {
105+
c.workqueue.AddAfter(name, 5*time.Second)
106+
klog.Errorf("failed to get bearer '%s': %v", name, err)
107+
return
108+
}
109+
bearer, err = c.client.TaskV1alpha1().Bearers().Get(ctx, name, metav1.GetOptions{})
110+
if err != nil {
111+
if apierrors.IsNotFound(err) {
112+
return
113+
}
114+
c.workqueue.AddAfter(name, 5*time.Second)
115+
klog.Errorf("failed to get bearer '%s' from API server: %v", name, err)
116+
return
117+
}
116118
}
117119

118-
var retry bool
119-
for _, chunk := range chunkList {
120-
if chunk.Spec.BearerName != name {
121-
continue
122-
}
123-
if chunk.Status.Phase != v1alpha1.ChunkPhaseFailed {
124-
continue
120+
switch bearer.Status.Phase {
121+
case v1alpha1.BearerPhaseSucceeded:
122+
chunkList, err := c.chunkInformer.Lister().List(labels.Everything())
123+
if err != nil {
124+
c.workqueue.AddAfter(name, 5*time.Second)
125+
klog.Errorf("failed to list chunks: %v", err)
126+
return
125127
}
126128

127-
if !chunk.Status.Retryable {
128-
continue
129+
var retry bool
130+
for _, chunk := range chunkList {
131+
if chunk.Spec.BearerName != name {
132+
continue
133+
}
134+
if chunk.Status.Phase != v1alpha1.ChunkPhaseFailed {
135+
continue
136+
}
137+
138+
if !chunk.Status.Retryable {
139+
continue
140+
}
141+
142+
_, err = utils.UpdateResourceStatusWithRetry(ctx, c.client.TaskV1alpha1().Chunks(), chunk, func(ch *v1alpha1.Chunk) *v1alpha1.Chunk {
143+
ch.Status.HandlerName = ""
144+
ch.Status.Phase = v1alpha1.ChunkPhasePending
145+
ch.Status.Progress = 0
146+
ch.Status.Conditions = nil
147+
return ch
148+
})
149+
if err != nil && !apierrors.IsNotFound(err) {
150+
klog.Errorf("failed to release chunk %s: %v", chunk.Name, err)
151+
retry = true
152+
}
129153
}
130154

131-
_, err = utils.UpdateResourceStatusWithRetry(ctx, c.client.TaskV1alpha1().Chunks(), chunk, func(ch *v1alpha1.Chunk) *v1alpha1.Chunk {
132-
ch.Status.HandlerName = ""
133-
ch.Status.Phase = v1alpha1.ChunkPhasePending
134-
ch.Status.Progress = 0
135-
ch.Status.Conditions = nil
136-
return ch
137-
})
138-
if err != nil && !apierrors.IsNotFound(err) {
139-
klog.Errorf("failed to release chunk %s: %v", chunk.Name, err)
140-
retry = true
155+
if retry {
156+
c.workqueue.AddAfter(name, 5*time.Second)
141157
}
158+
case v1alpha1.BearerPhaseRunning, v1alpha1.BearerPhaseUnknown:
159+
c.workqueue.AddAfter(name, 10*time.Second)
142160
}
143161

144-
if retry {
145-
c.workqueue.AddAfter(name, 5*time.Second)
146-
}
147162
}

0 commit comments

Comments
 (0)