Skip to content

Commit 8cdff83

Browse files
committed
as: Implement backoff for cooldown period of disabled webhooks
1 parent 51e3f42 commit 8cdff83

11 files changed

Lines changed: 542 additions & 381 deletions

File tree

api/ttn/lorawan/v3/api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,8 @@ Application Server configuration.
12481248
| ----- | ---- | ----- | ----------- |
12491249
| `unhealthy_attempts_threshold` | [`int64`](#int64) | | |
12501250
| `unhealthy_retry_interval` | [`google.protobuf.Duration`](#google.protobuf.Duration) | | |
1251+
| `unhealthy_retry_backoff_interval_min` | [`google.protobuf.Duration`](#google.protobuf.Duration) | | |
1252+
| `unhealthy_retry_backoff_interval_max` | [`google.protobuf.Duration`](#google.protobuf.Duration) | | |
12511253

12521254
### <a name="ttn.lorawan.v3.DecodeDownlinkRequest">Message `DecodeDownlinkRequest`</a>
12531255

api/ttn/lorawan/v3/api.swagger.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17358,6 +17358,12 @@
1735817358
},
1735917359
"unhealthy_retry_interval": {
1736017360
"type": "string"
17361+
},
17362+
"unhealthy_retry_backoff_interval_min": {
17363+
"type": "string"
17364+
},
17365+
"unhealthy_retry_backoff_interval_max": {
17366+
"type": "string"
1736117367
}
1736217368
}
1736317369
},

api/ttn/lorawan/v3/applicationserver.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,10 @@ message AsConfiguration {
102102
// Webhooks holds the webhook health check settings of the Application Server.
message Webhooks {
103103
// Number of failed webhook attempts before the webhook is disabled.
int64 unhealthy_attempts_threshold = 1;
104104
// Time interval after which disabled webhooks may execute again.
google.protobuf.Duration unhealthy_retry_interval = 2;
105-
106105
reserved 3;
107106
reserved "queue";
107+
// Minimum backoff interval for retrying unhealthy webhooks.
google.protobuf.Duration unhealthy_retry_backoff_interval_min = 4;
108+
// Maximum backoff interval for retrying unhealthy webhooks.
google.protobuf.Duration unhealthy_retry_backoff_interval_max = 5;
108109
}
109110
Webhooks webhooks = 2;
110111
}

pkg/applicationserver/config.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,21 +155,25 @@ type UplinkStorageConfig struct {
155155

156156
// WebhooksConfig defines the configuration of the webhooks integration.
//
// The Unhealthy* fields configure the health check sink that NewWebhooks wraps around the target:
// the attempts threshold, the base retry interval and the minimum/maximum bounds of the backoff
// applied to that interval.
//
// NOTE(review): the `name` tags of the two backoff bounds are `unhealthy-retry-min-backoff-interval`
// and `unhealthy-retry-max-backoff-interval`, while the corresponding protobuf fields are named
// `unhealthy_retry_backoff_interval_min` / `_max`. The word order differs between the two; confirm
// that this is intentional.
157157
type WebhooksConfig struct {
158-
Registry web.WebhookRegistry `name:"-"`
159-
Target string `name:"target" description:"Target of the integration (direct)"`
160-
Timeout time.Duration `name:"timeout" description:"Wait timeout of the target to process the request"` // nolint:lll
161-
QueueSize int `name:"queue-size" description:"Number of requests to queue"`
162-
Workers int `name:"workers" description:"Number of workers to process requests"`
163-
UnhealthyAttemptsThreshold int `name:"unhealthy-attempts-threshold" description:"Number of failed webhook attempts before the webhook is disabled"` // nolint:lll
164-
UnhealthyRetryInterval time.Duration `name:"unhealthy-retry-interval" description:"Time interval after which disabled webhooks may execute again"`
165-
Templates web.TemplatesConfig `name:"templates" description:"The store of the webhook templates"`
166-
Downlinks web.DownlinksConfig `name:"downlink" description:"The downlink queue operations configuration"`
158+
Registry web.WebhookRegistry `name:"-"`
159+
Target string `name:"target" description:"Target of the integration (direct)"`
160+
Timeout time.Duration `name:"timeout" description:"Wait timeout of the target to process the request"` // nolint:lll
161+
QueueSize int `name:"queue-size" description:"Number of requests to queue"`
162+
Workers int `name:"workers" description:"Number of workers to process requests"`
163+
UnhealthyAttemptsThreshold int `name:"unhealthy-attempts-threshold" description:"Number of failed webhook attempts before the webhook is disabled"` // nolint:lll
164+
UnhealthyRetryInterval time.Duration `name:"unhealthy-retry-interval" description:"Time interval after which disabled webhooks may execute again"`
165+
UnhealthyRetryBackoffIntervalMin time.Duration `name:"unhealthy-retry-min-backoff-interval" description:"Minimum backoff interval for retrying unhealthy webhooks"`
166+
UnhealthyRetryBackoffIntervalMax time.Duration `name:"unhealthy-retry-max-backoff-interval" description:"Maximum backoff interval for retrying unhealthy webhooks"`
167+
Templates web.TemplatesConfig `name:"templates" description:"The store of the webhook templates"`
168+
Downlinks web.DownlinksConfig `name:"downlink" description:"The downlink queue operations configuration"`
167169
}
168170

169171
// toProto converts the webhooks configuration to its protobuf representation.
// Only the health check settings are exported: the unhealthy attempts threshold, the unhealthy
// retry interval and the minimum/maximum retry backoff intervals. The remaining fields (registry,
// target, timeout, queue size, workers, templates and downlinks) are not part of the message.
func (c WebhooksConfig) toProto() *ttnpb.AsConfiguration_Webhooks {
170172
return &ttnpb.AsConfiguration_Webhooks{
171-
UnhealthyAttemptsThreshold: int64(c.UnhealthyAttemptsThreshold),
172-
UnhealthyRetryInterval: durationpb.New(c.UnhealthyRetryInterval),
173+
UnhealthyAttemptsThreshold: int64(c.UnhealthyAttemptsThreshold),
174+
UnhealthyRetryInterval: durationpb.New(c.UnhealthyRetryInterval),
175+
UnhealthyRetryBackoffIntervalMin: durationpb.New(c.UnhealthyRetryBackoffIntervalMin),
176+
UnhealthyRetryBackoffIntervalMax: durationpb.New(c.UnhealthyRetryBackoffIntervalMax),
173177
}
174178
}
175179

@@ -273,7 +277,7 @@ func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.
273277
if c.UnhealthyAttemptsThreshold > 0 || c.UnhealthyRetryInterval > 0 {
274278
registry := web.NewHealthStatusRegistry(c.Registry)
275279
registry = web.NewCachedHealthStatusRegistry(registry)
276-
target = sink.NewHealthCheckSink(target, registry, c.UnhealthyAttemptsThreshold, c.UnhealthyRetryInterval)
280+
target = sink.NewHealthCheckSink(target, registry, c.UnhealthyAttemptsThreshold, c.UnhealthyRetryInterval, c.UnhealthyRetryBackoffIntervalMin, c.UnhealthyRetryBackoffIntervalMax)
277281
}
278282
if c.QueueSize > 0 || c.Workers > 0 {
279283
target = sink.NewPooledSink(ctx, server, target, c.Workers, c.QueueSize)

pkg/applicationserver/io/web/sink/health.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type healthCheckSink struct {
3737

3838
unhealthyAttemptsThreshold int
3939
unhealthyRetryInterval time.Duration
40+
minBackoffInterval time.Duration
41+
maxBackoffInterval time.Duration
4042
}
4143

4244
// Process runs the health checks and sends the request to the underlying sink
@@ -82,8 +84,11 @@ func (hcs *healthCheckSink) preRunCheck(ctx context.Context) (healthState, error
8284
case h.GetUnhealthy() != nil:
8385
h := h.GetUnhealthy()
8486
monitorOnly := hcs.unhealthyAttemptsThreshold <= 0 || hcs.unhealthyRetryInterval <= 0
85-
nextAttemptAt := ttnpb.StdTime(h.LastFailedAttemptAt).Add(hcs.unhealthyRetryInterval)
87+
88+
backoffInterval := hcs.calculateBackoffInterval(h.FailedAttempts)
89+
nextAttemptAt := ttnpb.StdTime(h.LastFailedAttemptAt).Add(backoffInterval)
8690
retryIntervalPassed := time.Now().After(nextAttemptAt)
91+
8792
switch {
8893
case monitorOnly:
8994
// The system only monitors the health status, but does not block execution.
@@ -116,6 +121,36 @@ func (hcs *healthCheckSink) preRunCheck(ctx context.Context) (healthState, error
116121
}
117122
}
118123

124+
// calculateBackoffInterval calculates the retry interval using exponential backoff.
125+
// For webhooks that have been disabled (failedAttempts >= threshold), it uses exponential backoff
126+
// based on how many times the webhook has been disabled. The interval doubles on each disabled period
127+
// and is capped between minBackoffInterval and maxBackoffInterval.
128+
func (hcs *healthCheckSink) calculateBackoffInterval(failedAttempts uint64) time.Duration {
129+
if failedAttempts < uint64(hcs.unhealthyAttemptsThreshold) {
130+
return hcs.unhealthyRetryInterval
131+
}
132+
133+
// Calculate how many times the webhook has been disabled.
134+
disabledPeriods := failedAttempts / uint64(hcs.unhealthyAttemptsThreshold)
135+
136+
// Use exponential backoff: baseInterval * 2^disabledPeriods, capped at max.
137+
backoffInterval := hcs.unhealthyRetryInterval
138+
for i := uint64(1); i < disabledPeriods; i++ {
139+
backoffInterval *= 2
140+
if backoffInterval > hcs.maxBackoffInterval {
141+
backoffInterval = hcs.maxBackoffInterval
142+
break
143+
}
144+
}
145+
146+
// Ensure the backoff is at least the minimum.
147+
if backoffInterval < hcs.minBackoffInterval {
148+
backoffInterval = hcs.minBackoffInterval
149+
}
150+
151+
return backoffInterval
152+
}
153+
119154
// executeAndRecord runs the provided request using the underlying sink and records the health status.
120155
func (hcs *healthCheckSink) executeAndRecord(
121156
ctx context.Context, req *http.Request, lastKnownState healthState,
@@ -164,14 +199,19 @@ func (hcs *healthCheckSink) executeAndRecord(
164199
}
165200

166201
// NewHealthCheckSink creates a Sink that records the health status of the webhooks and stops them from executing if
167-
// too many fail in a specified interval of time.
202+
// too many fail in a specified interval of time. The cooldown period uses exponential backoff, starting from
203+
// unhealthyRetryInterval and doubling each time the number of failed attempts reaches a further multiple of
204+
// unhealthyAttemptsThreshold. The doubled value is capped at maxBackoffInterval and the result is at least
// minBackoffInterval.
//
// sink is the underlying Sink that executes the requests and registry stores the health status.
// The arguments are stored as given; no validation is performed here.
168205
func NewHealthCheckSink(
169-
sink Sink, registry HealthStatusRegistry, unhealthyAttemptsThreshold int, unhealthyRetryInterval time.Duration,
206+
sink Sink, registry HealthStatusRegistry, unhealthyAttemptsThreshold int, unhealthyRetryInterval,
207+
minBackoffInterval, maxBackoffInterval time.Duration,
170208
) Sink {
171209
return &healthCheckSink{
172210
sink: sink,
173211
registry: registry,
174212
unhealthyAttemptsThreshold: unhealthyAttemptsThreshold,
175213
unhealthyRetryInterval: unhealthyRetryInterval,
214+
minBackoffInterval: minBackoffInterval,
215+
maxBackoffInterval: maxBackoffInterval,
176216
}
177217
}

pkg/applicationserver/io/web/sink/health_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestHealthCheckSink(t *testing.T) { // nolint:gocyclo
6666
target := mock.New(sinkCh)
6767

6868
registry := web.NewHealthStatusRegistry(webhookRegistry)
69-
healthSink := sink.NewHealthCheckSink(target, registry, 4, 8*timeout)
69+
healthSink := sink.NewHealthCheckSink(target, registry, 4, 8*timeout, 4*timeout, 32*timeout)
7070

7171
ctx = internal.WithWebhookData(ctx, &internal.WebhookData{
7272
EndDeviceIDs: registeredDeviceID,
@@ -151,7 +151,7 @@ func TestHealthCheckSink(t *testing.T) { // nolint:gocyclo
151151
}
152152
}
153153

154-
// We wait for the cooldown period to pass.
154+
// We wait for the first cooldown period to pass (8 * timeout).
155155
time.Sleep(8 * timeout)
156156

157157
// The sink should now do one attempt, and fail the rest.
@@ -191,8 +191,11 @@ func TestHealthCheckSink(t *testing.T) { // nolint:gocyclo
191191
}
192192
}
193193

194-
// We wait for the cooldown period to pass.
195-
time.Sleep(8 * timeout)
194+
// We wait for the second cooldown period to pass.
195+
// Since the webhook has been disabled twice now (failedAttempts went from 4 to 5, then to 8),
196+
// the backoff interval doubles from 8*timeout to 16*timeout.
197+
// We wait for 16*timeout to ensure the retry period has passed.
198+
time.Sleep(16 * timeout)
196199

197200
// We reset the error and expect the health status to recover.
198201
target.SetError(nil)

0 commit comments

Comments
 (0)