Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions pkg/pods/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func NewWithDefaults(name, ns string, client k8s.Interface) *Pod {
}
}

// podResult holds the result of pod status check
type podResult struct {
pod *corev1.Pod
err error
}

// Wait wait for the pod to get up and running
func (p *Pod) Wait() (*corev1.Pod, error) {
// ensure pod exists before we actually check for it
Expand All @@ -77,66 +83,62 @@ func (p *Pod) Wait() (*corev1.Pod, error) {
}

stopC := make(chan struct{})
eventC := make(chan interface{}, 10)
mu := sync.Mutex{}
defer func() {
mu.Lock()
close(stopC)
close(eventC)
mu.Unlock()
}()

p.watcher(stopC, eventC, &mu)
var result podResult

var pod *corev1.Pod
var err error
for e := range eventC {
pod, err = checkPodStatus(e)
if pod != nil || err != nil {
break
}
}
// Start watcher in a goroutine
go func() {
p.watcher(stopC, &result, &mu)
}()

return pod, err
// Wait for stopC
<-stopC
return result.pod, result.err
}

func (p *Pod) watcher(stopC <-chan struct{}, eventC chan<- interface{}, mu *sync.Mutex) {
func (p *Pod) watcher(stopC chan struct{}, result *podResult, mu *sync.Mutex) {
factory := informers.NewSharedInformerFactoryWithOptions(
p.Kc, time.Second*10,
informers.WithNamespace(p.Ns),
informers.WithTweakListOptions(podOpts(p.Name)))

updatePodStatus := func(obj interface{}) {
mu.Lock()
defer mu.Unlock()

pod, err := checkPodStatus(obj)
if pod != nil || err != nil {
result.pod = pod
result.err = err
close(stopC)
}
}

_, err := factory.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mu.Lock()
defer mu.Unlock()
select {
case <-stopC:
return
default:
// default is used to avoid pseudo-random selection of multiple matching cases
eventC <- obj
updatePodStatus(obj)
}
},
UpdateFunc: func(_, newObj interface{}) {
mu.Lock()
defer mu.Unlock()
select {
case <-stopC:
return
default:
eventC <- newObj
updatePodStatus(newObj)
}
},
DeleteFunc: func(obj interface{}) {
mu.Lock()
defer mu.Unlock()
select {
case <-stopC:
return
default:
eventC <- obj
updatePodStatus(obj)
}
},
})
Expand Down