Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions grpc-circuitbreaker-utils/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
plugins {
`java-library`
jacoco
id("org.hypertrace.publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

dependencies {

api(platform("io.grpc:grpc-bom:1.68.3"))
api("io.grpc:grpc-api")

implementation("org.slf4j:slf4j-api:1.7.36")
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1")
implementation("com.typesafe:config:1.4.2")
implementation("com.google.inject:guice:7.0.0")

annotationProcessor("org.projectlombok:lombok:1.18.24")
compileOnly("org.projectlombok:lombok:1.18.24")

testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
testImplementation("org.mockito:mockito-core:5.8.0")
}

tasks.test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.hypertrace.circuitbreaker.grpcutils;

import com.typesafe.config.Config;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CircuitBreakerConfigProvider {

public static final String DEFAULT_CONFIG_KEY = "default";

// Whether to enable circuit breaker or not.
private static final String ENABLED = "enabled";

// Percentage of failures to trigger OPEN state
private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold";
// Percentage of slow calls to trigger OPEN state
private static final String SLOW_CALL_RATE_THRESHOLD = "slowCallRateThreshold";
// Define what a "slow" call is
private static final String SLOW_CALL_DURATION_THRESHOLD = "slowCallDurationThreshold";
// Number of calls to consider in the sliding window
private static final String SLIDING_WINDOW_SIZE = "slidingWindowSize";
// Time before retrying after OPEN state
private static final String WAIT_DURATION_IN_OPEN_STATE = "waitDurationInOpenState";
// Minimum calls before evaluating failure rate
private static final String MINIMUM_NUMBER_OF_CALLS = "minimumNumberOfCalls";
// Calls allowed in HALF_OPEN state before deciding to
// CLOSE or OPEN again
private static final String PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE =
"permittedNumberOfCallsInHalfOpenState";
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";

// Global flag for circuit breaker enablement
private boolean circuitBreakerEnabled = false;
private Map<String, CircuitBreakerConfiguration> circuitBreakerConfigurationMap;

public CircuitBreakerConfigProvider(Config config) {
this.initialize(config);
}

/** Checks if Circuit Breaker is globally enabled. */
public boolean isCircuitBreakerEnabled() {
return circuitBreakerEnabled;
}

private void initialize(Config circuitBreakerConfig) {
circuitBreakerEnabled =
circuitBreakerConfig.hasPath(ENABLED) && circuitBreakerConfig.getBoolean(ENABLED);
this.circuitBreakerConfigurationMap =
circuitBreakerConfig.root().keySet().stream()
.filter(key -> !key.equals(ENABLED)) // Ignore the global enabled flag
.collect(
Collectors.toMap(
key -> key, // Circuit breaker key
key -> createCircuitBreakerConfig(circuitBreakerConfig.getConfig(key))));
log.info(
"Loaded {} circuit breaker configurations, Global Enabled: {}. Configs: {}",
circuitBreakerConfigurationMap.size(),
circuitBreakerEnabled,
circuitBreakerConfigurationMap);
}

public Map<String, CircuitBreakerConfiguration> getConfigMap() {
return circuitBreakerConfigurationMap;
}

private CircuitBreakerConfiguration createCircuitBreakerConfig(Config config) {
return CircuitBreakerConfiguration.builder()
.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD))
.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD))
.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD))
.slidingWindowType(getSlidingWindowType(config.getString(SLIDING_WINDOW_TYPE)))
.slidingWindowSize(config.getInt(SLIDING_WINDOW_SIZE))
.waitDurationInOpenState(config.getDuration(WAIT_DURATION_IN_OPEN_STATE))
.permittedNumberOfCallsInHalfOpenState(
config.getInt(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE))
.minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS))
.build();
}

