Skip to content

Commit 9a1f89b

Browse files
authored
fix: tight loop in RPC mode event stream reconnect (#1789)
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
1 parent c2fb0fe commit 9a1f89b

3 files changed

Lines changed: 165 additions & 61 deletions

File tree

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolver.java

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
1313
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
1414
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
15+
import dev.openfeature.contrib.providers.flagd.resolver.common.ShutdownUtils;
1516
import dev.openfeature.contrib.providers.flagd.resolver.common.StreamResponseModel;
1617
import dev.openfeature.contrib.providers.flagd.resolver.rpc.cache.Cache;
1718
import dev.openfeature.contrib.providers.flagd.resolver.rpc.strategy.ResolveFactory;
@@ -46,7 +47,10 @@
4647
import java.util.ArrayList;
4748
import java.util.List;
4849
import java.util.Map;
50+
import java.util.concurrent.Executors;
4951
import java.util.concurrent.LinkedBlockingQueue;
52+
import java.util.concurrent.RejectedExecutionException;
53+
import java.util.concurrent.ScheduledExecutorService;
5054
import java.util.concurrent.TimeUnit;
5155
import java.util.concurrent.atomic.AtomicBoolean;
5256
import java.util.function.Function;
@@ -62,16 +66,23 @@
6266
public final class RpcResolver implements Resolver {
6367
private static final int QUEUE_SIZE = 5;
6468
private final AtomicBoolean shutdown = new AtomicBoolean(false);
69+
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
6570
private final AtomicBoolean successfulConnection = new AtomicBoolean(false);
6671
private final ChannelConnector connector;
6772
private final Cache cache;
6873
private final ResolveStrategy strategy;
6974
private final FlagdOptions options;
75+
private final int maxBackoffMs;
7076
private final LinkedBlockingQueue<StreamResponseModel<EventStreamResponse>> incomingQueue;
7177
private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onProviderEvent;
7278
private final ServiceStub stub;
7379
private final ServiceBlockingStub blockingStub;
7480
private final List<String> fatalStatusCodes;
81+
private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
82+
Thread t = new Thread(r, "flagd-rpc-retry-scheduler");
83+
t.setDaemon(true);
84+
return t;
85+
});
7586

7687
/**
7788
* Resolves flag values using
@@ -96,6 +107,7 @@ public RpcResolver(
96107
this.blockingStub =
97108
ServiceGrpc.newBlockingStub(this.connector.getChannel()).withWaitForReady();
98109
this.fatalStatusCodes = options.getFatalStatusCodes();
110+
this.maxBackoffMs = options.getRetryBackoffMaxMs();
99111
}
100112

101113
// testing only
@@ -115,23 +127,14 @@ protected RpcResolver(
115127
this.stub = mockStub;
116128
this.blockingStub = mockBlockingStub;
117129
this.fatalStatusCodes = options.getFatalStatusCodes();
130+
this.maxBackoffMs = options.getRetryBackoffMaxMs();
118131
}
119132

120133
/**
121134
* Initialize RpcResolver resolver.
122135
*/
123136
public void init() throws Exception {
124-
Thread listener = new Thread(() -> {
125-
try {
126-
observeEventStream();
127-
} catch (InterruptedException e) {
128-
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
129-
Thread.currentThread().interrupt();
130-
}
131-
});
132-
133-
listener.setDaemon(true);
134-
listener.start();
137+
retryScheduler.execute(this::observeEventStream);
135138
}
136139

