-
Notifications
You must be signed in to change notification settings - Fork 1
Add grpc circuit breaker utility using interceptors #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
bc60d8e
faa0e02
6a058f7
878a8cf
275c854
04ed487
fa3a3c3
90ca22b
5f0ecd0
8ca26b7
75fb8b1
77b0aac
ea47f32
f163a9d
5f3b287
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| plugins { | ||
| `java-library` | ||
| jacoco | ||
| id("org.hypertrace.publish-plugin") | ||
| id("org.hypertrace.jacoco-report-plugin") | ||
| } | ||
|
|
||
| dependencies { | ||
|
|
||
| api(platform("io.grpc:grpc-bom:1.68.3")) | ||
| api("io.grpc:grpc-api") | ||
|
|
||
| implementation("org.slf4j:slf4j-api:1.7.36") | ||
| implementation("io.github.resilience4j:resilience4j-circuitbreaker:1.7.1") | ||
| implementation("com.typesafe:config:1.4.2") | ||
| implementation("com.google.inject:guice:7.0.0") | ||
|
|
||
| annotationProcessor("org.projectlombok:lombok:1.18.24") | ||
| compileOnly("org.projectlombok:lombok:1.18.24") | ||
|
|
||
| testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") | ||
pavan-traceable marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| testImplementation("org.mockito:mockito-core:5.8.0") | ||
| } | ||
|
|
||
| tasks.test { | ||
| useJUnitPlatform() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| package org.hypertrace.circuitbreaker.grpcutils; | ||
|
|
||
| import com.typesafe.config.Config; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
| import lombok.extern.slf4j.Slf4j; | ||
|
|
||
| @Slf4j | ||
| public class CircuitBreakerConfigProvider { | ||
|
|
||
| public static final String DEFAULT_CONFIG_KEY = "default"; | ||
|
|
||
| // Whether to enable circuit breaker or not. | ||
| private static final String ENABLED = "enabled"; | ||
|
|
||
| // Percentage of failures to trigger OPEN state | ||
| private static final String FAILURE_RATE_THRESHOLD = "failureRateThreshold"; | ||
| // Percentage of slow calls to trigger OPEN state | ||
| private static final String SLOW_CALL_RATE_THRESHOLD = "slowCallRateThreshold"; | ||
| // Define what a "slow" call is | ||
| private static final String SLOW_CALL_DURATION_THRESHOLD = "slowCallDurationThreshold"; | ||
| // Number of calls to consider in the sliding window | ||
| private static final String SLIDING_WINDOW_SIZE = "slidingWindowSize"; | ||
| // Time before retrying after OPEN state | ||
| private static final String WAIT_DURATION_IN_OPEN_STATE = "waitDurationInOpenState"; | ||
| // Minimum calls before evaluating failure rate | ||
| private static final String MINIMUM_NUMBER_OF_CALLS = "minimumNumberOfCalls"; | ||
| // Calls allowed in HALF_OPEN state before deciding to | ||
| // CLOSE or OPEN again | ||
| private static final String PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE = | ||
| "permittedNumberOfCallsInHalfOpenState"; | ||
| private static final String SLIDING_WINDOW_TYPE = "slidingWindowType"; | ||
|
|
||
| // Global flag for circuit breaker enablement | ||
| private boolean circuitBreakerEnabled = false; | ||
| private Map<String, CircuitBreakerConfiguration> circuitBreakerConfigurationMap; | ||
|
|
||
| public CircuitBreakerConfigProvider(Config config) { | ||
| this.initialize(config); | ||
| } | ||
|
|
||
| /** Checks if Circuit Breaker is globally enabled. */ | ||
| public boolean isCircuitBreakerEnabled() { | ||
| return circuitBreakerEnabled; | ||
| } | ||
|
|
||
| private void initialize(Config circuitBreakerConfig) { | ||
| circuitBreakerEnabled = | ||
| circuitBreakerConfig.hasPath(ENABLED) && circuitBreakerConfig.getBoolean(ENABLED); | ||
| this.circuitBreakerConfigurationMap = | ||
| circuitBreakerConfig.root().keySet().stream() | ||
| .filter(key -> !key.equals(ENABLED)) // Ignore the global enabled flag | ||
| .collect( | ||
| Collectors.toMap( | ||
| key -> key, // Circuit breaker key | ||
pavan-traceable marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| key -> createCircuitBreakerConfig(circuitBreakerConfig.getConfig(key)))); | ||
| log.info( | ||
| "Loaded {} circuit breaker configurations, Global Enabled: {}. Configs: {}", | ||
| circuitBreakerConfigurationMap.size(), | ||
| circuitBreakerEnabled, | ||
| circuitBreakerConfigurationMap); | ||
| } | ||
|
|
||
| public Map<String, CircuitBreakerConfiguration> getConfigMap() { | ||
| return circuitBreakerConfigurationMap; | ||
| } | ||
|
|
||
| private CircuitBreakerConfiguration createCircuitBreakerConfig(Config config) { | ||
| return CircuitBreakerConfiguration.builder() | ||
| .failureRateThreshold((float) config.getDouble(FAILURE_RATE_THRESHOLD)) | ||
pavan-traceable marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .slowCallRateThreshold((float) config.getDouble(SLOW_CALL_RATE_THRESHOLD)) | ||
| .slowCallDurationThreshold(config.getDuration(SLOW_CALL_DURATION_THRESHOLD)) | ||
| .slidingWindowType(getSlidingWindowType(config.getString(SLIDING_WINDOW_TYPE))) | ||
| .slidingWindowSize(config.getInt(SLIDING_WINDOW_SIZE)) | ||
| .waitDurationInOpenState(config.getDuration(WAIT_DURATION_IN_OPEN_STATE)) | ||
| .permittedNumberOfCallsInHalfOpenState( | ||
| config.getInt(PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE)) | ||
| .minimumNumberOfCalls(config.getInt(MINIMUM_NUMBER_OF_CALLS)) | ||
| .build(); | ||
| } | ||
|
|
||
| private CircuitBreakerConfiguration.SlidingWindowType getSlidingWindowType( | ||
| String slidingWindowType) { | ||
| return CircuitBreakerConfiguration.SlidingWindowType.valueOf(slidingWindowType); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| package org.hypertrace.circuitbreaker.grpcutils; | ||
|
|
||
| import java.time.Duration; | ||
| import lombok.Builder; | ||
| import lombok.Setter; | ||
| import lombok.Value; | ||
|
|
||
| @Value | ||
| @Builder | ||
| @Setter | ||
aaron-steinfeld marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| public class CircuitBreakerConfiguration { | ||
| // Percentage of failures to trigger OPEN state | ||
| float failureRateThreshold; | ||
| // Percentage of slow calls to trigger OPEN state | ||
| float slowCallRateThreshold; | ||
| // Define what a "slow" call is | ||
| Duration slowCallDurationThreshold; | ||
| // Number of calls to consider in the sliding window | ||
| SlidingWindowType slidingWindowType; | ||
| int slidingWindowSize; | ||
| // Time before retrying after OPEN state | ||
| Duration waitDurationInOpenState; | ||
| // Minimum calls before evaluating failure rate | ||
| int minimumNumberOfCalls; | ||
| // Calls allowed in HALF_OPEN state before deciding to | ||
| // CLOSE or OPEN again | ||
| int permittedNumberOfCallsInHalfOpenState; | ||
|
|
||
| public enum SlidingWindowType { | ||
| COUNT_BASED, | ||
| TIME_BASED | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package org.hypertrace.circuitbreaker.grpcutils; | ||
|
|
||
| import io.grpc.CallOptions; | ||
| import io.grpc.Channel; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ClientInterceptor; | ||
| import io.grpc.MethodDescriptor; | ||
|
|
||
| public abstract class CircuitBreakerInterceptor implements ClientInterceptor { | ||
| @Override | ||
| public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( | ||
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { | ||
| if (!isCircuitBreakerEnabled()) { | ||
| return next.newCall(method, callOptions); | ||
|
Check warning on line 14 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/CircuitBreakerInterceptor.java
|
||
| } | ||
| return createInterceptedCall(method, callOptions, next); | ||
| } | ||
|
|
||
| protected abstract boolean isCircuitBreakerEnabled(); | ||
|
|
||
| protected abstract <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall( | ||
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package org.hypertrace.circuitbreaker.grpcutils.resilience; | ||
|
|
||
| import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
| import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfiguration; | ||
|
|
||
| /** Utility class to parse CircuitBreakerConfiguration to Resilience4j CircuitBreakerConfig */ | ||
| public class ResilienceCircuitBreakerConfigParser { | ||
|
Check warning on line 9 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerConfigParser.java
|
||
|
|
||
| public static Map<String, CircuitBreakerConfig> getCircuitBreakerConfigs( | ||
| Map<String, CircuitBreakerConfiguration> configurationMap) { | ||
| return configurationMap.entrySet().stream() | ||
| .collect(Collectors.toMap(Map.Entry::getKey, entry -> getConfig(entry.getValue()))); | ||
| } | ||
|
|
||
| static CircuitBreakerConfig getConfig(CircuitBreakerConfiguration configuration) { | ||
pavan-traceable marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return CircuitBreakerConfig.custom() | ||
| .failureRateThreshold(configuration.getFailureRateThreshold()) | ||
| .slowCallRateThreshold(configuration.getSlowCallRateThreshold()) | ||
| .slowCallDurationThreshold(configuration.getSlowCallDurationThreshold()) | ||
| .slidingWindowType(getSlidingWindowType(configuration)) | ||
| .slidingWindowSize(configuration.getSlidingWindowSize()) | ||
| .waitDurationInOpenState(configuration.getWaitDurationInOpenState()) | ||
| .permittedNumberOfCallsInHalfOpenState( | ||
| configuration.getPermittedNumberOfCallsInHalfOpenState()) | ||
| .minimumNumberOfCalls(configuration.getMinimumNumberOfCalls()) | ||
| .build(); | ||
| } | ||
|
|
||
| private static CircuitBreakerConfig.SlidingWindowType getSlidingWindowType( | ||
| CircuitBreakerConfiguration configuration) { | ||
| switch (configuration.getSlidingWindowType()) { | ||
| case COUNT_BASED: | ||
| return CircuitBreakerConfig.SlidingWindowType.COUNT_BASED; | ||
| case TIME_BASED: | ||
| default: | ||
| return CircuitBreakerConfig.SlidingWindowType.TIME_BASED; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| package org.hypertrace.circuitbreaker.grpcutils.resilience; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.inject.Singleton; | ||
| import com.typesafe.config.Config; | ||
| import io.github.resilience4j.circuitbreaker.CircuitBreaker; | ||
| import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; | ||
| import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; | ||
| import io.grpc.CallOptions; | ||
| import io.grpc.Channel; | ||
| import io.grpc.ClientCall; | ||
| import io.grpc.ForwardingClientCall; | ||
| import io.grpc.ForwardingClientCallListener; | ||
| import io.grpc.Metadata; | ||
| import io.grpc.MethodDescriptor; | ||
| import io.grpc.Status; | ||
| import java.time.Clock; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Map; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerConfigProvider; | ||
| import org.hypertrace.circuitbreaker.grpcutils.CircuitBreakerInterceptor; | ||
|
|
||
| @Slf4j | ||
| @Singleton | ||
| public class ResilienceCircuitBreakerInterceptor extends CircuitBreakerInterceptor { | ||
|
|
||
| public static final CallOptions.Key<String> CIRCUIT_BREAKER_KEY = | ||
| CallOptions.Key.createWithDefault("circuitBreakerKey", "default"); | ||
| private final CircuitBreakerRegistry resilicenceCircuitBreakerRegistry; | ||
| private final CircuitBreakerConfigProvider circuitBreakerConfigProvider; | ||
| private final Map<String, CircuitBreakerConfig> resilienceCircuitBreakerConfig; | ||
| private final ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider; | ||
| private final Clock clock; | ||
|
|
||
| public ResilienceCircuitBreakerInterceptor(Config config, Clock clock) { | ||
pavan-traceable marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config); | ||
| this.resilienceCircuitBreakerConfig = | ||
| ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs( | ||
| circuitBreakerConfigProvider.getConfigMap()); | ||
| this.resilicenceCircuitBreakerRegistry = | ||
|
Check warning on line 43 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| new ResilienceCircuitBreakerRegistryProvider(resilienceCircuitBreakerConfig) | ||
| .getCircuitBreakerRegistry(); | ||
| this.resilienceCircuitBreakerProvider = | ||
|
Check warning on line 46 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| new ResilienceCircuitBreakerProvider( | ||
| resilicenceCircuitBreakerRegistry, resilienceCircuitBreakerConfig); | ||
| this.clock = clock; | ||
| } | ||
|
Check warning on line 50 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
|
|
||
| @VisibleForTesting | ||
| public ResilienceCircuitBreakerInterceptor( | ||
aaron-steinfeld marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Config config, | ||
| Clock clock, | ||
| CircuitBreakerRegistry resilicenceCircuitBreakerRegistry, | ||
| ResilienceCircuitBreakerProvider resilienceCircuitBreakerProvider) { | ||
| this.circuitBreakerConfigProvider = new CircuitBreakerConfigProvider(config); | ||
| this.resilienceCircuitBreakerConfig = | ||
| ResilienceCircuitBreakerConfigParser.getCircuitBreakerConfigs( | ||
| circuitBreakerConfigProvider.getConfigMap()); | ||
| this.resilicenceCircuitBreakerRegistry = resilicenceCircuitBreakerRegistry; | ||
| this.resilienceCircuitBreakerProvider = resilienceCircuitBreakerProvider; | ||
| this.clock = clock; | ||
| } | ||
|
|
||
| @Override | ||
| protected boolean isCircuitBreakerEnabled() { | ||
| return circuitBreakerConfigProvider.isCircuitBreakerEnabled(); | ||
| } | ||
|
|
||
| @Override | ||
| protected <ReqT, RespT> ClientCall<ReqT, RespT> createInterceptedCall( | ||
| MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { | ||
| // Get circuit breaker key from CallOptions | ||
| String circuitBreakerKey = callOptions.getOption(CIRCUIT_BREAKER_KEY); | ||
aaron-steinfeld marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| CircuitBreaker circuitBreaker = | ||
| resilienceCircuitBreakerProvider.getCircuitBreaker(circuitBreakerKey); | ||
| return new ForwardingClientCall.SimpleForwardingClientCall<>( | ||
| next.newCall(method, callOptions)) { | ||
| @Override | ||
| public void start(Listener<RespT> responseListener, Metadata headers) { | ||
| Instant startTime = clock.instant(); | ||
|
Check warning on line 83 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| // Wrap response listener to track failures | ||
| Listener<RespT> wrappedListener = | ||
| wrapListenerWithCircuitBreaker(responseListener, startTime); | ||
| super.start(wrappedListener, headers); | ||
| } | ||
|
Check warning on line 88 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
|
|
||
| @Override | ||
| public void sendMessage(ReqT message) { | ||
| if (!circuitBreaker.tryAcquirePermission()) { | ||
| logCircuitBreakerRejection(circuitBreakerKey, circuitBreaker); | ||
| String rejectionReason = | ||
| circuitBreaker.getState() == CircuitBreaker.State.HALF_OPEN | ||
| ? "Circuit Breaker is HALF-OPEN and rejecting excess requests" | ||
|
Check warning on line 96 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| : "Circuit Breaker is OPEN and blocking requests"; | ||
| throw Status.RESOURCE_EXHAUSTED.withDescription(rejectionReason).asRuntimeException(); | ||
pavan-traceable marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| super.sendMessage(message); | ||
| } | ||
|
Check warning on line 101 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
|
|
||
| private ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> | ||
| wrapListenerWithCircuitBreaker(Listener<RespT> responseListener, Instant startTime) { | ||
| return new ForwardingClientCallListener.SimpleForwardingClientCallListener<>( | ||
| responseListener) { | ||
|
Check warning on line 106 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| @Override | ||
| public void onClose(Status status, Metadata trailers) { | ||
| long duration = Duration.between(startTime, clock.instant()).toNanos(); | ||
|
Check warning on line 109 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| if (status.isOk()) { | ||
| circuitBreaker.onSuccess(duration, TimeUnit.NANOSECONDS); | ||
|
Check warning on line 111 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| } else { | ||
| log.debug( | ||
|
Check warning on line 113 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| "Circuit Breaker '{}' detected failure. Status: {}, Description: {}", | ||
| circuitBreaker.getName(), | ||
| status.getCode(), | ||
| status.getDescription()); | ||
| circuitBreaker.onError(duration, TimeUnit.NANOSECONDS, status.asRuntimeException()); | ||
|
Check warning on line 118 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| } | ||
| super.onClose(status, trailers); | ||
| } | ||
|
Check warning on line 121 in grpc-circuitbreaker-utils/src/main/java/org/hypertrace/circuitbreaker/grpcutils/resilience/ResilienceCircuitBreakerInterceptor.java
|
||
| }; | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| private void logCircuitBreakerRejection(String circuitBreakerKey, CircuitBreaker circuitBreaker) { | ||
| Map<CircuitBreaker.State, String> stateMessages = | ||
| Map.of( | ||
| CircuitBreaker.State.HALF_OPEN, "is HALF-OPEN and rejecting excess requests.", | ||
| CircuitBreaker.State.OPEN, "is OPEN and blocking requests"); | ||
| log.debug( | ||
| "Circuit Breaker '{}' {}", | ||
| circuitBreakerKey, | ||
| stateMessages.getOrDefault(circuitBreaker.getState(), "is in an unexpected state")); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.