Skip to content
This repository was archived by the owner on Sep 18, 2020. It is now read-only.

Commit abaa220

Browse files
authored
Merge pull request #111 from dghubble/agent-controller-pattern
Use controller pattern for update-agent
2 parents 7e9d5a8 + 42aaba2 commit abaa220

2 files changed

Lines changed: 78 additions & 57 deletions

File tree

cmd/update-agent/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func main() {
4141

4242
glog.Infof("%s running", os.Args[0])
4343

44-
if err := a.Run(); err != nil {
45-
glog.Fatalf("Error while running %s: %v", os.Args[0], err)
46-
}
44+
// Run agent until the stop channel is closed
45+
stop := make(chan struct{})
46+
defer close(stop)
47+
a.Run(stop)
4748
}

pkg/agent/agent.go

Lines changed: 74 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -60,59 +60,22 @@ func New(node string) (*Klocksmith, error) {
6060
return &Klocksmith{node, kc, nc, ue, lc}, nil
6161
}
6262

63-
// updateStatusCallback receives Status messages from update engine. If the
64-
// status is UpdateStatusUpdatedNeedReboot, indicate that with a label on our
65-
// node.
66-
func (k *Klocksmith) updateStatusCallback(s updateengine.Status) {
67-
glog.Info("Updating status")
68-
// update our status
69-
anno := map[string]string{
70-
constants.AnnotationStatus: s.CurrentOperation,
71-
constants.AnnotationLastCheckedTime: fmt.Sprintf("%d", s.LastCheckedTime),
72-
constants.AnnotationNewVersion: s.NewVersion,
73-
}
63+
// Run starts the agent to listen for an update_engine reboot signal and react
64+
// by draining pods and rebooting. Runs until the stop channel is closed.
65+
func (k *Klocksmith) Run(stop <-chan struct{}) {
66+
glog.V(5).Info("Starting agent")
7467

75-
// indicate we need a reboot
76-
if s.CurrentOperation == updateengine.UpdateStatusUpdatedNeedReboot {
77-
glog.Info("Indicating a reboot is needed")
78-
anno[constants.AnnotationRebootNeeded] = constants.True
68+
// agent process should reboot the node, no need to loop
69+
if err := k.process(stop); err != nil {
70+
glog.Errorf("Error running agent process: %v", err)
7971
}
8072

81-
wait.PollUntil(10*time.Second, func() (bool, error) {
82-
if err := k8sutil.SetNodeAnnotations(k.nc, k.node, anno); err != nil {
83-
glog.Errorf("Failed to set annotation %q: %v", constants.AnnotationStatus, err)
84-
return false, nil
85-
}
86-
87-
return true, nil
88-
}, wait.NeverStop)
73+
glog.V(5).Info("Stopping agent")
8974
}
9075

91-
// setInfoLabels labels our node with helpful info about Container Linux.
92-
func (k *Klocksmith) setInfoLabels() error {
93-
vi, err := k8sutil.GetVersionInfo()
94-
if err != nil {
95-
return fmt.Errorf("failed to get version info: %v", err)
96-
}
97-
98-
labels := map[string]string{
99-
constants.LabelID: vi.ID,
100-
constants.LabelGroup: vi.Group,
101-
constants.LabelVersion: vi.Version,
102-
}
103-
104-
if err := k8sutil.SetNodeLabels(k.nc, k.node, labels); err != nil {
105-
return err
106-
}
107-
108-
return nil
109-
}
110-
111-
// Run runs klocksmithd, reacting to the update_engine reboot signal by
112-
// draining pods on this kubernetes node and rebooting.
113-
//
114-
// TODO(mischief): try to be more resilient against transient failures
115-
func (k *Klocksmith) Run() error {
76+
// process performs the agent reconciliation to reboot the node or stops when
77+
// the stop channel is closed.
78+
func (k *Klocksmith) process(stop <-chan struct{}) error {
11679
glog.Info("Setting info labels")
11780
if err := k.setInfoLabels(); err != nil {
11881
return fmt.Errorf("failed to set node info: %v", err)
@@ -141,8 +104,7 @@ func (k *Klocksmith) Run() error {
141104
}
142105

143106
// watch update engine for status updates
144-
watchUpdateStatusStop := make(chan struct{})
145-
go k.watchUpdateStatus(k.updateStatusCallback, watchUpdateStatusStop)
107+
go k.watchUpdateStatus(k.updateStatusCallback, stop)
146108

147109
// block until constants.AnnotationOkToReboot is set
148110
for {
@@ -155,9 +117,6 @@ func (k *Klocksmith) Run() error {
155117
glog.Warningf("error waiting for an ok-to-reboot: %v", err)
156118
}
157119

158-
// stop watching the update status by closing the channel
159-
close(watchUpdateStatusStop)
160-
161120
// set constants.AnnotationRebootInProgress and drain self
162121
anno = map[string]string{
163122
constants.AnnotationRebootInProgress: constants.True,
@@ -205,7 +164,54 @@ func (k *Klocksmith) Run() error {
205164
k.lc.Reboot(false)
206165

207166
// cross fingers
208-
time.Sleep(24 * 7 * time.Hour)
167+
sleepOrDone(24*7*time.Hour, stop)
168+
return nil
169+
}
170+
171+
// updateStatusCallback receives Status messages from update engine. If the
172+
// status is UpdateStatusUpdatedNeedReboot, indicate that with an annotation on our
173+
// node.
174+
func (k *Klocksmith) updateStatusCallback(s updateengine.Status) {
175+
glog.Info("Updating status")
176+
// update our status
177+
anno := map[string]string{
178+
constants.AnnotationStatus: s.CurrentOperation,
179+
constants.AnnotationLastCheckedTime: fmt.Sprintf("%d", s.LastCheckedTime),
180+
constants.AnnotationNewVersion: s.NewVersion,
181+
}
182+
183+
// indicate we need a reboot
184+
if s.CurrentOperation == updateengine.UpdateStatusUpdatedNeedReboot {
185+
glog.Info("Indicating a reboot is needed")
186+
anno[constants.AnnotationRebootNeeded] = constants.True
187+
}
188+
189+
wait.PollUntil(10*time.Second, func() (bool, error) {
190+
if err := k8sutil.SetNodeAnnotations(k.nc, k.node, anno); err != nil {
191+
glog.Errorf("Failed to set annotation %q: %v", constants.AnnotationStatus, err)
192+
return false, nil
193+
}
194+
195+
return true, nil
196+
}, wait.NeverStop)
197+
}
198+
199+
// setInfoLabels labels our node with helpful info about Container Linux.
200+
func (k *Klocksmith) setInfoLabels() error {
201+
vi, err := k8sutil.GetVersionInfo()
202+
if err != nil {
203+
return fmt.Errorf("failed to get version info: %v", err)
204+
}
205+
206+
labels := map[string]string{
207+
constants.LabelID: vi.ID,
208+
constants.LabelGroup: vi.Group,
209+
constants.LabelVersion: vi.Version,
210+
}
211+
212+
if err := k8sutil.SetNodeLabels(k.nc, k.node, labels); err != nil {
213+
return err
214+
}
209215

210216
return nil
211217
}
@@ -340,3 +346,17 @@ func (k *Klocksmith) getPodsForDeletion() ([]v1.Pod, error) {
340346

341347
return pods, nil
342348
}
349+
350+
// sleepOrDone pauses the current goroutine until the done channel is closed (or receives a value),
351+
// or until at least the duration d has elapsed, whichever comes first. This
352+
// is similar to time.Sleep(d), except it can be interrupted.
353+
func sleepOrDone(d time.Duration, done <-chan struct{}) {
354+
sleep := time.NewTimer(d)
355+
defer sleep.Stop()
356+
select {
357+
case <-sleep.C:
358+
return
359+
case <-done:
360+
return
361+
}
362+
}

0 commit comments

Comments
 (0)