Skip to content

Commit bfec760

Browse files
committed
Reset chunk status to pending if unchanged for over 2 minutes
1 parent 786a887 commit bfec760

1 file changed

Lines changed: 54 additions & 11 deletions

File tree

pkg/runner/chunk_runner.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ func (r *ChunkRunner) process(continues <-chan struct{}, chunk *v1alpha1.Chunk)
677677
func (r *ChunkRunner) startProgressUpdater(ctx context.Context, s *state, gsr **readCount, gdrs *[]*swmrCount) func() {
678678
var (
679679
prevStatus *v1alpha1.ChunkStatus
680-
lastUpdateTime time.Time
680+
lastUpdateTime = time.Now()
681681
)
682682

683683
ctx, cancel := context.WithCancel(ctx)
@@ -688,24 +688,67 @@ func (r *ChunkRunner) startProgressUpdater(ctx context.Context, s *state, gsr **
688688
updateProgress(&ss.Status, &ss.Spec, *gsr, *gdrs)
689689
}
690690

691-
if prevStatus != nil && reflect.DeepEqual(prevStatus, &ss.Status) && time.Since(lastUpdateTime) < 2*r.updateDuration {
692-
klog.Infof("No changes detected for chunk %s, skipping update. Last update was %s ago", ss.Name, time.Since(lastUpdateTime))
693-
return ss
694-
}
691+
if reflect.DeepEqual(prevStatus, &ss.Status) {
692+
since := time.Since(lastUpdateTime)
693+
if since <= 2*time.Minute {
694+
return ss
695+
}
695696

696-
s := ss.Status.DeepCopy()
697-
chunk, err := utils.UpdateResourceStatusWithRetry(ctx, r.client.TaskV1alpha1().Chunks(), ss, func(ss *v1alpha1.Chunk) *v1alpha1.Chunk {
698-
ss.Status = *s
697+
chunk := ss.DeepCopy()
698+
chunk.Status.Phase = v1alpha1.ChunkPhasePending
699+
chunk.Status.HandlerName = ""
700+
_, err := r.client.TaskV1alpha1().Chunks().UpdateStatus(ctx, chunk, metav1.UpdateOptions{})
701+
if err != nil {
702+
klog.Infof("Failed to reset chunk status for chunk %s: %v", chunk.Name, err)
703+
return ss
704+
}
705+
klog.Infof("Reset chunk status %s", chunk.Name)
706+
cancel()
699707
return ss
700-
})
708+
}
701709

710+
chunk, err := r.client.TaskV1alpha1().Chunks().UpdateStatus(ctx, ss, metav1.UpdateOptions{})
702711
if err != nil {
703712
if apierrors.IsNotFound(err) {
713+
klog.Infof("Chunk %s not found: %v", ss.Name, err)
704714
cancel()
705715
return ss
706716
}
707-
klog.Infof("Failed to update chunk status for chunk %s: %v", ss.Name, err)
708-
return ss
717+
if !apierrors.IsConflict(err) {
718+
klog.Infof("Failed to update chunk status for chunk %s: %v", ss.Name, err)
719+
return ss
720+
}
721+
722+
chunk, err = r.client.TaskV1alpha1().Chunks().Get(ctx, ss.Name, metav1.GetOptions{})
723+
if err != nil {
724+
if apierrors.IsNotFound(err) {
725+
klog.Infof("Chunk %s not found: %v", ss.Name, err)
726+
cancel()
727+
return ss
728+
}
729+
klog.Infof("Failed to get chunk %s: %v", ss.Name, err)
730+
return ss
731+
}
732+
if chunk.Status.HandlerName != r.handlerName {
733+
klog.Infof("Chunk %s handler name changed: %v", ss.Name, err)
734+
cancel()
735+
return ss
736+
}
737+
738+
if *gsr != nil {
739+
updateProgress(&chunk.Status, &chunk.Spec, *gsr, *gdrs)
740+
}
741+
742+
chunk, err = r.client.TaskV1alpha1().Chunks().UpdateStatus(ctx, chunk, metav1.UpdateOptions{})
743+
if err != nil {
744+
if apierrors.IsNotFound(err) {
745+
klog.Infof("Chunk %s not found: %v", ss.Name, err)
746+
cancel()
747+
return ss
748+
}
749+
klog.Infof("Failed to update chunk status for chunk %s: %v", ss.Name, err)
750+
return ss
751+
}
709752
}
710753

711754
prevStatus = chunk.Status.DeepCopy()

0 commit comments

Comments
 (0)