Skip to content

Commit 85fdac2

Browse files
committed
chore: unwired failover strategy
1 parent 00aea5f commit 85fdac2

2 files changed

Lines changed: 242 additions & 0 deletions

File tree

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// reference implementation here: https://github.com/Unleash/unleash-node-sdk/pull/780
2+
3+
package io.getunleash.streaming;
4+
5+
import java.time.Instant;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.Set;
9+
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
class FailoverStrategy {
14+
15+
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverStrategy.class);
16+
private static final Set<String> FAILOVER_SERVER_HINTS = Set.of("polling");
17+
private static final Set<Integer> HARD_FAILOVER_STATUS_CODES = Set.of(401, 403, 404, 429, 501);
18+
private static final Set<Integer> SOFT_FAILOVER_STATUS_CODES = Set.of(408, 500, 502, 503, 504);
19+
20+
private final int maxFails;
21+
private final long relaxTimeMs;
22+
private final List<FailEvent> failures = new ArrayList<>();
23+
24+
FailoverStrategy(int maxFails, long relaxTimeMs) {
25+
this.maxFails = maxFails;
26+
this.relaxTimeMs = relaxTimeMs;
27+
}
28+
29+
boolean shouldFailover(FailEvent event) {
30+
return shouldFailover(event, Instant.now());
31+
}
32+
33+
boolean shouldFailover(FailEvent event, Instant now) {
34+
long nowMs = now.toEpochMilli();
35+
pruneOldFailures(nowMs);
36+
37+
switch (event.getType()) {
38+
case HTTP_STATUS_ERROR:
39+
return handleHttpStatus((HttpStatusError) event);
40+
case SERVER_HINT:
41+
return handleServerEvent((ServerEvent) event);
42+
case NETWORK_ERROR:
43+
return handleNetwork(event);
44+
default:
45+
LOGGER.warn(
46+
"Responding to an unknown event, this should not have occurred please report this. Streaming will continue to operate without failing over to polling");
47+
return false;
48+
}
49+
}
50+
51+
private boolean handleServerEvent(ServerEvent event) {
52+
return FAILOVER_SERVER_HINTS.contains(event.getEvent());
53+
}
54+
55+
private boolean handleNetwork(FailEvent event) {
56+
return hasTooManyFails(event);
57+
}
58+
59+
private boolean handleHttpStatus(HttpStatusError event) {
60+
if (HARD_FAILOVER_STATUS_CODES.contains(event.getStatusCode())) {
61+
return true;
62+
}
63+
if (SOFT_FAILOVER_STATUS_CODES.contains(event.getStatusCode())) {
64+
return hasTooManyFails(event);
65+
}
66+
return false;
67+
}
68+
69+
private boolean hasTooManyFails(FailEvent event) {
70+
failures.add(event);
71+
return failures.size() >= maxFails;
72+
}
73+
74+
private void pruneOldFailures(long nowMs) {
75+
long cutoff = nowMs - relaxTimeMs;
76+
int write = 0;
77+
for (int read = 0; read < failures.size(); read++) {
78+
FailEvent failure = failures.get(read);
79+
if (failure.getOccurredAt().toEpochMilli() >= cutoff) {
80+
failures.set(write++, failure);
81+
}
82+
}
83+
if (write < failures.size()) {
84+
failures.subList(write, failures.size()).clear();
85+
}
86+
}
87+
88+
interface FailEvent {
89+
Instant getOccurredAt();
90+
91+
String getMessage();
92+
93+
EventType getType();
94+
}
95+
96+
private enum EventType {
97+
NETWORK_ERROR,
98+
HTTP_STATUS_ERROR,
99+
SERVER_HINT
100+
}
101+
102+
abstract static class BaseFailEvent implements FailEvent {
103+
private final Instant occurredAt;
104+
private final String message;
105+
106+
protected BaseFailEvent(Instant occurredAt, String message) {
107+
this.occurredAt = occurredAt;
108+
this.message = message;
109+
}
110+
111+
@Override
112+
public Instant getOccurredAt() {
113+
return occurredAt;
114+
}
115+
116+
@Override
117+
public String getMessage() {
118+
return message;
119+
}
120+
}
121+
122+
public static final class NetworkEventError extends BaseFailEvent {
123+
public NetworkEventError(Instant occurredAt, String message) {
124+
super(occurredAt, message);
125+
}
126+
127+
@Override
128+
public EventType getType() {
129+
return EventType.NETWORK_ERROR;
130+
}
131+
}
132+
133+
public static final class HttpStatusError extends BaseFailEvent {
134+
private final int statusCode;
135+
136+
public HttpStatusError(Instant occurredAt, String message, int statusCode) {
137+
super(occurredAt, message);
138+
this.statusCode = statusCode;
139+
}
140+
141+
public int getStatusCode() {
142+
return statusCode;
143+
}
144+
145+
@Override
146+
public EventType getType() {
147+
return EventType.HTTP_STATUS_ERROR;
148+
}
149+
}
150+
151+
public static final class ServerEvent extends BaseFailEvent {
152+
private final String event;
153+
154+
public ServerEvent(Instant occurredAt, String message, String event) {
155+
super(occurredAt, message);
156+
this.event = event;
157+
}
158+
159+
public String getEvent() {
160+
return event;
161+
}
162+
163+
@Override
164+
public EventType getType() {
165+
return EventType.SERVER_HINT;
166+
}
167+
}
168+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package io.getunleash.streaming;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.getunleash.streaming.FailoverStrategy.HttpStatusError;
6+
import io.getunleash.streaming.FailoverStrategy.NetworkEventError;
7+
import io.getunleash.streaming.FailoverStrategy.ServerEvent;
8+
import java.time.Instant;
9+
import org.junit.jupiter.api.Test;
10+
11+
class FailoverStrategyTest {
12+
13+
@Test
14+
void should_fail_immediately_on_server_hint_polling() {
15+
FailoverStrategy strategy = new FailoverStrategy(3, 5_000);
16+
ServerEvent event = new ServerEvent(Instant.now(), "Switch to polling", "polling");
17+
18+
assertThat(strategy.shouldFailover(event)).isTrue();
19+
}
20+
21+
@Test
22+
void should_ignore_unknown_server_hint() {
23+
FailoverStrategy strategy = new FailoverStrategy(3, 5_000);
24+
ServerEvent event = new ServerEvent(Instant.now(), "Restart complete", "unleash-restarted");
25+
26+
assertThat(strategy.shouldFailover(event)).isFalse();
27+
}
28+
29+
@Test
30+
void should_fail_immediately_on_hard_http_status() {
31+
FailoverStrategy strategy = new FailoverStrategy(3, 5_000);
32+
HttpStatusError error = new HttpStatusError(Instant.now(), "Too many connections", 429);
33+
34+
assertThat(strategy.shouldFailover(error)).isTrue();
35+
}
36+
37+
@Test
38+
void should_fail_after_sliding_window_on_soft_http_status() {
39+
FailoverStrategy strategy = new FailoverStrategy(2, 10_000);
40+
Instant now = Instant.now();
41+
42+
HttpStatusError first = new HttpStatusError(now, "Temporary error", 503);
43+
HttpStatusError second = new HttpStatusError(now.plusMillis(5), "Still failing", 503);
44+
45+
assertThat(strategy.shouldFailover(first, now)).isFalse();
46+
assertThat(strategy.shouldFailover(second, now.plusMillis(5))).isTrue();
47+
}
48+
49+
@Test
50+
void network_errors_contribute_to_sliding_window() {
51+
FailoverStrategy strategy = new FailoverStrategy(2, 10_000);
52+
Instant now = Instant.now();
53+
54+
NetworkEventError first = new NetworkEventError(now, "Connection reset");
55+
NetworkEventError second = new NetworkEventError(now.plusMillis(1), "Socket timeout");
56+
57+
assertThat(strategy.shouldFailover(first, now)).isFalse();
58+
assertThat(strategy.shouldFailover(second, now.plusMillis(1))).isTrue();
59+
}
60+
61+
@Test
62+
void sliding_window_drops_old_failures() {
63+
FailoverStrategy strategy = new FailoverStrategy(2, 1_000);
64+
Instant base = Instant.EPOCH;
65+
66+
NetworkEventError first = new NetworkEventError(base, "Initial blip");
67+
NetworkEventError second = new NetworkEventError(base.plusMillis(1_500), "Recovered blip");
68+
NetworkEventError third = new NetworkEventError(base.plusMillis(1_510), "Another blip");
69+
70+
assertThat(strategy.shouldFailover(first, base)).isFalse();
71+
assertThat(strategy.shouldFailover(second, base.plusMillis(1_500))).isFalse();
72+
assertThat(strategy.shouldFailover(third, base.plusMillis(1_510))).isTrue();
73+
}
74+
}

0 commit comments

Comments
 (0)