Skip to content

Commit c174aba

Browse files
committed
Clean up
1 parent c9441dc commit c174aba

11 files changed

Lines changed: 420 additions & 348 deletions

pkg/controller/bearer_from_chunk_controller.go

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package controller
1919
import (
2020
"context"
2121
"encoding/json"
22-
"fmt"
23-
"math/rand"
2422
"time"
2523

2624
"github.com/OpenCIDN/cidn/pkg/apis/task/v1alpha1"
@@ -59,41 +57,23 @@ func NewBearerFromChunkController(
5957
}
6058

6159
c.bearerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
62-
UpdateFunc: func(oldObj, newObj interface{}) {
63-
bearer := newObj.(*v1alpha1.Bearer)
64-
key := bearer.Name
65-
c.workqueue.Add(key)
60+
AddFunc: func(obj interface{}) {
61+
bearer := obj.(*v1alpha1.Bearer)
62+
c.workqueue.Add(bearer.Name)
6663
},
6764
})
6865

6966
c.chunkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
7067
AddFunc: func(obj interface{}) {
71-
chunk, ok := obj.(*v1alpha1.Chunk)
72-
if !ok {
73-
return
74-
}
75-
68+
chunk := obj.(*v1alpha1.Chunk)
7669
bearerName := chunk.Annotations[BearerNameAnnotationKey]
7770
if bearerName == "" {
7871
return
7972
}
8073
c.workqueue.Add(bearerName)
8174
},
8275
UpdateFunc: func(oldObj, newObj interface{}) {
83-
newChunk := newObj.(*v1alpha1.Chunk)
84-
85-
bearerName := newChunk.Annotations[BearerNameAnnotationKey]
86-
if bearerName == "" {
87-
return
88-
}
89-
c.workqueue.Add(bearerName)
90-
},
91-
DeleteFunc: func(obj interface{}) {
92-
chunk, ok := obj.(*v1alpha1.Chunk)
93-
if !ok {
94-
return
95-
}
96-
76+
chunk := newObj.(*v1alpha1.Chunk)
9777
bearerName := chunk.Annotations[BearerNameAnnotationKey]
9878
if bearerName == "" {
9979
return
@@ -124,58 +104,72 @@ func (c *BearerFromChunkController) processNextItem(ctx context.Context) bool {
124104
}
125105
defer c.workqueue.Done(key)
126106

127-
err := c.chunkHandler(ctx, key)
128-
if err != nil {
129-
c.workqueue.AddAfter(key, 5*time.Second+time.Duration(rand.Intn(100))*time.Millisecond)
130-
klog.Errorf("error bearer chunking '%s': %v, requeuing", key, err)
131-
return true
132-
}
107+
c.handler(ctx, key)
133108

134109
return true
135110
}
136111

137-
func (c *BearerFromChunkController) chunkHandler(ctx context.Context, name string) error {
112+
func (c *BearerFromChunkController) handler(ctx context.Context, name string) {
138113
bearer, err := c.bearerInformer.Lister().Get(name)
139114
if err != nil {
140-
if apierrors.IsNotFound(err) {
141-
return nil
115+
if !apierrors.IsNotFound(err) {
116+
c.workqueue.AddAfter(name, 5*time.Second)
117+
klog.Errorf("failed to get bearer '%s': %v", name, err)
118+
return
119+
}
120+
bearer, err = c.client.TaskV1alpha1().Bearers().Get(ctx, name, metav1.GetOptions{})
121+
if err != nil {
122+
if apierrors.IsNotFound(err) {
123+
return
124+
}
125+
c.workqueue.AddAfter(name, 5*time.Second)
126+
klog.Errorf("failed to get bearer '%s' from API server: %v", name, err)
127+
return
142128
}
143-
return err
144129
}
145130

146131
if bearer.Status.HandlerName != c.handlerName {
147-
return nil
132+
return
148133
}
149134

150135
if bearer.Status.Phase == v1alpha1.BearerPhaseSucceeded {
151-
return nil
136+
return
152137
}
153138

154-
err = c.fromChunk(ctx, bearer)
155-
if err != nil {
156-
return fmt.Errorf("failed to update bearer status for bearer %s: %v", bearer.Name, err)
157-
}
158-
159-
return nil
160-
}
161-
162-
func (c *BearerFromChunkController) fromChunk(ctx context.Context, bearer *v1alpha1.Bearer) error {
163139
chunkName := buildBearerChunkName(bearer.Name)
164140
chunk, err := c.chunkInformer.Lister().Get(chunkName)
165141
if err != nil {
166-
return fmt.Errorf("failed to get chunk: %w", err)
142+
if apierrors.IsNotFound(err) && c.chunkInformer.Informer().HasSynced() {
143+
return
144+
}
145+
c.workqueue.AddAfter(name, 5*time.Second)
146+
klog.Errorf("failed to get chunk %s: %v", chunkName, err)
147+
return
167148
}
168149

169150
if chunk.Status.SourceResponse == nil {
170-
return nil
151+
return
171152
}
172153

173154
switch chunk.Status.Phase {
174155
case v1alpha1.ChunkPhaseSucceeded:
175156
bti := v1alpha1.BearerTokenInfo{}
176157
err = json.Unmarshal(chunk.Status.ResponseBody, &bti)
177158
if err != nil {
178-
return fmt.Errorf("failed to unmarshal chunk response body: %v", err)
159+
_, err = utils.UpdateResourceStatusWithRetry(ctx, c.client.TaskV1alpha1().Bearers(), bearer, func(b *v1alpha1.Bearer) *v1alpha1.Bearer {
160+
b.Status.Phase = v1alpha1.BearerPhaseFailed
161+
b.Status.Conditions = v1alpha1.AppendConditions(b.Status.Conditions, v1alpha1.Condition{
162+
Type: "InvalidTokenInfo",
163+
Message: "Failed to unmarshal token info from chunk response body: " + err.Error(),
164+
})
165+
return b
166+
})
167+
if err != nil {
168+
c.workqueue.AddAfter(name, 5*time.Second)
169+
klog.Errorf("failed to update bearer status: %v", err)
170+
return
171+
}
172+
return
179173
}
180174

181175
_, err = utils.UpdateResourceStatusWithRetry(ctx, c.client.TaskV1alpha1().Bearers(), bearer, func(b *v1alpha1.Bearer) *v1alpha1.Bearer {
@@ -184,14 +178,19 @@ func (c *BearerFromChunkController) fromChunk(ctx context.Context, bearer *v1alp
184178
return b
185179
})
186180
if err != nil {
187-
return fmt.Errorf("failed to update bearer status: %v", err)
181+
c.workqueue.AddAfter(name, 5*time.Second)
182+
klog.Errorf("failed to update bearer status: %v", err)
183+
return
188184
}
189185

190186
err = c.client.TaskV1alpha1().Chunks().Delete(ctx, chunkName, metav1.DeleteOptions{})
191187
if err != nil {
192-
if !apierrors.IsNotFound(err) {
193-
return fmt.Errorf("failed to update bearer status: %v", err)
188+
if apierrors.IsNotFound(err) {
189+
return
194190
}
191+
c.workqueue.AddAfter(name, 5*time.Second)
192+
klog.Errorf("failed to delete chunk %s: %v", chunkName, err)
193+
return
195194
}
196195
case v1alpha1.ChunkPhaseFailed:
197196
_, err = utils.UpdateResourceStatusWithRetry(ctx, c.client.TaskV1alpha1().Bearers(), bearer, func(b *v1alpha1.Bearer) *v1alpha1.Bearer {
@@ -205,8 +204,9 @@ func (c *BearerFromChunkController) fromChunk(ctx context.Context, bearer *v1alp
205204
return b
206205
})
207206
if err != nil {
208-
return fmt.Errorf("failed to update bearer status: %v", err)
207+
c.workqueue.AddAfter(name, 5*time.Second)
208+
klog.Errorf("failed to update bearer status: %v", err)
209+
return
209210
}
210211
}
211-
return nil
212212
}

pkg/controller/bearer_hold_controller.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22-
"math/rand"
2322
"sync"
2423
"time"
2524

@@ -130,42 +129,47 @@ func (c *BearerHoldController) processNextItem(ctx context.Context) bool {
130129
}
131130
defer c.workqueue.Done(key)
132131

133-
err := c.chunkHandler(ctx, key)
134-
if err != nil {
135-
c.workqueue.AddAfter(key, 5*time.Second+time.Duration(rand.Intn(100))*time.Millisecond)
136-
klog.Errorf("error bearer chunking '%s': %v, requeuing", key, err)
137-
return true
138-
}
132+
c.handler(ctx, key)
139133

140134
return true
141135
}
142136

143-
func (c *BearerHoldController) chunkHandler(ctx context.Context, name string) error {
137+
func (c *BearerHoldController) handler(ctx context.Context, name string) {
144138
bearer, err := c.bearerInformer.Lister().Get(name)
145139
if err != nil {
146-
if apierrors.IsNotFound(err) {
147-
return nil
140+
if !apierrors.IsNotFound(err) {
141+
c.workqueue.AddAfter(name, 5*time.Second)
142+
klog.Errorf("failed to get bearer '%s': %v", name, err)
143+
return
144+
}
145+
bearer, err = c.client.TaskV1alpha1().Bearers().Get(ctx, name, metav1.GetOptions{})
146+
if err != nil {
147+
if apierrors.IsNotFound(err) {
148+
return
149+
}
150+
c.workqueue.AddAfter(name, 5*time.Second)
151+
klog.Errorf("failed to get bearer '%s' from API server: %v", name, err)
152+
return
148153
}
149-
return err
150154
}
151155

152156
if bearer.Status.HandlerName != "" {
153-
return nil
157+
return
154158
}
155159

156160
if bearer.Status.Phase != v1alpha1.BearerPhasePending {
157-
return nil
161+
return
158162
}
159163

160164
bearer.Status.HandlerName = c.handlerName
161165
bearer.Status.Phase = v1alpha1.BearerPhaseRunning
162166
_, err = c.client.TaskV1alpha1().Bearers().UpdateStatus(ctx, bearer, metav1.UpdateOptions{})
163167
if err != nil {
164-
if apierrors.IsConflict(err) {
165-
return nil
168+
if apierrors.IsConflict(err) || apierrors.IsNotFound(err) {
169+
return
166170
}
167-
return fmt.Errorf("failed to update bearer %s: %v", bearer.Name, err)
171+
c.workqueue.AddAfter(name, 5*time.Second)
172+
klog.Errorf("failed to update bearer %s: %v", bearer.Name, err)
173+
return
168174
}
169-
170-
return nil
171175
}

pkg/controller/bearer_to_chunk_controller.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22-
"math/rand"
2322
"net/http"
2423
"net/url"
2524
"time"
@@ -127,48 +126,53 @@ func (c *BearerToChunkController) processNextItem(ctx context.Context) bool {
127126
}
128127
defer c.workqueue.Done(key)
129128

130-
err := c.chunkHandler(ctx, key)
131-
if err != nil {
132-
c.workqueue.AddAfter(key, 5*time.Second+time.Duration(rand.Intn(100))*time.Millisecond)
133-
klog.Errorf("error bearer chunking '%s': %v, requeuing", key, err)
134-
return true
135-
}
129+
c.handler(ctx, key)
136130

137131
return true
138132
}
139133

140-
func (c *BearerToChunkController) chunkHandler(ctx context.Context, name string) error {
134+
func (c *BearerToChunkController) handler(ctx context.Context, name string) {
141135
bearer, err := c.bearerInformer.Lister().Get(name)
142136
if err != nil {
143-
if apierrors.IsNotFound(err) {
144-
return nil
137+
if !apierrors.IsNotFound(err) {
138+
c.workqueue.AddAfter(name, 5*time.Second)
139+
klog.Errorf("failed to get bearer '%s': %v", name, err)
140+
return
141+
}
142+
bearer, err = c.client.TaskV1alpha1().Bearers().Get(ctx, name, metav1.GetOptions{})
143+
if err != nil {
144+
if apierrors.IsNotFound(err) {
145+
return
146+
}
147+
c.workqueue.AddAfter(name, 5*time.Second)
148+
klog.Errorf("failed to get bearer '%s' from API server: %v", name, err)
149+
return
145150
}
146-
return err
147151
}
148152

149153
if bearer.Status.HandlerName != c.handlerName {
150-
return nil
154+
return
151155
}
152156

153157
switch bearer.Status.Phase {
154158
case v1alpha1.BearerPhaseRunning, v1alpha1.BearerPhaseUnknown:
155-
err := c.toGetChunk(ctx, bearer)
159+
err := c.toChunk(ctx, bearer)
156160
if err != nil {
157-
return fmt.Errorf("failed to create chunk for bearer %s: %v", bearer.Name, err)
161+
c.workqueue.AddAfter(name, 5*time.Second)
162+
klog.Errorf("failed to create chunk for bearer %s: %v", bearer.Name, err)
163+
return
158164
}
159165

160166
case v1alpha1.BearerPhaseSucceeded:
161167
c.cleanupBearer(bearer)
162168
}
163-
164-
return nil
165169
}
166170

167171
func buildBearerChunkName(bearerName string) string {
168172
return fmt.Sprintf("bearer:%s", bearerName)
169173
}
170174

171-
func (c *BearerToChunkController) toGetChunk(ctx context.Context, bearer *v1alpha1.Bearer) error {
175+
func (c *BearerToChunkController) toChunk(ctx context.Context, bearer *v1alpha1.Bearer) error {
172176
chunkName := buildBearerChunkName(bearer.Name)
173177
existingChunk, err := c.chunkInformer.Lister().Get(chunkName)
174178
if err == nil && existingChunk != nil {

0 commit comments

Comments
 (0)