Skip to content

Commit c23f0b4

Browse files
committed
chore: wire up failover
1 parent 85fdac2 commit c23f0b4

3 files changed

Lines changed: 191 additions & 5 deletions

File tree

src/main/java/io/getunleash/streaming/FailoverStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.Set;
9-
109
import org.slf4j.Logger;
1110
import org.slf4j.LoggerFactory;
1211

src/main/java/io/getunleash/streaming/StreamingFeatureFetcherImpl.java

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.launchdarkly.eventsource.ConnectStrategy;
44
import com.launchdarkly.eventsource.EventSource;
55
import com.launchdarkly.eventsource.MessageEvent;
6+
import com.launchdarkly.eventsource.StreamHttpErrorException;
67
import com.launchdarkly.eventsource.background.BackgroundEventHandler;
78
import com.launchdarkly.eventsource.background.BackgroundEventSource;
89
import io.getunleash.UnleashException;
@@ -15,18 +16,22 @@
1516
import io.getunleash.util.UnleashConfig;
1617
import java.net.URI;
1718
import java.time.Duration;
19+
import java.time.Instant;
1820
import okhttp3.Headers;
1921
import okhttp3.OkHttpClient;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

2325
public class StreamingFeatureFetcherImpl implements FetchWorker {
2426
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingFeatureFetcherImpl.class);
27+
private static final int DEFAULT_MAX_FAILURES = 5;
28+
private static final long DEFAULT_FAIL_WINDOW_MS = 60_000L;
2529

2630
private final UnleashConfig config;
2731
private final EventDispatcher eventDispatcher;
2832
private final UnleashEngine engine;
2933
private final BackupHandler featureBackupHandler;
34+
private final FailoverStrategy failoverStrategy;
3035
private boolean ready;
3136

3237
private volatile BackgroundEventSource eventSource;
@@ -36,10 +41,25 @@ public StreamingFeatureFetcherImpl(
3641
EventDispatcher eventDispatcher,
3742
UnleashEngine engine,
3843
BackupHandler featureBackupHandler) {
44+
this(
45+
config,
46+
eventDispatcher,
47+
engine,
48+
featureBackupHandler,
49+
new FailoverStrategy(DEFAULT_MAX_FAILURES, DEFAULT_FAIL_WINDOW_MS));
50+
}
51+
52+
StreamingFeatureFetcherImpl(
53+
UnleashConfig config,
54+
EventDispatcher eventDispatcher,
55+
UnleashEngine engine,
56+
BackupHandler featureBackupHandler,
57+
FailoverStrategy failoverStrategy) {
3958
this.config = config;
4059
this.eventDispatcher = eventDispatcher;
4160
this.engine = engine;
4261
this.featureBackupHandler = featureBackupHandler;
62+
this.failoverStrategy = failoverStrategy;
4363
}
4464

4565
public void start() {
@@ -116,6 +136,61 @@ synchronized void handleStreamingUpdate(String data) {
116136
}
117137
}
118138