private CircuitBreakerConfiguration.SlidingWindowType getSlidingWindowType(
String slidingWindowType) {
return CircuitBreakerConfiguration.SlidingWindowType.valueOf(slidingWindowType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.hypertrace.circuitbreaker.grpcutils;

import java.time.Duration;
import lombok.Builder;
import lombok.Setter;
import lombok.Value;

@Value
@Builder
@Setter
public class CircuitBreakerConfiguration {
// Percentage of failures to trigger OPEN state
float failureRateThreshold;
// Percentage of slow calls to trigger OPEN state
float slowCallRateThreshold;
// Define what a "slow" call is
Duration slowCallDurationThreshold;
// Number of calls to consider in the sliding window
SlidingWindowType slidingWindowType;
int slidingWindowSize;
// Time before retrying after OPEN state
Duration waitDurationInOpenState;
// Minimum calls before evaluating failure rate
int minimumNumberOfCalls;
// Calls allowed in HALF_OPEN state before deciding to
// CLOSE or OPEN again
int permittedNumberOfCallsInHalfOpenState;

public enum SlidingWindowType {
COUNT_BASED,
TIME_BASED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.hypertrace.circuitbreaker.grpcutils;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

public abstract class CircuitBreakerInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (!isCircuitBreakerEnabled()) {
return next.newCall(method, callOptions);

Check warning on line 14 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java#L14

Added line #L14 was not covered by tests
}
return createInterceptedCall(method, callOptions, next);
}

protected abstract boolean isCircuitBreakerEnabled();

protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.hypertrace.circuitbreaker.grpcutils.resilience;

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import java.util.Map;
import java.util.stream.Collectors;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;

/** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */
public class ResilienceCircuitBreakerConfigParser {

Check warning on line 9 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java#L9

Added line #L9 was not covered by tests

public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs(
Map<String, CircuitBreakerConfiguration> configurationMap) {
return configurationMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> getConfig(entry.getValue())));
}

static CircuitBreakerConfig getConfig(CircuitBreakerConfiguration configuration) {
return CircuitBreakerConfig.custom()
.failureRateThreshold(configuration.getFailureRateThreshold())
.slowCallRateThreshold(configuration.getSlowCallRateThreshold())
.slowCallDurationThreshold(configuration.getSlowCallDurationThreshold())
.slidingWindowType(getSlidingWindowType(configuration))
.slidingWindowSize(configuration.getSlidingWindowSize())
.waitDurationInOpenState(configuration.getWaitDurationInOpenState())
.permittedNumberOfCallsInHalfOpenState(
configuration.getPermittedNumberOfCallsInHalfOpenState())
.minimumNumberOfCalls(configuration.getMinimumNumberOfCalls())
.build();
}

private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType(
CircuitBreakerConfiguration configuration) {
switch (configuration.getSlidingWindowType()) {
case COUNT_BASED:
return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;
case TIME_BASED:
default:
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package org.hypertrace.circuitbreaker.grpcutils.resilience;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfigProvider;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerInterceptor;

@Slf4j
@Singleton
public class ResilienceCircuitBreakerInterceptor extends CircuitBreakerInterceptor {

public static final CallOptions.Key<String> CIRCUIT_BREAKER_KEY =
CallOptions.Key.createWithDefault("circuitBreakerKey", "default");
private final CircuitBreakerRegistry resilicenceCircuitBreakerRegistry;
private final CircuitBreakerConfigProvider circuitBreakerConfigProvider;
private final Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfig;
private final ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider;
private final Clock clock;

public ResilienceCircuitBreakerInterceptor(Config config, Clock clock) {
this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config);
this.resilienceCircuitBreakerConfig =
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
circuitBreakerConfigProvider.getConfigMap());
this.resilicenceCircuitBreakerRegistry =

Check warning on line 43 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L38-L43

Added lines #L38 - L43 were not covered by tests
new ResilienceCircuitBreakerRegistryProvider(resilienceCircuitBreakerConfig)
.getCircuitBreakerRegistry();
this.resilienceCircuitBreakerProvider =

Check warning on line 46 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L45-L46

Added lines #L45 - L46 were not covered by tests
new ResilienceCircuitBreakerProvider(
resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfig);
this.clock = clock;
}

Check warning on line 50 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L49-L50

Added lines #L49 - L50 were not covered by tests

@VisibleForTesting
public ResilienceCircuitBreakerInterceptor(
Config config,
Clock clock,
CircuitBreakerRegistry resilicenceCircuitBreakerRegistry,
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider) {
this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config);
this.resilienceCircuitBreakerConfig =
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
circuitBreakerConfigProvider.getConfigMap());
this.resilicenceCircuitBreakerRegistry = resilicenceCircuitBreakerRegistry;
this.resilienceCircuitBreakerProvider = resilienceCircuitBreakerProvider;
this.clock = clock;
}

@Override
protected boolean isCircuitBreakerEnabled() {
return circuitBreakerConfigProvider.isCircuitBreakerEnabled();
}

@Override
protected <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// Get circuit breaker key from CallOptions
String circuitBreakerKey = callOptions.getOption(CIRCUIT_BREAKER_KEY);
CircuitBreaker circuitBreaker =
resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
return new ForwardingClientCall.SimpleForwardingClientCall<>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Instant startTime = clock.instant();

Check warning on line 83 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L83

Added line #L83 was not covered by tests
// Wrap response listener to track failures
Listener<RespT> wrappedListener =
wrapListenerWithCircuitBreaker(responseListener, startTime);
super.start(wrappedListener, headers);
}