137140
/**
@@ -141,6 +144,9 @@ public void shutdown() throws Exception {
141144
if (shutdown.getAndSet(true)) {
142145
return;
143146
}
147+
this.retryScheduler.shutdownNow();
148+
ShutdownUtils.awaitTerminationQuietly(
149+
() -> retryScheduler.awaitTermination(options.getDeadline(), TimeUnit.MILLISECONDS));
144150
this.connector.shutdown();
145151
}
146152

@@ -337,63 +343,81 @@ private void restartStream() {
337343
}
338344

339345
/** Contains blocking calls, to be used concurrently. */
340-
private void observeEventStream() throws InterruptedException {
346+
private void observeEventStream() {
341347

342348
log.info("Initializing event stream observer");
343349

344350
// outer loop for re-issuing the stream request
345351
// "waitForReady" on the channel, plus our retry policy slow this loop down in error conditions
346352
while (!shutdown.get()) {
347-
348-
log.debug("Initializing event stream request");
349-
restartStream();
350-
// inner loop for handling messages
351-
while (!shutdown.get()) {
352-
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();
353-
if (taken.isComplete()) {
354-
log.debug("Event stream completed, will reconnect");
355-
this.handleErrorOrComplete(false);
356-
// The stream is complete, we still try to reconnect
357-
break;
353+
try {
354+
// explicit backoff after stream errors or completion, to prevent tight loops when the stream ends immediately
355+
// (e.g., by intervening proxies like Envoy)
356+
if (shouldThrottle.getAndSet(false)) {
357+
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
358+
try {
359+
retryScheduler.schedule(this::observeEventStream, this.maxBackoffMs, TimeUnit.MILLISECONDS);
360+
} catch (RejectedExecutionException e) {
361+
log.debug("Retry scheduling rejected, most likely shutdown was invoked", e);
362+
}
363+
return;
358364
}
359365

360-
Throwable streamException = taken.getError();
361-
if (streamException != null) {
362-
if (streamException instanceof StatusRuntimeException
363-
&& fatalStatusCodes.contains(((StatusRuntimeException) streamException)
364-
.getStatus()
365-
.getCode()
366-
.name())
367-
&& !successfulConnection.get()) {
368-
log.debug(
369-
"Fatal error code received: {}",
370-
((StatusRuntimeException) streamException)
371-
.getStatus()
372-
.getCode());
373-
this.handleErrorOrComplete(true);
374-
} else {
375-
log.debug(
376-
"Exception in event stream connection, streamException {}, will reconnect",
377-
streamException);
366+
log.debug("Initializing event stream request");
367+
restartStream();
368+
// inner loop for handling messages
369+
while (!shutdown.get()) {
370+
final StreamResponseModel<EventStreamResponse> taken = incomingQueue.take();
371+
if (taken.isComplete()) {
372+
log.debug("Event stream completed, will reconnect");
378373
this.handleErrorOrComplete(false);
374+
shouldThrottle.set(true);
375+
// the stream is complete, we still try to reconnect
376+
break;
379377
}
380-
break;
381-
}
382-
383-
successfulConnection.set(true);
384-
final EventStreamResponse response = taken.getResponse();
385-
log.debug("Got stream response: {}", response);
386378

387-
switch (response.getType()) {
388-
case Constants.CONFIGURATION_CHANGE:
389-
this.handleConfigurationChangeEvent(response);
390-
break;
391-
case Constants.PROVIDER_READY:
392-
this.handleProviderReadyEvent();
379+
Throwable streamException = taken.getError();
380+
if (streamException != null) {
381+
if (streamException instanceof StatusRuntimeException
382+
&& fatalStatusCodes.contains(((StatusRuntimeException) streamException)
383+
.getStatus()
384+
.getCode()
385+
.name())
386+
&& !successfulConnection.get()) {
387+
log.debug(
388+
"Fatal error code received: {}",
389+
((StatusRuntimeException) streamException)
390+
.getStatus()
391+
.getCode());
392+
this.handleErrorOrComplete(true);
393+
} else {
394+
log.debug(
395+
"Exception in event stream connection, streamException {}, will reconnect",
396+
streamException);
397+
this.handleErrorOrComplete(false);
398+
}
399+
shouldThrottle.set(true);
393400
break;
394-
default:
395-
log.debug("Unhandled event type {}", response.getType());
401+
}
402+
403+
successfulConnection.set(true);
404+
final EventStreamResponse response = taken.getResponse();
405+
log.debug("Got stream response: {}", response);
406+
407+
switch (response.getType()) {
408+
case Constants.CONFIGURATION_CHANGE:
409+
this.handleConfigurationChangeEvent(response);
410+
break;
411+
case Constants.PROVIDER_READY:
412+
this.handleProviderReadyEvent();
413+
break;
414+
default:
415+
log.debug("Unhandled event type {}", response.getType());
416+
}
396417
}
418+
} catch (InterruptedException ie) {
419+
log.debug("Stream observer interrupted, most likely shutdown was invoked", ie);
420+
Thread.currentThread().interrupt();
397421
}
398422
}
399423

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private static Object getPrivateField(Object instance, String fieldName) {
176176
}
177177

178178
@Test
179-
void syncInitError_DoesNotBusyWait() throws Exception {
179+
void syncInitError_RetriesWithNonBlockingBackoff() throws Exception {
180180
// make sure we do not spin in a busy loop on immediate errors
181181

182182
int maxBackoffMs = 1000;
@@ -201,7 +201,7 @@ void syncInitError_DoesNotBusyWait() throws Exception {
201201
}
202202

203203
@Test
204-
void asyncInitError_DoesNotBusyWait() throws Exception {
204+
void asyncInitError_RetriesWithNonBlockingBackoff() throws Exception {
205205
// make sure we do not spin in a busy loop on async errors
206206

207207
int maxBackoffMs = 1000;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/rpc/RpcResolverTest.java

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class RpcResolverTest {
3131
private ChannelConnector mockConnector;
3232
private ServiceBlockingStub blockingStub;
3333
private ServiceStub stub;
34+
private ServiceStub errorStub;
3435
private QueueingStreamObserver<EventStreamResponse> observer;
3536
private TriConsumer<ProviderEvent, ProviderEventDetails, Structure> consumer;
3637
private CountDownLatch latch; // used to wait for observer to be initialized
@@ -61,6 +62,28 @@ public Void answer(InvocationOnMock invocation) {
6162
})
6263
.when(stub)
6364
.eventStream(any(), any()); // Mock the initialize method
65+
66+
// stub that fires onError shortly after every eventStream call
67+
errorStub = mock(ServiceStub.class);
68+
when(errorStub.withDeadlineAfter(anyLong(), any())).thenReturn(errorStub);
69+
doAnswer((Answer<Void>) invocation -> {
70+
@SuppressWarnings("unchecked")
71+
QueueingStreamObserver<EventStreamResponse> obs = (QueueingStreamObserver<EventStreamResponse>)
72+
invocation.getArguments()[1];
73+
new Thread(() -> {
74+
try {
75+
Thread.sleep(10);
76+
} catch (InterruptedException e) {
77+
throw new RuntimeException(e);
78+
}
79+
obs.onError(new Exception("error"));
80+
latch.countDown();
81+
})
82+
.start();
83+
return null;
84+
})
85+
.when(errorStub)
86+
.eventStream(any(), any());
6487
}
6588

6689
@Test
@@ -102,8 +125,13 @@ void onNextWithChangedRunsConsumerWithChanged() throws Exception {
102125

103126
@Test
104127
void onCompletedRerunsStreamWithError() throws Exception {
105-
RpcResolver resolver =
106-
new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector);
128+
RpcResolver resolver = new RpcResolver(
129+
FlagdOptions.builder().retryBackoffMaxMs(100).build(),
130+
null,
131+
consumer,
132+
stub,
133+
blockingStub,
134+
mockConnector);
107135
resolver.init();
108136
latch.await();
109137

@@ -118,8 +146,13 @@ void onCompletedRerunsStreamWithError() throws Exception {
118146

119147
@Test
120148
void onErrorRunsConsumerWithError() throws Exception {
121-
RpcResolver resolver =
122-
new RpcResolver(FlagdOptions.builder().build(), null, consumer, stub, blockingStub, mockConnector);
149+
RpcResolver resolver = new RpcResolver(
150+
FlagdOptions.builder().retryBackoffMaxMs(100).build(),
151+
null,
152+
consumer,
153+
stub,
154+
blockingStub,
155+
mockConnector);
123156
resolver.init();
124157
latch.await();
125158

@@ -131,4 +164,51 @@ void onErrorRunsConsumerWithError() throws Exception {
131164
// should have restarted the stream (2 calls)
132165
await().untilAsserted(() -> verify(stub, times(2)).eventStream(any(), any()));
133166
}
167+
168+
@Test
169+
void onError_RetriesWithNonBlockingBackoff() throws Exception {
170+
// make sure we do not spin in a busy loop on immediate errors
171+
int maxBackoffMs = 1000;
172+
RpcResolver resolver = new RpcResolver(
173+
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
174+
null,
175+
consumer,
176+
errorStub,
177+
blockingStub,
178+
mockConnector);
179+
resolver.init();
180+
latch.await();
181+
182+
// wait 1.5x our delay for retries
183+
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2));
184+
185+
// should have retried the stream (2 calls); initial + 1 retry
186+
// it's very important that the retry count is low, to confirm no busy-loop
187+
verify(errorStub, times(2)).eventStream(any(), any());
188+
}
189+
190+
@Test
191+
void onCompleted_RetriesWithNonBlockingBackoff() throws Exception {
192+
// make sure we do not spin in a busy loop on stream completion
193+
int maxBackoffMs = 1000;
194+
RpcResolver resolver = new RpcResolver(
195+
FlagdOptions.builder().retryBackoffMaxMs(maxBackoffMs).build(),
196+
null,
197+
consumer,
198+
stub,
199+
blockingStub,
200+
mockConnector);
201+
resolver.init();
202+
latch.await();
203+
204+
// fire completion
205+
observer.onCompleted();
206+
207+
// wait 1.5x our delay for retries
208+
Thread.sleep(maxBackoffMs + (maxBackoffMs / 2));
209+
210+
// should have retried the stream (2 calls); initial + 1 retry
211+
// it's very important that the retry count is low, to confirm no busy-loop
212+
verify(stub, times(2)).eventStream(any(), any());
213+
}
134214
}

0 commit comments

Comments
 (0)