139+
void handleStreamingError(Throwable throwable) {
140+
handleFailoverDecision(toFailEvent(throwable));
141+
}
142+
143+
void handleModeChange(String eventData) {
144+
if (eventData.equals("polling")) {
145+
FailoverStrategy.ServerEvent failEvent =
146+
new FailoverStrategy.ServerEvent(
147+
Instant.now(),
148+
"Server has explicitly requested switching to polling mode",
149+
eventData);
150+
handleFailoverDecision(failEvent);
151+
} else {
152+
LOGGER.debug("Ignoring an unrecognized fetch mode change to {}", eventData);
153+
}
154+
}
155+
156+
void handleServerDisconnect() {
157+
FailoverStrategy.NetworkEventError failEvent =
158+
new FailoverStrategy.NetworkEventError(
159+
Instant.now(), "Server closed the streaming connection");
160+
handleFailoverDecision(failEvent);
161+
}
162+
163+
private FailoverStrategy.FailEvent toFailEvent(Throwable throwable) {
164+
Instant now = Instant.now();
165+
if (throwable instanceof StreamHttpErrorException) {
166+
int statusCode = ((StreamHttpErrorException) throwable).getCode();
167+
String message =
168+
throwable.getMessage() != null
169+
? throwable.getMessage()
170+
: String.format(
171+
"Streaming failed with http status code %d", statusCode);
172+
return new FailoverStrategy.HttpStatusError(now, message, statusCode);
173+
}
174+
175+
// Not an HTTP problem so something has likely gone wrong on the network layer
176+
String message =
177+
(throwable != null && throwable.getMessage() != null)
178+
? throwable.getMessage()
179+
: "Network error occurred in streaming";
180+
return new FailoverStrategy.NetworkEventError(now, message);
181+
}
182+
183+
private void handleFailoverDecision(FailoverStrategy.FailEvent failEvent) {
184+
boolean shouldFail = failoverStrategy.shouldFailover(failEvent, Instant.now());
185+
if (shouldFail) {
186+
LOGGER.warn(
187+
"Streaming failover triggered: {}. Client is switching over to polling mode.",
188+
failEvent.getMessage());
189+
190+
// changeToPolling()
191+
}
192+
}
193+
119194
private class UnleashEventHandler implements BackgroundEventHandler {
120195

121196
@Override
@@ -126,6 +201,7 @@ public void onOpen() throws Exception {
126201
@Override
127202
public void onClosed() throws Exception {
128203
LOGGER.info("Streaming connection to Unleash server closed");
204+
handleServerDisconnect();
129205
}
130206

131207
@Override
@@ -141,6 +217,9 @@ public void onMessage(String event, MessageEvent messageEvent) throws Exception
141217
case "unleash-updated":
142218
handleStreamingUpdate(messageEvent.getData());
143219
break;
220+
case "fetch-mode":
221+
handleModeChange(messageEvent.getData());
222+
break;
144223
default:
145224
LOGGER.debug("Ignoring unknown event type: {}", event);
146225
}
@@ -154,13 +233,13 @@ public void onMessage(String event, MessageEvent messageEvent) throws Exception
154233
}
155234

156235
@Override
157-
public void onComment(String comment) throws Exception {}
236+
public void onComment(String comment) throws Exception {
237+
// gotta implement this because inheritance reasons but we don't care about it
238+
}
158239

