Skip to content

Commit e24a1e5

Browse files
authored
Add new env var for sqsMsgVisibilityTimeoutSec (#1220)
1 parent 1786a02 commit e24a1e5

7 files changed

Lines changed: 37 additions & 2 deletions

File tree

cmd/node-termination-handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ func main() {
236236
ASG: autoscaling.New(sess),
237237
EC2: ec2Client,
238238
BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) },
239+
SqsMsgVisibilityTimeoutSec: nthConfig.SqsMsgVisibilityTimeoutSec,
239240
}
240241
monitoringFns[sqsEvents] = sqsMonitor
241242
}

config/helm/aws-node-termination-handler/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ The configuration in this table applies to AWS Node Termination Handler in queue
123123
| `topologySpreadConstraints` | [Topology Spread Constraints](https://kubernetes.io/docs/concepts/scheduling-eviction/topology-spread-constraints/) for pod scheduling. Useful with a highly available deployment to reduce the risk of running multiple replicas on the same Node | `[]` |
124124
| `heartbeatInterval` | The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour). | `-1` |
125125
| `heartbeatUntil` | The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours). | `-1` |
126+
| `sqsMsgVisibilityTimeoutSec` | Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds. | `20` |
126127

127128
### IMDS Mode Configuration
128129

config/helm/aws-node-termination-handler/templates/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ spec:
174174
value: {{ .Values.heartbeatInterval | quote }}
175175
- name: HEARTBEAT_UNTIL
176176
value: {{ .Values.heartbeatUntil | quote }}
177+
- name: SQS_MSG_VISIBILITY_TIMEOUT_SEC
178+
value: {{ .Values.sqsMsgVisibilityTimeoutSec | quote }}
177179
{{- with .Values.extraEnv }}
178180
{{- toYaml . | nindent 12 }}
179181
{{- end }}

config/helm/aws-node-termination-handler/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ heartbeatInterval: -1
294294
# The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).
295295
heartbeatUntil: -1
296296

297+
# Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds.
298+
sqsMsgVisibilityTimeoutSec: 20
299+
300+
297301
# ---------------------------------------------------------------------------------------------------------------------
298302
# Testing
299303
# ---------------------------------------------------------------------------------------------------------------------

pkg/config/config.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ const (
117117
// heartbeat
118118
heartbeatIntervalKey = "HEARTBEAT_INTERVAL"
119119
heartbeatUntilKey = "HEARTBEAT_UNTIL"
120+
// sqs monitor
121+
sqsMsgVisibilityTimeoutSecConfigKey = "SQS_MSG_VISIBILITY_TIMEOUT_SEC"
122+
SqsMsgVisibilityTimeoutSecDefault = 20
120123
)
121124

122125
// Config arguments set via CLI, environment variables, or defaults
@@ -174,6 +177,7 @@ type Config struct {
174177
UseAPIServerCacheToListPods bool
175178
HeartbeatInterval int
176179
HeartbeatUntil int
180+
SqsMsgVisibilityTimeoutSec int
177181
}
178182

179183
// ParseCliArgs parses cli arguments and uses environment variables as fallback values
@@ -241,6 +245,7 @@ func ParseCliArgs() (config Config, err error) {
241245
flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.")
242246
flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).")
243247
flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).")
248+
flag.IntVar(&config.SqsMsgVisibilityTimeoutSec, "sqs-msg-visibility-timeout-sec", getIntEnv(sqsMsgVisibilityTimeoutSecConfigKey, SqsMsgVisibilityTimeoutSecDefault), "Duration in seconds that a message is hidden from other consumers after being retrieved from the SQS queue by sqs-monitor. Valid range: 1-119 seconds.")
244249
flag.Parse()
245250

246251
if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) {
@@ -306,6 +311,10 @@ func ParseCliArgs() (config Config, err error) {
306311
return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval should be less than or equal to heartbeat-until")
307312
}
308313

314+
if config.EnableSQSTerminationDraining && (config.SqsMsgVisibilityTimeoutSec <= 0 || config.SqsMsgVisibilityTimeoutSec >= 120) {
315+
return config, fmt.Errorf("invalid SqsMsgVisibilityTimeoutSec configuration: SqsMsgVisibilityTimeoutSec valid range from 1 to 119")
316+
}
317+
309318
// client-go expects these to be set in env vars
310319
os.Setenv(kubernetesServiceHostConfigKey, config.KubernetesServiceHost)
311320
os.Setenv(kubernetesServicePortConfigKey, config.KubernetesServicePort)
@@ -367,6 +376,7 @@ func (c Config) PrintJsonConfigArgs() {
367376
Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods).
368377
Int("heartbeat_interval", c.HeartbeatInterval).
369378
Int("heartbeat_until", c.HeartbeatUntil).
379+
Int("sqs_msg_visibility_timeout_sec", c.SqsMsgVisibilityTimeoutSec).
370380
Msg("aws-node-termination-handler arguments")
371381
}
372382

