Skip to content

Commit 4cc285b

Browse files
author
Justin Reagor
committed
Clean-up a few things around global/job shutdown
This commit makes a few small tweaks around how shutdown occurs. I've added a shared lock when setting a job as completed and also swapped the complete channel to send a struct instead of a boolean (not much of a change).
1 parent 87bcd66 commit 4cc285b

4 files changed

Lines changed: 17 additions & 14 deletions

File tree

core/app.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (a *App) Run() {
118118
// signal handler above, and reload endpoint, only need to fire a
119119
// GlobalShutdown across the event bus. Context handles everything after
120120
// that process.
121-
completedCh := make(chan bool)
121+
completedCh := make(chan struct{}, 1)
122122
go func() {
123123
for {
124124
select {
@@ -159,7 +159,6 @@ func (a *App) Run() {
159159
log.Error(err)
160160
break
161161
}
162-
cancel()
163162
close(completedCh)
164163
}
165164
}
@@ -198,7 +197,7 @@ func (a *App) reload() error {
198197

199198
// HandlePolling sets up polling functions and write their quit channels
200199
// back to our config
201-
func (a *App) runTasks(ctx context.Context, completedCh chan bool) {
200+
func (a *App) runTasks(ctx context.Context, completedCh chan struct{}) {
202201
// we need to subscribe to events before we Run all the jobs
203202
// to avoid races where a job finishes and fires events before
204203
// other jobs are even subscribed to listen for them.

core/signals_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func getSignalEventTestConfig(signals []string) *App {
6262
// by this same test, but that we don't have a separate unit test
6363
// because they'll interfere with each other's state.
6464
func TestTerminateSignal(t *testing.T) {
65-
stopCh := make(chan bool)
65+
stopCh := make(chan struct{}, 1)
6666
app := getSignalTestConfig()
6767
bus := app.Bus
6868
ctx, cancel := context.WithCancel(context.Background())
@@ -93,7 +93,7 @@ func TestTerminateSignal(t *testing.T) {
9393
// Test handler for handling signal events SIGHUP (and SIGUSR2). Note that the
9494
// SIGUSR1 is currently setup to handle reloading ContainerPilot's log file.
9595
func TestSignalEvent(t *testing.T) {
96-
stopCh := make(chan bool)
96+
stopCh := make(chan struct{}, 1)
9797
signals := []string{"SIGHUP", "SIGUSR2"}
9898
app := getSignalEventTestConfig(signals)
9999
bus := app.Bus

jobs/jobs.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ type Job struct {
5252
frequency time.Duration
5353

5454
// completed
55-
IsComplete bool
55+
IsComplete bool
56+
completeLock *sync.RWMutex
5657

5758
events.Subscriber
5859
events.Publisher
@@ -76,6 +77,7 @@ func NewJob(cfg *Config) *Job {
7677
frequency: cfg.freqInterval,
7778
}
7879
job.statusLock = &sync.RWMutex{}
80+
job.completeLock = &sync.RWMutex{}
7981
job.Rx = make(chan events.Event, eventBufferSize)
8082
if job.Name == "containerpilot" {
8183
// right now this hardcodes the telemetry service to
@@ -119,6 +121,8 @@ func (job *Job) setStatus(status JobStatus) {
119121
}
120122

121123
func (job *Job) setComplete() {
124+
job.completeLock.Lock()
125+
defer job.completeLock.Unlock()
122126
job.IsComplete = true
123127
}
124128

@@ -130,7 +134,7 @@ func (job *Job) Kill() {
130134
}
131135

132136
// Run executes the event loop for the Job
133-
func (job *Job) Run(pctx context.Context, completedCh chan bool) {
137+
func (job *Job) Run(pctx context.Context, completedCh chan struct{}) {
134138
ctx, cancel := context.WithCancel(pctx)
135139

136140
if job.frequency > 0 {
@@ -152,7 +156,7 @@ func (job *Job) Run(pctx context.Context, completedCh chan bool) {
152156
go func() {
153157
defer func() {
154158
job.cleanup(ctx, cancel)
155-
completedCh <- true
159+
completedCh <- struct{}{}
156160
}()
157161
for {
158162
select {

jobs/jobs_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
func TestJobRunSafeClose(t *testing.T) {
1616
bus := events.NewEventBus()
17-
stopCh := make(chan bool)
17+
stopCh := make(chan struct{}, 1)
1818
cfg := &Config{
1919
Name: "myjob",
2020
Exec: "sleep 10",
@@ -50,7 +50,7 @@ func TestJobRunSafeClose(t *testing.T) {
5050
// A Job should timeout if not started before the startupTimeout
5151
func TestJobRunStartupTimeout(t *testing.T) {
5252
bus := events.NewEventBus()
53-
stopCh := make(chan bool)
53+
stopCh := make(chan struct{}, 1)
5454
cfg := &Config{Name: "myjob", Exec: "true",
5555
When: &WhenConfig{Source: "never", Once: "startup", Timeout: "100ms"}}
5656
cfg.Validate(noop)
@@ -88,7 +88,7 @@ func TestJobRunStartupTimeout(t *testing.T) {
8888
// A Job should not timeout if started before the startupTimeout
8989
func TestJobRunStartupNoTimeout(t *testing.T) {
9090
bus := events.NewEventBus()
91-
stopCh := make(chan bool)
91+
stopCh := make(chan struct{}, 1)
9292
cfg := &Config{Name: "myjob", Exec: "sleep 5",
9393
When: &WhenConfig{Timeout: "500ms"}}
9494
cfg.Validate(noop)
@@ -127,7 +127,7 @@ func TestJobRunStartupNoTimeout(t *testing.T) {
127127
func TestJobRunRestarts(t *testing.T) {
128128
runRestartsTest := func(restarts interface{}, expected int) {
129129
bus := events.NewEventBus()
130-
stopCh := make(chan bool)
130+
stopCh := make(chan struct{}, 1)
131131
cfg := &Config{
132132
Name: "myjob",
133133
whenEvent: events.GlobalStartup,
@@ -165,7 +165,7 @@ func TestJobRunRestarts(t *testing.T) {
165165

166166
func TestJobRunPeriodic(t *testing.T) {
167167
bus := events.NewEventBus()
168-
stopCh := make(chan bool)
168+
stopCh := make(chan struct{}, 1)
169169
cfg := &Config{
170170
Name: "myjob",
171171
Exec: []string{"./testdata/test.sh", "doStuff", "runPeriodicTest"},
@@ -208,7 +208,7 @@ func TestJobRunPeriodic(t *testing.T) {
208208
func TestJobMaintenance(t *testing.T) {
209209
testFunc := func(t *testing.T, startingState JobStatus, event events.Event) JobStatus {
210210
bus := events.NewEventBus()
211-
stopCh := make(chan bool)
211+
stopCh := make(chan struct{}, 1)
212212
cfg := &Config{
213213
Name: "myjob",
214214
Exec: "true",

0 commit comments

Comments
 (0)