Check warning on line 88 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L85-L88

Added lines #L85 - L88 were not covered by tests

@Override
public void sendMessage(ReqT message) {
if (!circuitBreaker.tryAcquirePermission()) {
logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker);
String rejectionReason =
circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN
? "Circuit Breaker is HALF-OPEN and rejecting excess requests"

Check warning on line 96 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L96

Added line #L96 was not covered by tests
: "Circuit Breaker is OPEN and blocking requests";
throw Status.RESOURCE_EXHAUSTED.withDescription(rejectionReason).asRuntimeException();
}
super.sendMessage(message);
}

Check warning on line 101 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L100-L101

Added lines #L100 - L101 were not covered by tests

private ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>
wrapListenerWithCircuitBreaker(Listener<RespT> responseListener, Instant startTime) {
return new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(
responseListener) {

Check warning on line 106 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L105-L106

Added lines #L105 - L106 were not covered by tests
@Override
public void onClose(Status status, Metadata trailers) {
long duration = Duration.between(startTime, clock.instant()).toNanos();

Check warning on line 109 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L109

Added line #L109 was not covered by tests
if (status.isOk()) {
circuitBreaker.onSuccess(duration, TimeUnit.NANOSECONDS);

Check warning on line 111 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L111

Added line #L111 was not covered by tests
} else {
log.debug(

Check warning on line 113 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L113

Added line #L113 was not covered by tests
"Circuit Breaker '{}' detected failure. Status: {}, Description: {}",
circuitBreaker.getName(),
status.getCode(),
status.getDescription());
circuitBreaker.onError(duration, TimeUnit.NANOSECONDS, status.asRuntimeException());

Check warning on line 118 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L115-L118

Added lines #L115 - L118 were not covered by tests
}
super.onClose(status, trailers);
}

Check warning on line 121 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L120-L121

Added lines #L120 - L121 were not covered by tests
};
}
};
}

private void logCircuitBreakerRejection(String circuitBreakerKey, CircuitBreaker circuitBreaker) {
Map<CircuitBreaker.State, String> stateMessages =
Map.of(
CircuitBreaker.State.HALF_OPEN, "is HALF-OPEN and rejecting excess requests.",
CircuitBreaker.State.OPEN, "is OPEN and blocking requests");
log.debug(
"Circuit Breaker '{}' {}",
circuitBreakerKey,
stateMessages.getOrDefault(circuitBreaker.getState(), "is in an unexpected state"));
}
}
Loading
Loading