Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import dev.openfeature.sdk.Reason;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import dev.openfeature.sdk.exceptions.FatalError;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -41,29 +42,46 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

class DDEvaluator implements Evaluator, FeatureFlaggingGateway.ConfigListener {
class DDEvaluator
implements Evaluator,
FeatureFlaggingGateway.ConfigListener,
FeatureFlaggingGateway.FatalErrorListener {

private static final Set<Class<?>> SUPPORTED_RESOLUTION_TYPES =
new HashSet<>(asList(String.class, Boolean.class, Integer.class, Double.class, Value.class));

private final Runnable configCallback;
private final Runnable fatalCallback;
private final AtomicReference<ServerConfiguration> configuration = new AtomicReference<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private volatile String fatalErrorMessage = null;

public DDEvaluator(final Runnable configCallback) {
this(configCallback, () -> {});
}

public DDEvaluator(final Runnable configCallback, final Runnable fatalCallback) {
this.configCallback = configCallback;
this.fatalCallback = fatalCallback;
}

@Override
public boolean initialize(
final long timeout, final TimeUnit unit, final EvaluationContext context) throws Exception {
FeatureFlaggingGateway.addConfigListener(this);
return initializationLatch.await(timeout, unit); // await for initialization
FeatureFlaggingGateway.addFatalErrorListener(this);
initializationLatch.await(timeout, unit); // await for initialization or fatal error
final String fatal = fatalErrorMessage;
if (fatal != null) {
throw new FatalError(fatal);
}
return configuration.get() != null;
}

@Override
public void shutdown() {
FeatureFlaggingGateway.removeConfigListener(this);
FeatureFlaggingGateway.removeFatalErrorListener(this);
}

@Override
Expand All @@ -73,13 +91,24 @@ public void accept(final ServerConfiguration config) {
configCallback.run();
}

@Override
public void onFatalError(final int httpStatus, final String message) {
fatalErrorMessage = message != null ? message : "RC fatal error (HTTP " + httpStatus + ")";
initializationLatch.countDown();
fatalCallback.run();
}

@Override
public <T> ProviderEvaluation<T> evaluate(
final Class<T> target,
final String key,
final T defaultValue,
final EvaluationContext context) {
try {
final String fatal = fatalErrorMessage;
if (fatal != null) {
return error(defaultValue, ErrorCode.PROVIDER_FATAL, fatal);
}
final ServerConfiguration config = configuration.get();
if (config == null) {
return error(defaultValue, ErrorCode.PROVIDER_NOT_READY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,22 @@ private void onConfigurationChange() {
}
}

private void onProviderFatal() {
emit(
ProviderEvent.PROVIDER_ERROR,
ProviderEventDetails.builder()
.message("Remote configuration permanently rejected")
.build());
}

private Evaluator buildEvaluator() throws Exception {
if (evaluator != null) {
return evaluator;
}
final Class<?> evaluatorClass = loadEvaluatorClass();
final Constructor<?> ctor = evaluatorClass.getConstructor(Runnable.class);
return (Evaluator) ctor.newInstance((Runnable) this::onConfigurationChange);
final Constructor<?> ctor = evaluatorClass.getConstructor(Runnable.class, Runnable.class);
return (Evaluator)
ctor.newInstance((Runnable) this::onConfigurationChange, (Runnable) this::onProviderFatal);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.api.openfeature;

import static dev.openfeature.sdk.ErrorCode.FLAG_NOT_FOUND;
import static dev.openfeature.sdk.ErrorCode.PROVIDER_FATAL;
import static dev.openfeature.sdk.ErrorCode.TARGETING_KEY_MISSING;
import static dev.openfeature.sdk.Reason.DEFAULT;
import static dev.openfeature.sdk.Reason.DISABLED;
Expand Down Expand Up @@ -147,6 +148,38 @@ public void testEvaluateNoConfig() {
assertThat(details.getErrorCode(), equalTo(ErrorCode.PROVIDER_NOT_READY));
}

@Test
public void testEvaluateAfterFatalError() {
final DDEvaluator evaluator = new DDEvaluator(mock(Runnable.class), mock(Runnable.class));
evaluator.onFatalError(403, "Unauthorized API key");
final ProviderEvaluation<?> details =
evaluator.evaluate(Integer.class, "test", 23, mock(EvaluationContext.class));
assertThat(details.getValue(), equalTo(23));
assertThat(details.getReason(), equalTo(ERROR.name()));
assertThat(details.getErrorCode(), equalTo(PROVIDER_FATAL));
}

@Test
public void testFatalErrorTakesPrecedenceOverConfig() {
final DDEvaluator evaluator = new DDEvaluator(mock(Runnable.class), mock(Runnable.class));
// Config received, then a fatal error arrives post-init
evaluator.accept(mock(ServerConfiguration.class));
evaluator.onFatalError(401, "RC permanently rejected");
final ProviderEvaluation<?> details =
evaluator.evaluate(Integer.class, "test", 23, mock(EvaluationContext.class));
assertThat(details.getValue(), equalTo(23));
assertThat(details.getReason(), equalTo(ERROR.name()));
assertThat(details.getErrorCode(), equalTo(PROVIDER_FATAL));
}

@Test
public void testFatalCallbackInvoked() {
final Runnable fatalCallback = mock(Runnable.class);
final DDEvaluator evaluator = new DDEvaluator(mock(Runnable.class), fatalCallback);
evaluator.onFatalError(403, "Bad credentials");
verify(fatalCallback, times(1)).run();
}

@Test
public void testEvaluateNoContext() {
final DDEvaluator evaluator = new DDEvaluator(mock(Runnable.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,19 @@ public interface ConfigListener extends Consumer<ServerConfiguration> {}

public interface ExposureListener extends Consumer<ExposureEvent> {}

/**
* Listener notified when a non-retryable fatal error is received from the RC endpoint (e.g. HTTP
* 401 Unauthorized). Implementations should transition the OpenFeature provider to
* PROVIDER_FATAL.
*/
public interface FatalErrorListener {
void onFatalError(int httpStatus, String message);
}

private static final List<ConfigListener> CONFIG_LISTENERS = new CopyOnWriteArrayList<>();
private static final List<ExposureListener> EXPOSURE_LISTENERS = new CopyOnWriteArrayList<>();
private static final List<FatalErrorListener> FATAL_ERROR_LISTENERS =
new CopyOnWriteArrayList<>();

private static final AtomicReference<ServerConfiguration> CURRENT_CONFIG =
new AtomicReference<>();
Expand Down Expand Up @@ -49,4 +60,16 @@ public static void removeExposureListener(final ExposureListener listener) {
public static void dispatch(final ExposureEvent event) {
EXPOSURE_LISTENERS.forEach(listener -> listener.accept(event));
}

public static void addFatalErrorListener(final FatalErrorListener listener) {
FATAL_ERROR_LISTENERS.add(listener);
}

public static void removeFatalErrorListener(final FatalErrorListener listener) {
FATAL_ERROR_LISTENERS.remove(listener);
}

public static void dispatchFatalError(final int httpStatus, final String message) {
FATAL_ERROR_LISTENERS.forEach(listener -> listener.onFatalError(httpStatus, message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class RemoteConfigServiceImpl
implements RemoteConfigService, ConfigurationChangesTypedListener<ServerConfiguration> {

private final ConfigurationPoller configurationPoller;
private final ConfigurationPoller.NonRetryableErrorListener nonRetryableErrorListener =
FeatureFlaggingGateway::dispatchFatalError;

public RemoteConfigServiceImpl(final SharedCommunicationObjects sco, final Config config) {
configurationPoller = sco.configurationPoller(config);
Expand All @@ -36,13 +38,15 @@ public void init() {
configurationPoller.addCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES);
configurationPoller.addListener(
Product.FFE_FLAGS, UniversalFlagConfigDeserializer.INSTANCE, this);
configurationPoller.addNonRetryableErrorListener(nonRetryableErrorListener);
configurationPoller.start();
}

@Override
public void close() {
configurationPoller.removeCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES);
configurationPoller.removeListeners(Product.FFE_FLAGS);
configurationPoller.removeNonRetryableErrorListener(nonRetryableErrorListener);
configurationPoller.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,17 @@ <T> void addListener(
void start();

void stop();

/**
* Registers a listener that is called when a non-retryable HTTP error (e.g. 400, 401, 403, 404)
* is received from the RC endpoint. The default implementation is a no-op.
*/
default void addNonRetryableErrorListener(NonRetryableErrorListener listener) {}

default void removeNonRetryableErrorListener(NonRetryableErrorListener listener) {}

@FunctionalInterface
interface NonRetryableErrorListener {
void onNonRetryableError(int httpStatus, String message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class DefaultConfigurationPoller
private final Map<Product, ProductState> productStates = new EnumMap<>(Product.class);
private final Map<File, ConfigurationChangesListener> fileListeners = new HashMap<>();
private final List<ConfigurationEndListener> configurationEndListeners = new ArrayList<>();
private final List<NonRetryableErrorListener> nonRetryableErrorListeners = new ArrayList<>();

private final ClientState nextClientState = new ClientState();
private final AtomicInteger startCount = new AtomicInteger(0);
Expand Down Expand Up @@ -194,6 +195,16 @@ public synchronized void removeConfigurationEndListener(ConfigurationEndListener
this.configurationEndListeners.removeIf(l -> l == listener);
}

@Override
public synchronized void addNonRetryableErrorListener(NonRetryableErrorListener listener) {
this.nonRetryableErrorListeners.add(listener);
}

@Override
public synchronized void removeNonRetryableErrorListener(NonRetryableErrorListener listener) {
this.nonRetryableErrorListeners.removeIf(l -> l == listener);
}

@Override
public synchronized void addCapabilities(long flags) {
capabilities |= flags;
Expand Down Expand Up @@ -326,10 +337,6 @@ List<ConfigState> getConfigState() {

void sendRequest(Consumer<ResponseBody> responseBodyConsumer) throws IOException {
try (Response response = fetchConfiguration()) {
if (response.code() == 404) {
log.debug("Remote configuration endpoint is disabled");
return;
}
if (response.code() == 204) {
log.debug("No configuration changes (HTTP 204 No Content)");
return;
Expand All @@ -344,13 +351,15 @@ void sendRequest(Consumer<ResponseBody> responseBodyConsumer) throws IOException
return;
}
// Retrieve body content for detailed error messages
String bodyString = null;
if (body != null) {
try {
bodyString = body.string();
ratelimitedLogger.warn(
"Failed to retrieve remote configuration: unexpected response code {} {} {}",
response.message(),
response.code(),
body.string());
bodyString);
} catch (IOException ex) {
ExceptionHelper.rateLimitedLogException(
ratelimitedLogger, log, ex, "Error while getting error message body");
Expand All @@ -361,6 +370,31 @@ void sendRequest(Consumer<ResponseBody> responseBodyConsumer) throws IOException
response.message(),
response.code());
}
// Non-retryable 4xx responses (e.g. 401 Unauthorized, 403 Forbidden) indicate a permanent
// configuration error. Notify listeners so they can transition to a fatal state.
if (isNonRetryableError(response.code())) {
final String message =
"Remote configuration rejected with HTTP "
+ response.code()
+ " "
+ response.message()
+ (bodyString != null ? ": " + bodyString : "");
notifyNonRetryableErrorListeners(response.code(), message);
}
}
}

private static boolean isNonRetryableError(int code) {
return code == 400 || code == 401 || code == 403 || code == 404;
}

private synchronized void notifyNonRetryableErrorListeners(int code, String message) {
for (NonRetryableErrorListener listener : nonRetryableErrorListeners) {
try {
listener.onNonRetryableError(code, message);
} catch (RuntimeException ex) {
log.warn("Error notifying non-retryable error listener", ex);
}
}
}

Expand Down
Loading