159240
@Override
160241
public void onError(Throwable t) {
161-
UnleashException unleashException =
162-
new UnleashException("Streaming connection error", t);
163-
eventDispatcher.dispatch(unleashException);
242+
handleStreamingError(t);
164243
}
165244
}
166245
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package io.getunleash.streaming;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.verify;
7+
import static org.mockito.Mockito.when;
8+
9+
import com.launchdarkly.eventsource.StreamHttpErrorException;
10+
import io.getunleash.engine.UnleashEngine;
11+
import io.getunleash.event.EventDispatcher;
12+
import io.getunleash.repository.BackupHandler;
13+
import io.getunleash.util.UnleashConfig;
14+
import java.time.Instant;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
import org.mockito.ArgumentCaptor;
18+
19+
class StreamingFailoverEventsTest {
20+
21+
private UnleashConfig config;
22+
private EventDispatcher dispatcher;
23+
private UnleashEngine engine;
24+
private BackupHandler backupHandler;
25+
26+
@BeforeEach
27+
void setup() {
28+
config =
29+
UnleashConfig.builder()
30+
.appName("streaming-failover-test")
31+
.unleashAPI("http://localhost:4242/api/")
32+
.instanceId("failover-test")
33+
.experimentalStreamingMode()
34+
.disableMetrics()
35+
.build();
36+
dispatcher = new EventDispatcher(config);
37+
engine = new UnleashEngine();
38+
backupHandler = mock(BackupHandler.class);
39+
}
40+
41+
@Test
42+
void fetch_mode_polling_event_is_forwarded_as_server_hint() {
43+
FailoverStrategy strategy = mock(FailoverStrategy.class);
44+
when(strategy.shouldFailover(any(FailoverStrategy.FailEvent.class), any(Instant.class)))
45+
.thenReturn(false);
46+
StreamingFeatureFetcherImpl fetcher = newFetcher(strategy);
47+
48+
fetcher.handleModeChange("polling");
49+
50+
ArgumentCaptor<FailoverStrategy.FailEvent> captor =
51+
ArgumentCaptor.forClass(FailoverStrategy.FailEvent.class);
52+
verify(strategy).shouldFailover(captor.capture(), any(Instant.class));
53+
54+
assertThat(captor.getValue()).isInstanceOf(FailoverStrategy.ServerEvent.class);
55+
FailoverStrategy.ServerEvent event = (FailoverStrategy.ServerEvent) captor.getValue();
56+
assertThat(event.getEvent()).isEqualTo("polling");
57+
assertThat(event.getMessage())
58+
.isEqualTo("Server has explicitly requested switching to polling mode");
59+
}
60+
61+
@Test
62+
void http_errors_are_forwarded_as_http_status_events() {
63+
FailoverStrategy strategy = mock(FailoverStrategy.class);
64+
when(strategy.shouldFailover(any(FailoverStrategy.FailEvent.class), any(Instant.class)))
65+
.thenReturn(false);
66+
StreamingFeatureFetcherImpl fetcher = newFetcher(strategy);
67+
68+
StreamHttpErrorException exception = mock(StreamHttpErrorException.class);
69+
when(exception.getCode()).thenReturn(503);
70+
when(exception.getMessage()).thenReturn("503 from upstream");
71+
72+
fetcher.handleStreamingError(exception);
73+
74+
ArgumentCaptor<FailoverStrategy.FailEvent> captor =
75+
ArgumentCaptor.forClass(FailoverStrategy.FailEvent.class);
76+
ArgumentCaptor<Instant> timestampCaptor = ArgumentCaptor.forClass(Instant.class);
77+
verify(strategy).shouldFailover(captor.capture(), timestampCaptor.capture());
78+
79+
assertThat(captor.getValue()).isInstanceOf(FailoverStrategy.HttpStatusError.class);
80+
FailoverStrategy.HttpStatusError event =
81+
(FailoverStrategy.HttpStatusError) captor.getValue();
82+
assertThat(event.getStatusCode()).isEqualTo(503);
83+
assertThat(event.getMessage()).isEqualTo("503 from upstream");
84+
assertThat(event.getOccurredAt()).isBeforeOrEqualTo(timestampCaptor.getValue());
85+
}
86+
87+
@Test
88+
void server_disconnect_is_treated_as_network_error() {
89+
FailoverStrategy strategy = mock(FailoverStrategy.class);
90+
when(strategy.shouldFailover(any(FailoverStrategy.FailEvent.class), any(Instant.class)))
91+
.thenReturn(false);
92+
StreamingFeatureFetcherImpl fetcher = newFetcher(strategy);
93+
94+
fetcher.handleServerDisconnect();
95+
96+
ArgumentCaptor<FailoverStrategy.FailEvent> captor =
97+
ArgumentCaptor.forClass(FailoverStrategy.FailEvent.class);
98+
verify(strategy).shouldFailover(captor.capture(), any(Instant.class));
99+
100+
assertThat(captor.getValue()).isInstanceOf(FailoverStrategy.NetworkEventError.class);
101+
assertThat(captor.getValue().getMessage())
102+
.isEqualTo("Server closed the streaming connection");
103+
}
104+
105+
private StreamingFeatureFetcherImpl newFetcher(FailoverStrategy strategy) {
106+
return new StreamingFeatureFetcherImpl(config, dispatcher, engine, backupHandler, strategy);
107+
}
108+
}

0 commit comments

Comments
 (0)