Skip to content

Commit cab07db

Browse files
committed
Add --with-heartbeat flag for heartbeat logging during long-running errands
Errands produce no CLI output while the Agent executes the script. This silence causes CI/CD systems to kill the process due to inactivity timeouts and leaves operators unsure whether the task is still running.

Add an opt-in --with-heartbeat flag to `bosh run-errand` that prints periodic heartbeat status lines while a task is processing or queued:

    bosh run-errand smoke_tests --with-heartbeat
    bosh run-errand smoke_tests --with-heartbeat=10

Output:

    Task 185528 | 16:16:23 | Task state: processing (5s elapsed)

No Director changes required — uses existing task API fields (state, started_at). The HeartbeatReporter interface is an optional extension of TaskReporter, detected via a safe type assertion, so no existing code is affected. Throttling is handled in the reporter via timestamp comparison.

Made-with: Cursor
1 parent c5619f3 commit cab07db

File tree

9 files changed

+230
-6
lines changed

9 files changed

+230
-6
lines changed

cmd/cmd.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,16 @@ func (c Cmd) Execute() (cmdErr error) {
224224
return NewErrandsCmd(deps.UI, c.deployment()).Run()
225225

226226
case *RunErrandOpts:
227-
director, deployment := c.directorAndDeployment()
227+
sess := c.sessionImpl()
228+
director, err := sess.Director()
229+
c.panicIfErr(err)
230+
deployment, err := sess.Deployment()
231+
c.panicIfErr(err)
232+
233+
if opts.WithHeartbeat != nil && sess.taskReporter != nil {
234+
sess.taskReporter.EnableHeartbeat(time.Duration(*opts.WithHeartbeat) * time.Second)
235+
}
236+
228237
downloader := NewUIDownloader(director, deps.Time, deps.FS, deps.UI)
229238
return NewRunErrandCmd(deployment, downloader, deps.UI).Run(*opts)
230239

@@ -546,7 +555,14 @@ func (c Cmd) config() cmdconf.Config {
546555
}
547556

548557
func (c Cmd) session() Session {
549-
return NewSessionFromOpts(c.BoshOpts, c.config(), c.deps.UI, true, true, c.deps.FS, c.deps.Logger)
558+
return c.sessionImpl()
559+
}
560+
561+
// sessionImpl builds a new SessionImpl from the command's BoshOpts, config,
// filesystem, UI and logger. It returns the concrete *SessionImpl (rather than
// the Session interface) so that callers can reach fields such as
// taskReporter. Each call constructs a fresh SessionImpl.
func (c Cmd) sessionImpl() *SessionImpl {
562+
return NewSessionImpl(
563+
NewSessionContextImpl(c.BoshOpts, c.config(), c.deps.FS),
564+
c.deps.UI, true, true, c.deps.Logger,
565+
)
550566
}
551567

552568
func (c Cmd) director() boshdir.Director {

cmd/opts/opts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,8 @@ type RunErrandOpts struct {
713713
KeepAlive bool `long:"keep-alive" description:"Use existing VM to run an errand and keep it after completion"`
714714
WhenChanged bool `long:"when-changed" description:"Run errand only if errand configuration has changed or if the previous run was unsuccessful"`
715715

716+
WithHeartbeat *int `long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`
717+
716718
DownloadLogs bool `long:"download-logs" description:"Download logs"`
717719
LogsDirectory DirOrCWDArg `long:"logs-dir" description:"Destination directory for logs" default:"."`
718720

cmd/opts/opts_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,6 +2150,14 @@ var _ = Describe("Opts", func() {
21502150
})
21512151
})
21522152

2153+
Describe("WithHeartbeat", func() {
2154+
It("contains desired values", func() {
2155+
Expect(getStructTagForName("WithHeartbeat", opts)).To(Equal(
2156+
`long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`,
2157+
))
2158+
})
2159+
})
2160+
21532161
Describe("DownloadLogs", func() {
21542162
It("contains desired values", func() {
21552163
Expect(getStructTagForName("DownloadLogs", opts)).To(Equal(

cmd/session.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type SessionImpl struct {
2222

2323
// Memoized
2424
director boshdir.Director
25+
taskReporter *boshuit.ReporterImpl
2526
directorInfo boshdir.Info
2627
directorInfoSet bool
2728
}
@@ -118,10 +119,10 @@ func (c *SessionImpl) Director() (boshdir.Director, error) {
118119
c.ui.PrintLinef("Using environment '%s' as %s", c.Environment(), creds.Description())
119120
}
120121

121-
taskReporter := boshuit.NewReporter(c.ui, true)
122+
c.taskReporter = boshuit.NewReporter(c.ui, true)
122123
fileReporter := boshui.NewFileReporter(c.ui)
123124

124-
director, err := boshdir.NewFactory(c.logger).New(dirConfig, taskReporter, fileReporter)
125+
director, err := boshdir.NewFactory(c.logger).New(dirConfig, c.taskReporter, fileReporter)
125126
if err != nil {
126127
return nil, err
127128
}

director/interfaces.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,13 @@ type TaskReporter interface {
306306
TaskOutputChunk(int, []byte)
307307
}
308308

309+
// HeartbeatReporter is an optional extension of TaskReporter. Implementations
310+
// that also satisfy this interface will receive periodic heartbeat calls while
311+
// a task is being polled, keeping CI/CD systems alive and informing users.
312+
type HeartbeatReporter interface {
313+
// TaskHeartbeat is invoked with the task's id, its current state, and the
// task's started_at value as decoded from the Director's task response.
// startedAt is passed to time.Unix as seconds by the UI reporter; a value
// of 0 is treated there as "no start time".
TaskHeartbeat(id int, state string, startedAt int64)
314+
}
315+
309316
//counterfeiter:generate . OrphanDisk
310317

311318
type OrphanDisk interface {

director/task_client_request.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ func NewTaskClientRequest(
2727
}
2828

2929
type taskShortResp struct {
30-
ID int // 165
31-
State string // e.g. "queued", "processing", "done", "error", "cancelled"
30+
ID int `json:"id"`
31+
State string `json:"state"` // e.g. "queued", "processing", "done", "error", "cancelled"
32+
StartedAt int64 `json:"started_at"` // 1440318199
3233
}
3334

3435
func (r taskShortResp) IsRunning() bool {
@@ -95,6 +96,8 @@ func (r TaskClientRequest) WaitForCompletion(id int, type_ string, taskReporter
9596
taskReporter.TaskFinished(id, taskResp.State)
9697
}()
9798

99+
heartbeatReporter, _ := taskReporter.(HeartbeatReporter)
100+
98101
taskPath := fmt.Sprintf("/tasks/%d", id)
99102

100103
for {
@@ -111,6 +114,9 @@ func (r TaskClientRequest) WaitForCompletion(id int, type_ string, taskReporter
111114
}
112115

113116
if taskResp.IsRunning() {
117+
if heartbeatReporter != nil {
118+
heartbeatReporter.TaskHeartbeat(taskResp.ID, taskResp.State, taskResp.StartedAt)
119+
}
114120
time.Sleep(r.taskCheckStepDuration)
115121
continue
116122
}

director/task_client_request_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@ import (
1515
fakedir "github.com/cloudfoundry/bosh-cli/v7/director/directorfakes"
1616
)
1717

18+
// heartbeatCall records the arguments of a single TaskHeartbeat invocation so
// tests can assert on what was reported.
type heartbeatCall struct {
19+
id int
20+
state string
21+
startedAt int64
22+
}
23+
24+
// fakeHeartbeatReporter is a test double that embeds the generated
// FakeTaskReporter and additionally provides TaskHeartbeat, so it satisfies
// the optional HeartbeatReporter interface. Received heartbeats are collected
// in call order.
type fakeHeartbeatReporter struct {
25+
fakedir.FakeTaskReporter
26+
// heartbeats holds one entry per TaskHeartbeat call, oldest first.
heartbeats []heartbeatCall
27+
}
28+
29+
// TaskHeartbeat appends the received arguments to f.heartbeats. It performs
// no locking.
func (f *fakeHeartbeatReporter) TaskHeartbeat(id int, state string, startedAt int64) {
30+
f.heartbeats = append(f.heartbeats, heartbeatCall{id, state, startedAt})
31+
}
32+
1833
var _ = Describe("TaskClientRequest", func() {
1934
var (
2035
server *ghttp.Server
@@ -246,6 +261,88 @@ var _ = Describe("TaskClientRequest", func() {
246261
})
247262
})
248263

264+
Describe("WaitForCompletion heartbeat", func() {
265+
It("emits a heartbeat for a processing task when the reporter implements HeartbeatReporter", func() {
266+
hbReporter := &fakeHeartbeatReporter{}
267+
hbReq := buildReq(hbReporter)
268+
269+
server.AppendHandlers(
270+
ghttp.CombineHandlers(
271+
ghttp.VerifyRequest("GET", "/tasks/42"),
272+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"processing","description":"run errand 'smoke'","started_at":1700000000}`),
273+
),
274+
ghttp.CombineHandlers(
275+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
276+
ghttp.RespondWith(http.StatusOK, ""),
277+
),
278+
ghttp.CombineHandlers(
279+
ghttp.VerifyRequest("GET", "/tasks/42"),
280+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
281+
),
282+
ghttp.CombineHandlers(
283+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
284+
ghttp.RespondWith(http.StatusOK, ""),
285+
),
286+
)
287+
288+
err := hbReq.WaitForCompletion(42, "event", hbReporter)
289+
Expect(err).ToNot(HaveOccurred())
290+
291+
Expect(len(hbReporter.heartbeats)).To(BeNumerically(">=", 1))
292+
hb := hbReporter.heartbeats[0]
293+
Expect(hb.id).To(Equal(42))
294+
Expect(hb.state).To(Equal("processing"))
295+
Expect(hb.startedAt).To(Equal(int64(1700000000)))
296+
})
297+
298+
It("does not emit heartbeats when the reporter does not implement HeartbeatReporter", func() {
299+
plainReporter := &fakedir.FakeTaskReporter{}
300+
301+
server.AppendHandlers(
302+
ghttp.CombineHandlers(
303+
ghttp.VerifyRequest("GET", "/tasks/42"),
304+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"processing","description":"run errand"}`),
305+
),
306+
ghttp.CombineHandlers(
307+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
308+
ghttp.RespondWith(http.StatusOK, ""),
309+
),
310+
ghttp.CombineHandlers(
311+
ghttp.VerifyRequest("GET", "/tasks/42"),
312+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
313+
),
314+
ghttp.CombineHandlers(
315+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
316+
ghttp.RespondWith(http.StatusOK, ""),
317+
),
318+
)
319+
320+
err := req.WaitForCompletion(42, "event", plainReporter)
321+
Expect(err).ToNot(HaveOccurred())
322+
// No panic, no heartbeat — plain reporter doesn't implement HeartbeatReporter
323+
})
324+
325+
It("does not emit heartbeats for tasks that immediately finish", func() {
326+
hbReporter := &fakeHeartbeatReporter{}
327+
hbReq := buildReq(hbReporter)
328+
329+
server.AppendHandlers(
330+
ghttp.CombineHandlers(
331+
ghttp.VerifyRequest("GET", "/tasks/42"),
332+
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
333+
),
334+
ghttp.CombineHandlers(
335+
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
336+
ghttp.RespondWith(http.StatusOK, ""),
337+
),
338+
)
339+
340+
err := hbReq.WaitForCompletion(42, "event", hbReporter)
341+
Expect(err).ToNot(HaveOccurred())
342+
Expect(hbReporter.heartbeats).To(BeEmpty())
343+
})
344+
})
345+
249346
Describe("WaitForCompletion", func() {
250347
var (
251348
taskReporter *fakedir.FakeTaskReporter

ui/task/reporter.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,22 @@ import (
55
"fmt"
66
"strings"
77
"sync"
8+
"time"
89

910
"golang.org/x/text/cases"
1011
"golang.org/x/text/language"
1112

1213
boshui "github.com/cloudfoundry/bosh-cli/v7/ui"
14+
boshuifmt "github.com/cloudfoundry/bosh-cli/v7/ui/fmt"
1315
)
1416

1517
type ReporterImpl struct {
1618
ui boshui.UI
1719
isForEvents bool
1820

21+
heartbeatInterval time.Duration
22+
lastHeartbeat time.Time
23+
1924
events map[int][]*Event
2025
eventMarkers []eventMarker
2126
lastGlobalEvent *Event
@@ -45,6 +50,12 @@ func NewReporter(ui boshui.UI, isForEvents bool) *ReporterImpl {
4550
}
4651
}
4752

53+
// EnableHeartbeat sets the minimum interval between heartbeat lines printed by
// TaskHeartbeat. An interval of zero or less leaves heartbeat output disabled,
// because TaskHeartbeat returns early in that case. The reporter's lock is
// held while the interval is stored.
func (r *ReporterImpl) EnableHeartbeat(interval time.Duration) {
54+
r.Lock()
55+
defer r.Unlock()
56+
r.heartbeatInterval = interval
57+
}
58+
4859
func (r *ReporterImpl) TaskStarted(id int) {
4960
r.Lock()
5061
defer r.Unlock()
@@ -111,6 +122,30 @@ func (r *ReporterImpl) TaskOutputChunk(id int, chunk []byte) {
111122
r.eventMarkers = append(r.eventMarkers, eventMarker{TaskID: id, Type: taskOutput})
112123
}
113124

125+
// TaskHeartbeat prints a status line of the form
// "Task <id> | <time> | Task state: <state>" for a task that is still being
// waited on, optionally followed by " (<duration> elapsed)".
//
// Output is throttled: nothing is printed unless a positive interval has been
// set via EnableHeartbeat, and after the first printed heartbeat subsequent
// calls are ignored until at least that interval has passed. The first call
// after enabling always prints, since lastHeartbeat is still the zero time.
//
// startedAt is interpreted as Unix seconds. The elapsed suffix is added only
// when state is not "queued" and startedAt is greater than zero.
func (r *ReporterImpl) TaskHeartbeat(id int, state string, startedAt int64) {
126+
r.Lock()
127+
defer r.Unlock()
128+
129+
// Heartbeats are opt-in; a non-positive interval means they were never enabled.
if r.heartbeatInterval <= 0 {
130+
return
131+
}
132+
133+
now := time.Now()
134+
// Throttle: skip this call if the previous heartbeat was printed too recently.
if !r.lastHeartbeat.IsZero() && now.Sub(r.lastHeartbeat) < r.heartbeatInterval {
135+
return
136+
}
137+
r.lastHeartbeat = now
138+
139+
msg := "Task state: " + state
140+
if state != "queued" && startedAt > 0 {
141+
// Elapsed time is truncated to whole seconds.
// NOTE(review): this compares the local clock against startedAt; if
// startedAt is ahead of local time the duration is negative. Confirm
// whether it should be clamped to zero.
elapsed := time.Since(time.Unix(startedAt, 0)).Truncate(time.Second)
142+
msg += fmt.Sprintf(" (%s elapsed)", elapsed)
143+
}
144+
145+
// The timestamp is rendered in UTC using the shared hours format.
r.printBlock(fmt.Sprintf("\nTask %d | %s | ", id, now.UTC().Format(boshuifmt.TimeHoursFmt)))
146+
r.printBlock(msg)
147+
}
148+
114149
func (r *ReporterImpl) showEvent(id int, str string) {
115150
event := Event{TaskID: id}
116151

ui/task/reporter_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package task_test
22

33
import (
44
"bytes"
5+
"fmt"
6+
"strings"
7+
"time"
58

69
boshlog "github.com/cloudfoundry/bosh-utils/logger"
710
. "github.com/onsi/ginkgo/v2"
@@ -335,4 +338,53 @@ Task 101 done
335338
`))
336339
})
337340
})
341+
342+
Describe("TaskHeartbeat", func() {
343+
It("does not print when heartbeat is not enabled", func() {
344+
impl := boshuit.NewReporter(fakeUI, true)
345+
impl.TaskHeartbeat(42, "processing", int64(1700000000))
346+
Expect(fakeUI.Blocks).To(BeNil())
347+
})
348+
349+
It("prints state for a queued task when heartbeat is enabled", func() {
350+
impl := boshuit.NewReporter(fakeUI, true)
351+
impl.EnableHeartbeat(10 * time.Second)
352+
impl.TaskHeartbeat(42, "queued", int64(0))
353+
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
354+
combined := fmt.Sprintf("%v", fakeUI.Blocks)
355+
Expect(combined).To(ContainSubstring("Task state: queued"))
356+
Expect(combined).NotTo(ContainSubstring("elapsed"))
357+
})
358+
359+
It("prints state with elapsed time for a processing task with startedAt", func() {
360+
impl := boshuit.NewReporter(fakeUI, true)
361+
impl.EnableHeartbeat(10 * time.Second)
362+
impl.TaskHeartbeat(42, "processing", int64(1700000000))
363+
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
364+
combined := fmt.Sprintf("%v", fakeUI.Blocks)
365+
Expect(combined).To(ContainSubstring("Task state: processing"))
366+
Expect(combined).To(ContainSubstring("elapsed"))
367+
})
368+
369+
It("prints state without elapsed time when processing but startedAt is 0", func() {
370+
impl := boshuit.NewReporter(fakeUI, true)
371+
impl.EnableHeartbeat(10 * time.Second)
372+
impl.TaskHeartbeat(42, "processing", int64(0))
373+
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
374+
combined := fmt.Sprintf("%v", fakeUI.Blocks)
375+
Expect(combined).To(ContainSubstring("Task state: processing"))
376+
Expect(combined).NotTo(ContainSubstring("elapsed"))
377+
Expect(combined).NotTo(ContainSubstring("queued"))
378+
})
379+
380+
It("throttles heartbeat output based on the configured interval", func() {
381+
impl := boshuit.NewReporter(fakeUI, true)
382+
impl.EnableHeartbeat(1 * time.Hour)
383+
impl.TaskHeartbeat(42, "processing", int64(1700000000))
384+
impl.TaskHeartbeat(42, "processing", int64(1700000000))
385+
impl.TaskHeartbeat(42, "processing", int64(1700000000))
386+
combined := fmt.Sprintf("%v", fakeUI.Blocks)
387+
Expect(strings.Count(combined, "Task state")).To(Equal(1))
388+
})
389+
})
338390
})

0 commit comments

Comments
 (0)