Skip to content

Commit c40a539

Browse files
author
Justin Reagor
committed
Implement new signal based events
This PR implements the process outlined in the tracking issue. I've also included the ability for running jobs to create an environment variable including their PID (CONTAINERPILOT_<name>_PID) as well as both the supervisor and normal worker process signal handlers. So far, the functionality is working out great and you can fire jobs based on a container receiving a specific signal (or CP process itself). This is more of a scheduler based feature for CP, acts as a refill for missing CP2 functionality, and not something particularly useful for the wider Triton audience. * Parse signal events in a job's when configuration * Set the current PID for a job's running command in an env var * Publish signal events onto the event bus * Update test_envvars to contain our new PID var * Add test_sighup integration test * Generate stringer code for new Signal eventcode * Supervisor passes signal events through worker process Fixes: #513
1 parent 9c8c376 commit c40a539

20 files changed

Lines changed: 443 additions & 54 deletions

commands/commands.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"fmt"
99
"os"
1010
"os/exec"
11+
"path/filepath"
12+
"regexp"
13+
"strconv"
14+
"strings"
1115
"sync"
1216
"syscall"
1317
"time"
@@ -51,6 +55,31 @@ func NewCommand(rawArgs interface{}, timeout time.Duration, fields log.Fields) (
5155
return cmd, nil
5256
}
5357

