Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions grpc-circuitbreaker-utils/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
`java-library`
jacoco
id("org.hypertrace.publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

dependencies {

api(platform("io.grpc:grpc-bom:1.68.3"))
api("io.grpc:grpc-api")
api(project(":grpc-context-utils"))

implementation("org.slf4j:slf4j-api:1.7.36")
implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1")
implementation("com.typesafe:config:1.4.2")
implementation("com.google.inject:guice:7.0.0")

annotationProcessor("org.projectlombok:lombok:1.18.24")
compileOnly("org.projectlombok:lombok:1.18.24")

testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
testImplementation("org.mockito:mockito-core:5.8.0")
testImplementation("org.mockito:mockito-junit-jupiter:5.8.0")
}

tasks.test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.hypertrace.circuitbreaker.grpcutils;

import com.typesafe.config.Config;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CircuitBreakerConfigParser {

Check warning on line 9 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java#L8-L9

Added lines #L8 - L9 were not covered by tests

public static final String DEFAULT_CONFIG_KEY = "default";

// Percentage of failures to trigger OPEN state
private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold";
// Percentage of slow calls to trigger OPEN state
private static final String SLOW_CALL_RATE_THRESHOLD = "slowCallRateThreshold";
// Define what a "slow" call is
private static final String SLOW_CALL_DURATION_THRESHOLD = "slowCallDurationThreshold";
// Number of calls to consider in the sliding window
private static final String SLIDING_WINDOW_SIZE = "slidingWindowSize";
// Time before retrying after OPEN state
private static final String WAIT_DURATION_IN_OPEN_STATE = "waitDurationInOpenState";
// Minimum calls before evaluating failure rate
private static final String MINIMUM_NUMBER_OF_CALLS = "minimumNumberOfCalls";
// Calls allowed in HALF_OPEN state before deciding to
// CLOSE or OPEN again
private static final String PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE =
"permittedNumberOfCallsInHalfOpenState";
private static final String SLIDING_WINDOW_TYPE = "slidingWindowType";

public static <T> CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> parseConfig(
Config config) {
CircuitBreakerConfiguration.CircuitBreakerConfigurationBuilder<T> builder =
CircuitBreakerConfiguration.builder();
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap =
config.root().keySet().stream()
.collect(
Collectors.toMap(
key -> key, // Circuit breaker key
key -> buildCircuitBreakerThresholds(config.getConfig(key))));
builder.circuitBreakerThresholdsMap(circuitBreakerThresholdsMap);
log.info("Loaded circuit breaker configs: {}", circuitBreakerThresholdsMap);
return builder;

Check warning on line 43 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java#L34-L43

Added lines #L34 - L43 were not covered by tests
}

private static CircuitBreakerThresholds buildCircuitBreakerThresholds(Config config) {
return CircuitBreakerThresholds.builder()
.failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD))
.slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD))
.slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD))
.slidingWindowType(getSlidingWindowType(config.getString(SLIDING_WINDOW_TYPE)))
.slidingWindowSize(config.getInt(SLIDING_WINDOW_SIZE))
.waitDurationInOpenState(config.getDuration(WAIT_DURATION_IN_OPEN_STATE))
.permittedNumberOfCallsInHalfOpenState(
config.getInt(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE))
.minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS))
.build();

Check warning on line 57 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java#L47-L57

Added lines #L47 - L57 were not covered by tests
}