@@ -421,7 +431,8 @@ func (c Config) PrintHumanConfigArgs() {
421431
"\taws-endpoint: %s,\n"+
422432
"\tuse-apiserver-cache: %t,\n"+
423433
"\theartbeat-interval: %d,\n"+
424-
"\theartbeat-until: %d\n",
434+
"\theartbeat-until: %d\n"+
435+
"\tsqs-msg-visibility-timeout-sec: %d\n",
425436
c.DryRun,
426437
c.NodeName,
427438
c.PodName,
@@ -465,6 +476,7 @@ func (c Config) PrintHumanConfigArgs() {
465476
c.UseAPIServerCacheToListPods,
466477
c.HeartbeatInterval,
467478
c.HeartbeatUntil,
479+
c.SqsMsgVisibilityTimeoutSec,
468480
)
469481
}
470482

pkg/config/config_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
8484
h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods)
8585
h.Equals(t, 30, nthConfig.HeartbeatInterval)
8686
h.Equals(t, 60, nthConfig.HeartbeatUntil)
87+
h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec)
8788

8889
// Check that env vars were set
8990
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
@@ -153,6 +154,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
153154
h.Equals(t, true, nthConfig.UseAPIServerCacheToListPods)
154155
h.Equals(t, 30, nthConfig.HeartbeatInterval)
155156
h.Equals(t, 60, nthConfig.HeartbeatUntil)
157+
h.Equals(t, 20, nthConfig.SqsMsgVisibilityTimeoutSec)
156158

157159
// Check that env vars were set
158160
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
@@ -186,6 +188,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
186188
t.Setenv("CORDON_ONLY", "true")
187189
t.Setenv("HEARTBEAT_INTERVAL", "3601")
188190
t.Setenv("HEARTBEAT_UNTIL", "172801")
191+
t.Setenv("SQS_MSG_VISIBILITY_TIMEOUT_SEC", "30")
189192

190193
os.Args = []string{
191194
"cmd",
@@ -214,6 +217,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
214217
"--prometheus-server-port=2112",
215218
"--heartbeat-interval=3600",
216219
"--heartbeat-until=172800",
220+
"--sqs-msg-visibility-timeout-sec=30",
217221
}
218222
nthConfig, err := config.ParseCliArgs()
219223
h.Ok(t, err)
@@ -244,11 +248,15 @@ func TestParseCliArgsOverrides(t *testing.T) {
244248
h.Equals(t, 2112, nthConfig.PrometheusPort)
245249
h.Equals(t, 3600, nthConfig.HeartbeatInterval)
246250
h.Equals(t, 172800, nthConfig.HeartbeatUntil)
251+
h.Equals(t, 30, nthConfig.SqsMsgVisibilityTimeoutSec)
247252

248253
// Check that env vars were set
249254
value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST")
250255
h.Equals(t, true, ok)
251256
h.Equals(t, "KUBERNETES_SERVICE_HOST", value)
257+
value, ok = os.LookupEnv("SQS_MSG_VISIBILITY_TIMEOUT_SEC")
258+
h.Equals(t, true, ok)
259+
h.Equals(t, "30", value)
252260
}
253261

254262
func TestParseCliArgsWithGracePeriodSuccess(t *testing.T) {

pkg/monitor/sqsevent/sqs-monitor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020

21+
"github.com/aws/aws-node-termination-handler/pkg/config"
2122
"github.com/aws/aws-node-termination-handler/pkg/logging"
2223
"github.com/aws/aws-node-termination-handler/pkg/monitor"
2324
"github.com/aws/aws-sdk-go/aws"
@@ -54,6 +55,7 @@ type SQSMonitor struct {
5455
CheckIfManaged bool
5556
ManagedTag string
5657
BeforeCompleteLifecycleAction func()
58+
SqsMsgVisibilityTimeoutSec int
5759
}
5860

5961
// InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any
@@ -294,6 +296,11 @@ func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []Interr
294296

295297
// receiveQueueMessages checks the configured SQS queue for new messages
296298
func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
299+
visibilityTimeout := m.SqsMsgVisibilityTimeoutSec
300+
if visibilityTimeout <= 0 || visibilityTimeout >= 120 {
301+
visibilityTimeout = config.SqsMsgVisibilityTimeoutSecDefault
302+
}
303+
297304
result, err := m.SQS.ReceiveMessage(&sqs.ReceiveMessageInput{
298305
AttributeNames: []*string{
299306
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
@@ -303,7 +310,7 @@ func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
303310
},
304311
QueueUrl: &qURL,
305312
MaxNumberOfMessages: aws.Int64(10),
306-
VisibilityTimeout: aws.Int64(20), // 20 seconds
313+
VisibilityTimeout: aws.Int64(int64(visibilityTimeout)),
307314
WaitTimeSeconds: aws.Int64(20), // Max long polling
308315
})
309316

0 commit comments

Comments
 (0)