58+
// EnvName formats Name for use as an environment variable name (PID).
59+
func (c *Command) EnvName() string {
60+
if c.Name == "" {
61+
return c.Name
62+
}
63+
64+
var name string
65+
name = filepath.Base(c.Name)
66+
67+
// remove command extension if exec was used as name
68+
if strings.Contains(name, ".") {
69+
name = strings.Replace(name, filepath.Ext(name), "", 1)
70+
}
71+
72+
// convert all non-alphanums into an underscore
73+
matchSyms := regexp.MustCompile("[^[:alnum:]]+")
74+
name = matchSyms.ReplaceAllString(name, "_")
75+
76+
// compact multiple underscores into singles
77+
matchScores := regexp.MustCompile("__+")
78+
name = matchScores.ReplaceAllString(name, "_")
79+
80+
return strings.ToUpper(name)
81+
}
82+
5483
// Run creates an exec.Cmd for the Command and runs it asynchronously.
5584
// If the parent context is closed/canceled this will terminate the
5685
// child process and do any cleanup we need.
@@ -106,6 +135,11 @@ func (c *Command) Run(pctx context.Context, bus *events.EventBus) {
106135
// our logger fields
107136
if c.Cmd != nil && c.Cmd.Process != nil {
108137
pid := c.Cmd.Process.Pid
138+
139+
envName := fmt.Sprintf("CONTAINERPILOT_%s_PID", c.EnvName())
140+
os.Setenv(envName, strconv.Itoa(pid))
141+
defer os.Unsetenv(envName)
142+
109143
if len(c.fields) > 0 {
110144
c.fields["pid"] = pid
111145
c.logger = *log.WithFields(c.fields)

commands/commands_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ func TestCommandPassthru(t *testing.T) {
8989
assert.NotEqual(t, cmd.Cmd.Stdout, os.Stdout)
9090
}
9191

92+
func TestEnvName(t *testing.T) {
93+
tests := []struct {
94+
name, input, output string
95+
}{
96+
{"mixed case", "testCase", "TESTCASE"},
97+
{"hyphen", "test-case", "TEST_CASE"},
98+
{"exec no ext", "/bin/to/testCase", "TESTCASE"},
99+
{"exec hyphen", "/bin/to/test-case", "TEST_CASE"},
100+
{"exec ext", "/bin/to/testCase.sh", "TESTCASE"},
101+
{"exec cwd", "./bin/to/testCase.sh", "TESTCASE"},
102+
{"exec hyphen", "/bin/to/test-Case.sh", "TEST_CASE"},
103+
{"exec multi hyphen", "/bin/to/test-Case--now.sh", "TEST_CASE_NOW"},
104+
}
105+
for _, test := range tests {
106+
t.Run(test.name, func(t *testing.T) {
107+
cmd, _ := NewCommand(test.input, time.Duration(0), nil)
108+
assert.Equal(t, test.input, cmd.Name)
109+
assert.Equal(t, test.output, cmd.EnvName())
110+
})
111+
}
112+
}
113+
92114
// test helpers
93115

94116
func runtestCommandRun(cmd *Command) map[events.Event]int {

core/app.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,13 @@ func (a *App) Terminate() {
171171
a.Bus.Shutdown()
172172
}
173173

174+
// SignalEvent publishes a signal event onto the event bus
175+
func (a *App) SignalEvent(sig string) {
176+
a.signalLock.Lock()
177+
defer a.signalLock.Unlock()
178+
a.Bus.PublishSignal(sig)
179+
}
180+
174181
// reload does the actual work of reloading the configuration and
175182
// updating the App with those changes. The EventBus should be
176183
// already shut down before we call this.

core/app_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,23 @@ the core and config packages are working together
2323
func TestJobConfigRequiredFields(t *testing.T) {
2424
// Missing `name`
2525
var testCfg = `{"consul": "consul:8500", jobs: [
26-
{"name": "", "port": 8080, health: {interval: 30, "ttl": 19 }}]}`
26+
{"name": "", "port": 8080, health: {interval: 30, "ttl": 19 }}]}`
2727
f1 := testCfgToTempFile(t, testCfg)
2828
defer os.Remove(f1.Name())
2929
_, err := NewApp(f1.Name())
3030
assert.Error(t, err, "unable to parse jobs: 'name' must not be blank")
3131

3232
// Missing `interval`
3333
testCfg = `{"consul": "consul:8500", jobs: [
34-
{"name": "name", "port": 8080, health: {ttl: 19}}]}`
34+
{"name": "name", "port": 8080, health: {ttl: 19}}]}`
3535
f2 := testCfgToTempFile(t, testCfg)
3636
defer os.Remove(f2.Name())
3737
_, err = NewApp(f2.Name())
3838
assert.Error(t, err, "unable to parse jobs: job[name].health.interval must be > 0")
3939

4040
// Missing `ttl`
4141
testCfg = `{"consul": "consul:8500", jobs: [
42-
{"name": "name", "port": 8080, health: {interval: 19}}]}`
42+
{"name": "name", "port": 8080, health: {interval: 19}}]}`
4343
f3 := testCfgToTempFile(t, testCfg)
4444
defer os.Remove(f3.Name())
4545
_, err = NewApp(f3.Name())
@@ -65,11 +65,11 @@ func TestWatchConfigRequiredFields(t *testing.T) {
6565
func TestMetricServiceCreation(t *testing.T) {
6666

6767
f := testCfgToTempFile(t, `{
68-
"consul": "consul:8500",
69-
"telemetry": {
70-
"interfaces": ["inet", "lo0"],
71-
"port": 9090
72-
}
68+
"consul": "consul:8500",
69+
"telemetry": {
70+
"interfaces": ["inet", "lo0"],
71+
"port": 9090
72+
}
7373
}`)
7474
defer os.Remove(f.Name())
7575
app, err := NewApp(f.Name())

core/signals.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ import (
99
// HandleSignals listens for and captures signals used for orchestration
1010
func (a *App) handleSignals() {
1111
recvSig := make(chan os.Signal, 1)
12-
signal.Notify(recvSig, syscall.SIGTERM, syscall.SIGINT)
12+
signal.Notify(recvSig,
13+
syscall.SIGTERM,
14+
syscall.SIGINT,
15+
syscall.SIGHUP,
16+
syscall.SIGUSR2,
17+
)
1318
go func() {
1419
for {
1520
sig := <-recvSig
@@ -20,8 +25,23 @@ func (a *App) handleSignals() {
2025
case syscall.SIGTERM:
2126
a.Terminate()
2227
return
28+
case syscall.SIGHUP, syscall.SIGUSR2:
29+
if s := toString(sig); s != "" {
30+
a.SignalEvent(s)
31+
}
2332
default:
2433
}
2534
}
2635
}()
2736
}
37+
38+
func toString(sig os.Signal) string {
39+
switch sig {
40+
case syscall.SIGHUP:
41+
return "SIGHUP"
42+
case syscall.SIGUSR2:
43+
return "SIGUSR2"
44+
default:
45+
return ""
46+
}
47+
}

core/signals_test.go

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ import (
1212
"github.com/joyent/containerpilot/events"
1313
"github.com/joyent/containerpilot/jobs"
1414
"github.com/joyent/containerpilot/tests/mocks"
15+
"github.com/stretchr/testify/assert"
1516
)
1617

1718
// ------------------------------------------
1819
// Test setup
1920

20-
func getSignalTestConfig(t *testing.T) *App {
21+
func getSignalTestConfig() *App {
2122
cfg := &jobs.Config{
2223
Name: "test-service",
2324
Port: 1,
@@ -37,12 +38,32 @@ func getSignalTestConfig(t *testing.T) *App {
3738
return app
3839
}
3940

41+
func getSignalEventTestConfig(signals []string) *App {
42+
appJobs := make([]*jobs.Job, len(signals))
43+
for n, sig := range signals {
44+
cfg := &jobs.Config{
45+
Name: "test-" + sig,
46+
Port: 1,
47+
Interfaces: []string{"inet"},
48+
Exec: []string{"./testdata/test.sh", "interruptSleep"},
49+
When: &jobs.WhenConfig{Source: sig},
50+
}
51+
cfg.Validate(&mocks.NoopDiscoveryBackend{})
52+
appJobs[n] = jobs.NewJob(cfg)
53+
}
54+
app := EmptyApp()
55+
app.StopTimeout = 1
56+
app.Jobs = appJobs
57+
app.Bus = events.NewEventBus()
58+
return app
59+
}
60+
4061
// Test handler for SIGTERM. Note that the SIGCHLD handler is fired
4162
// by this same test, but that we don't have a separate unit test
4263
// because they'll interfere with each other's state.
4364
func TestTerminateSignal(t *testing.T) {
4465
stopCh := make(chan bool)
45-
app := getSignalTestConfig(t)
66+
app := getSignalTestConfig()
4667
bus := app.Bus
4768
ctx, cancel := context.WithCancel(context.Background())
4869
for _, job := range app.Jobs {
@@ -69,6 +90,46 @@ func TestTerminateSignal(t *testing.T) {
6990
}
7091
}
7192

93+
// Test handler for handling signal events SIGHUP (and SIGUSR2). Note that the
94+
// SIGUSR1 is currently setup to handle reloading ContainerPilot's log file.
95+
func TestSignalEvent(t *testing.T) {
96+
stopCh := make(chan bool)
97+
signals := []string{"SIGHUP", "SIGUSR2"}
98+
app := getSignalEventTestConfig(signals)
99+
bus := app.Bus
100+
ctx, cancel := context.WithCancel(context.Background())
101+
for _, job := range app.Jobs {
102+
job.Subscribe(bus)
103+
job.Register(bus)
104+
}
105+
for _, job := range app.Jobs {
106+
job.Run(ctx, stopCh)
107+
}
108+
for _, sig := range signals {
109+
app.SignalEvent(sig)
110+
}
111+
112+
cancel()
113+
bus.Wait()
114+
results := bus.DebugEvents()
115+
116+
got := map[events.Event]int{}
117+
for _, result := range results {
118+
got[result]++
119+
}
120+
121+
if !reflect.DeepEqual(got, map[events.Event]int{
122+
{Code: events.Signal, Source: "SIGHUP"}: 1,
123+
{Code: events.Signal, Source: "SIGUSR2"}: 1,
124+
{Code: events.Stopped, Source: "test-SIGHUP"}: 1,
125+
{Code: events.Stopping, Source: "test-SIGHUP"}: 1,
126+
{Code: events.Stopped, Source: "test-SIGUSR2"}: 1,
127+
{Code: events.Stopping, Source: "test-SIGUSR2"}: 1,
128+
}) {
129+
t.Fatalf("expected shutdown but got:\n%v", results)
130+
}
131+
}
132+
72133
// Test that only ensures that we cover a straight-line run through
73134
// the handleSignals setup code
74135
func TestSignalWiring(t *testing.T) {
@@ -96,3 +157,20 @@ func sendAndWaitForSignal(t *testing.T, s os.Signal) {
96157
t.Fatalf("timeout waiting for %v\n", s)
97158
}
98159
}
160+
161+
func TestToString(t *testing.T) {
162+
tests := []struct {
163+
name string
164+
input os.Signal
165+
output string
166+
}{
167+
{"SIGHUP", syscall.SIGHUP, "SIGHUP"},
168+
{"SIGUSR2", syscall.SIGUSR2, "SIGUSR2"},
169+
{"SIGTERM", syscall.SIGTERM, ""},
170+
}
171+
for _, test := range tests {
172+
t.Run(test.name, func(t *testing.T) {
173+
assert.Equal(t, toString(test.input), test.output)
174+
})
175+
}
176+
}

events/bus.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ func (bus *EventBus) Publish(event Event) {
135135
bus.enqueue(event)
136136
}
137137

138+
// PublishSignal publishes a signal event through the EventBus to any Jobs that
139+
// are subscribed to trigger on them.
140+
func (bus *EventBus) PublishSignal(sig string) {
141+
bus.Publish(Event{Code: Signal, Source: sig})
142+
}
143+
138144
// SetReloadFlag sets the flag that Wait will use to signal to the main
139145
// App that we want to restart rather than be shut down
140146
func (bus *EventBus) SetReloadFlag() {

events/eventcode_string.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

events/events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const (
3535
Metric
3636
Startup // fired once after events are set up and event loop is started
3737
Shutdown // fired once after all jobs exit or on receiving SIGTERM
38+
Signal // fired when a UNIX signal hits a CP process/supervisor
3839
)
3940

4041
// global events
@@ -78,6 +79,8 @@ func FromString(codeName string) (EventCode, error) {
7879
return Startup, nil
7980
case "shutdown":
8081
return Shutdown, nil
82+
case "SIGHUP", "SIGUSR2":
83+
return Signal, nil
8184
}
8285
return None, fmt.Errorf("%s is not a valid event code", codeName)
8386
}

0 commit comments

Comments
 (0)