private static CircuitBreakerThresholds.SlidingWindowType getSlidingWindowType(
String slidingWindowType) {
return CircuitBreakerThresholds.SlidingWindowType.valueOf(slidingWindowType);

Check warning on line 62 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfigParser.java#L62

Added line #L62 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.hypertrace.circuitbreaker.grpcutils;

import java.util.Map;
import java.util.function.BiFunction;
import lombok.Builder;
import lombok.Value;
import org.hypertrace.core.grpcutils.context.RequestContext;

@Value
@Builder

Check warning on line 10 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java#L10

Added line #L10 was not covered by tests
public class CircuitBreakerConfiguration<T> {
Class<T> requestClass;
BiFunction<RequestContext, T, String> keyFunction;
boolean enabled;
Map<String, CircuitBreakerThresholds> circuitBreakerThresholdsMap;

Check warning on line 15 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerConfiguration.java#L12-L15

Added lines #L12 - L15 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.hypertrace.circuitbreaker.grpcutils;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;

public abstract class CircuitBreakerInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (!isCircuitBreakerEnabled()) {
return next.newCall(method, callOptions);

Check warning on line 14 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java#L14

Added line #L14 was not covered by tests
}
return createInterceptedCall(method, callOptions, next);

Check warning on line 16 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java#L16

Added line #L16 was not covered by tests
}

protected abstract boolean isCircuitBreakerEnabled();

protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.hypertrace.circuitbreaker.grpcutils;

import java.time.Duration;
import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class CircuitBreakerThresholds {
// Percentage of failures to trigger OPEN state
float failureRateThreshold;
// Percentage of slow calls to trigger OPEN state
float slowCallRateThreshold;
// Define what a "slow" call is
Duration slowCallDurationThreshold;
// Number of calls to consider in the sliding window
SlidingWindowType slidingWindowType;
int slidingWindowSize;
// Time before retrying after OPEN state
Duration waitDurationInOpenState;
// Minimum calls before evaluating failure rate
int minimumNumberOfCalls;
// Calls allowed in HALF_OPEN state before deciding to
// CLOSE or OPEN again
int permittedNumberOfCallsInHalfOpenState;

public enum SlidingWindowType {
COUNT_BASED,
TIME_BASED
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.hypertrace.circuitbreaker.grpcutils.resilience;

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import java.util.Map;
import java.util.stream.Collectors;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerThresholds;

/** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */
public class ResilienceCircuitBreakerConfigParser {

Check warning on line 9 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java#L9

Added line #L9 was not covered by tests

public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs(
Map<String, CircuitBreakerThresholds> configurationMap) {
return configurationMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> getConfig(entry.getValue())));
}

static CircuitBreakerConfig getConfig(CircuitBreakerThresholds configuration) {
return CircuitBreakerConfig.custom()
.failureRateThreshold(configuration.getFailureRateThreshold())
.slowCallRateThreshold(configuration.getSlowCallRateThreshold())
.slowCallDurationThreshold(configuration.getSlowCallDurationThreshold())
.slidingWindowType(getSlidingWindowType(configuration.getSlidingWindowType()))
.slidingWindowSize(configuration.getSlidingWindowSize())
.waitDurationInOpenState(configuration.getWaitDurationInOpenState())
.permittedNumberOfCallsInHalfOpenState(
configuration.getPermittedNumberOfCallsInHalfOpenState())
.minimumNumberOfCalls(configuration.getMinimumNumberOfCalls())
.build();
}

private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType(
CircuitBreakerThresholds.SlidingWindowType slidingWindowType) {
if (slidingWindowType == null) {
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED;
}
switch (slidingWindowType) {
case COUNT_BASED:
return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;

Check warning on line 38 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java#L38

Added line #L38 was not covered by tests
case TIME_BASED:
default:
return CircuitBreakerConfig.SlidingWindowType.TIME_BASED;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package org.hypertrace.circuitbreaker.grpcutils.resilience;

import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration;
import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerInterceptor;
import org.hypertrace.core.grpcutils.context.RequestContext;

@Slf4j
public class ResilienceCircuitBreakerInterceptor extends CircuitBreakerInterceptor {

private final CircuitBreakerRegistry resilicenceCircuitBreakerRegistry;
private final Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfigMap;
private final ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider;
private final CircuitBreakerConfiguration<?> circuitBreakerConfiguration;
private final Clock clock;

public ResilienceCircuitBreakerInterceptor(
CircuitBreakerConfiguration<?> circuitBreakerConfiguration, Clock clock) {
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
this.clock = clock;
this.resilienceCircuitBreakerConfigMap =
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
this.resilicenceCircuitBreakerRegistry =

Check warning on line 41 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L35-L41

Added lines #L35 - L41 were not covered by tests
new ResilienceCircuitBreakerRegistryProvider(resilienceCircuitBreakerConfigMap)
.getCircuitBreakerRegistry();
this.resilienceCircuitBreakerProvider =

Check warning on line 44 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L43-L44

Added lines #L43 - L44 were not covered by tests
new ResilienceCircuitBreakerProvider(
resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfigMap);
}

Check warning on line 47 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L47

Added line #L47 was not covered by tests

@VisibleForTesting
public ResilienceCircuitBreakerInterceptor(
Clock clock,
CircuitBreakerRegistry resilicenceCircuitBreakerRegistry,
ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider,
CircuitBreakerConfiguration<?> circuitBreakerConfiguration) {
this.circuitBreakerConfiguration = circuitBreakerConfiguration;
this.resilienceCircuitBreakerConfigMap =
ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs(
circuitBreakerConfiguration.getCircuitBreakerThresholdsMap());
this.resilicenceCircuitBreakerRegistry = resilicenceCircuitBreakerRegistry;
this.resilienceCircuitBreakerProvider = resilienceCircuitBreakerProvider;
this.clock = clock;
}

@Override
protected boolean isCircuitBreakerEnabled() {
return circuitBreakerConfiguration.isEnabled();

Check warning on line 66 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L66

Added line #L66 was not covered by tests
}

@Override
protected <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<>(
next.newCall(method, callOptions)) {
CircuitBreaker circuitBreaker;
String circuitBreakerKey;

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Instant startTime = clock.instant();
// Wrap response listener to track failures
Listener<RespT> wrappedListener =
wrapListenerWithCircuitBreaker(responseListener, startTime);
super.start(wrappedListener, headers);
}

@SuppressWarnings("unchecked")
@Override
public void sendMessage(ReqT message) {
CircuitBreakerConfiguration<ReqT> config =
(CircuitBreakerConfiguration<ReqT>) circuitBreakerConfiguration;
if (config.getRequestClass() == null || config.getKeyFunction() == null) {
log.debug("Circuit breaker will apply to all requests as config is not set");
circuitBreakerKey = "default";
} else {
if (!message.getClass().equals(config.getRequestClass())) {
log.warn("Invalid config for message type: {}", message.getClass());
super.sendMessage(message);

Check warning on line 97 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L96-L97

Added lines #L96 - L97 were not covered by tests
}
circuitBreakerKey = config.getKeyFunction().apply(RequestContext.CURRENT.get(), message);

Check warning on line 99 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java

View check run for this annotation

Codecov / codecov/patch

grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java#L99

Added line #L99 was not covered by tests
}
circuitBreaker = resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey);
if (!circuitBreaker.tryAcquirePermission()) {
logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker);
String rejectionReason =
circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN
? "Circuit Breaker is HALF-OPEN and rejecting excess requests"
: "Circuit Breaker is OPEN and blocking requests";
throw Status.RESOURCE_EXHAUSTED.withDescription(rejectionReason).asRuntimeException();
}
super.sendMessage(message);
}

private ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>
wrapListenerWithCircuitBreaker(Listener<RespT> responseListener, Instant startTime) {
return new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
long duration = Duration.between(startTime, clock.instant()).toNanos();
if (status.isOk()) {
circuitBreaker.onSuccess(duration, TimeUnit.NANOSECONDS);
} else {
log.debug(
"Circuit Breaker '{}' detected failure. Status: {}, Description: {}",
circuitBreaker.getName(),
status.getCode(),
status.getDescription());
circuitBreaker.onError(duration, TimeUnit.NANOSECONDS, status.asRuntimeException());
}
super.onClose(status, trailers);
}
};
}
};
}

private void logCircuitBreakerRejection(String circuitBreakerKey, CircuitBreaker circuitBreaker) {
Map<CircuitBreaker.State, String> stateMessages =
Map.of(
CircuitBreaker.State.HALF_OPEN, "is HALF-OPEN and rejecting excess requests.",
CircuitBreaker.State.OPEN, "is OPEN and blocking requests");
log.debug(
"Circuit Breaker '{}' {}",
circuitBreakerKey,
stateMessages.getOrDefault(circuitBreaker.getState(), "is in an unexpected state"));
}
}
Loading
Loading