Skip to content

Commit eb0234a

Browse files
es circuit breaker (#1484)
* es circuit breaker * adjust parameters * configurable es circuit breaker * added unit tests * fix mock config + update error * export circuit breaker as module * import fix + rollback test code
1 parent 332a034 commit eb0234a

11 files changed

Lines changed: 486 additions & 5 deletions

config/config.devnet.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ features:
8383
transactionBatch:
8484
enabled: true
8585
maxLookBehind: 100
86+
elasticCircuitBreaker:
87+
enabled: false
88+
durationThresholdMs: 5000
89+
failureCountThreshold: 5
90+
resetTimeoutMs: 30000
8691
statusChecker:
8792
enabled: false
8893
thresholds:

config/config.e2e.mainnet.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ features:
8080
deepHistory:
8181
enabled: false
8282
url: ''
83+
elasticCircuitBreaker:
84+
enabled: false
85+
durationThresholdMs: 5000
86+
failureCountThreshold: 5
87+
resetTimeoutMs: 30000
8388
statusChecker:
8489
enabled: false
8590
thresholds:

config/config.mainnet.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ features:
8080
deepHistory:
8181
enabled: false
8282
url: ''
83+
elasticCircuitBreaker:
84+
enabled: false
85+
durationThresholdMs: 5000
86+
failureCountThreshold: 5
87+
resetTimeoutMs: 30000
8388
statusChecker:
8489
enabled: false
8590
thresholds:

config/config.testnet.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ features:
7979
deepHistory:
8080
enabled: false
8181
url: ''
82+
elasticCircuitBreaker:
83+
enabled: false
84+
durationThresholdMs: 5000
85+
failureCountThreshold: 5
86+
resetTimeoutMs: 30000
8287
statusChecker:
8388
enabled: false
8489
thresholds:

src/common/api-config/api.config.service.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { LogTopic } from '@multiversx/sdk-transaction-processor/lib/types/log-to
77

88
@Injectable()
99
export class ApiConfigService {
10-
constructor(private readonly configService: ConfigService) { }
10+
constructor(private readonly configService: ConfigService) {
11+
}
1112

1213
getConfig<T>(configKey: string): T | undefined {
1314
return this.configService.get<T>(configKey);
@@ -389,6 +390,23 @@ export class ApiConfigService {
389390
return isApiActive;
390391
}
391392

393+
isElasticCircuitBreakerEnabled(): boolean {
394+
const isEnabled = this.configService.get<boolean>('features.elasticCircuitBreaker.enabled');
395+
return isEnabled !== undefined ? isEnabled : false;
396+
}
397+
398+
getElasticCircuitBreakerConfig(): {
399+
durationThresholdMs: number,
400+
failureCountThreshold: number,
401+
resetTimeoutMs: number
402+
} {
403+
return {
404+
durationThresholdMs: this.configService.get<number>('features.elasticCircuitBreaker.durationThresholdMs') ?? 5000,
405+
failureCountThreshold: this.configService.get<number>('features.elasticCircuitBreaker.failureCountThreshold') ?? 5000,
406+
resetTimeoutMs: this.configService.get<number>('features.elasticCircuitBreaker.resetTimeoutMs') ?? 5000,
407+
};
408+
}
409+
392410
getIsWebsocketApiActive(): boolean {
393411
return this.configService.get<boolean>('api.websocket') ?? true;
394412
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { Global, Module } from "@nestjs/common";
2+
import { ApiConfigModule } from "src/common/api-config/api.config.module";
3+
import { DynamicModuleUtils } from "src/utils/dynamic.module.utils";
4+
import { EsCircuitBreakerProxy } from "./circuit.breaker.proxy.service";
5+
6+
@Global()
7+
@Module({
8+
imports: [
9+
ApiConfigModule,
10+
DynamicModuleUtils.getElasticModule(),
11+
],
12+
providers: [EsCircuitBreakerProxy],
13+
exports: [EsCircuitBreakerProxy],
14+
})
15+
export class EsCircuitBreakerProxyModule { }
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import { OriginLogger } from "@multiversx/sdk-nestjs-common";
2+
import { ElasticQuery, ElasticService } from "@multiversx/sdk-nestjs-elastic";
3+
import { Injectable, ServiceUnavailableException } from "@nestjs/common";
4+
import { ApiConfigService } from "../../../api-config/api.config.service";
5+
6+
@Injectable()
7+
export class EsCircuitBreakerProxy {
8+
private failureCount = 0;
9+
private lastFailureTime = 0;
10+
private isCircuitOpen = false;
11+
private readonly logger = new OriginLogger(EsCircuitBreakerProxy.name);
12+
private readonly enabled: boolean;
13+
private readonly config: { durationThresholdMs: number, failureCountThreshold: number, resetTimeoutMs: number };
14+
15+
constructor(
16+
readonly apiConfigService: ApiConfigService,
17+
private readonly elasticService: ElasticService,
18+
) {
19+
this.enabled = apiConfigService.isElasticCircuitBreakerEnabled();
20+
this.config = apiConfigService.getElasticCircuitBreakerConfig();
21+
this.logger.log(`ES Circuit Breaker. Enabled: ${this.enabled}. Duration threshold: ${this.config.durationThresholdMs}ms.
22+
FailureCountThreshold: ${this.config.failureCountThreshold}ms. FailureCountThreshold: ${this.config.failureCountThreshold}`);
23+
}
24+
25+
private async withCircuitBreaker<T>(operation: () => Promise<T>): Promise<T> {
26+
if (!this.enabled) {
27+
return operation();
28+
}
29+
30+
if (this.isCircuitOpen) {
31+
const now = Date.now();
32+
if (now - this.lastFailureTime >= this.config.resetTimeoutMs) {
33+
this.logger.log('Circuit is half-open, attempting reset');
34+
this.isCircuitOpen = false;
35+
this.failureCount = 0;
36+
} else {
37+
throw new ServiceUnavailableException();
38+
}
39+
}
40+
41+
try {
42+
const timeoutPromise = new Promise<never>((_, reject) => {
43+
setTimeout(() => reject(new Error('Operation timed out')), this.config.durationThresholdMs);
44+
});
45+
46+
const result = await Promise.race([operation(), timeoutPromise]);
47+
this.failureCount = 0;
48+
return result;
49+
} catch (error) {
50+
this.failureCount++;
51+
this.lastFailureTime = Date.now();
52+
53+
if (this.failureCount >= this.config.failureCountThreshold) {
54+
if (!this.isCircuitOpen) {
55+
this.logger.log('Circuit breaker opened due to multiple failures');
56+
}
57+
58+
this.isCircuitOpen = true;
59+
}
60+
61+
throw new ServiceUnavailableException();
62+
}
63+
}
64+
65+
// eslint-disable-next-line require-await
66+
async getCount(index: string, query: ElasticQuery): Promise<number> {
67+
return this.withCircuitBreaker(() => this.elasticService.getCount(index, query));
68+
}
69+
70+
// eslint-disable-next-line require-await
71+
async getList(index: string, id: string, query: ElasticQuery): Promise<any[]> {
72+
return this.withCircuitBreaker(() => this.elasticService.getList(index, id, query));
73+
}
74+
75+
// eslint-disable-next-line require-await
76+
async getItem(index: string, id: string, value: string): Promise<any> {
77+
return this.withCircuitBreaker(() => this.elasticService.getItem(index, id, value));
78+
}
79+
80+
// eslint-disable-next-line require-await
81+
async getCustomValue(index: string, id: string, key: string): Promise<any> {
82+
return this.withCircuitBreaker(() => this.elasticService.getCustomValue(index, id, key));
83+
}
84+
85+
// eslint-disable-next-line require-await
86+
async setCustomValue(index: string, id: string, key: string, value: any): Promise<void> {
87+
return this.withCircuitBreaker(() => this.elasticService.setCustomValue(index, id, key, value));
88+
}
89+
90+
// eslint-disable-next-line require-await
91+
async setCustomValues(index: string, id: string, values: Record<string, any>): Promise<void> {
92+
return this.withCircuitBreaker(() => this.elasticService.setCustomValues(index, id, values));
93+
}
94+
95+
// eslint-disable-next-line require-await
96+
async getScrollableList(index: string, id: string, query: ElasticQuery, action: (items: any[]) => Promise<void>): Promise<void> {
97+
return this.withCircuitBreaker(() => this.elasticService.getScrollableList(index, id, query, action));
98+
}
99+
100+
// eslint-disable-next-line require-await
101+
async get(url: string): Promise<any> {
102+
return this.withCircuitBreaker(() => this.elasticService.get(url));
103+
}
104+
105+
// eslint-disable-next-line require-await
106+
async post(url: string, data: any): Promise<any> {
107+
return this.withCircuitBreaker(() => this.elasticService.post(url, data));
108+
}
109+
}

src/common/indexer/elastic/elastic.indexer.module.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
import { forwardRef, Global, Module } from "@nestjs/common";
22
import { ApiConfigModule } from "src/common/api-config/api.config.module";
33
import { BlsModule } from "src/endpoints/bls/bls.module";
4-
import { DynamicModuleUtils } from "src/utils/dynamic.module.utils";
54
import { ElasticIndexerHelper } from "./elastic.indexer.helper";
65
import { ElasticIndexerService } from "./elastic.indexer.service";
6+
import { EsCircuitBreakerProxyModule } from "./circuit-breaker/circuit.breaker.proxy.module";
77

88
@Global()
99
@Module({
1010
imports: [
1111
ApiConfigModule,
1212
forwardRef(() => BlsModule),
13-
DynamicModuleUtils.getElasticModule(),
13+
EsCircuitBreakerProxyModule,
1414
],
1515
providers: [ElasticIndexerService, ElasticIndexerHelper],
1616
exports: [ElasticIndexerService, ElasticIndexerHelper],

src/common/indexer/elastic/elastic.indexer.service.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { HttpStatus, Injectable } from "@nestjs/common";
22
import { BinaryUtils } from "@multiversx/sdk-nestjs-common";
3-
import { ElasticService, ElasticQuery, QueryOperator, QueryType, QueryConditionOptions, ElasticSortOrder, ElasticSortProperty, TermsQuery, RangeGreaterThanOrEqual, MatchQuery } from "@multiversx/sdk-nestjs-elastic";
3+
import { ElasticQuery, QueryOperator, QueryType, QueryConditionOptions, ElasticSortOrder, ElasticSortProperty, TermsQuery, RangeGreaterThanOrEqual, MatchQuery } from "@multiversx/sdk-nestjs-elastic";
44
import { IndexerInterface } from "../indexer.interface";
55
import { ApiConfigService } from "src/common/api-config/api.config.service";
66
import { CollectionFilter } from "src/endpoints/collections/entities/collection.filter";
@@ -29,6 +29,7 @@ import { ApplicationFilter } from "src/endpoints/applications/entities/applicati
2929
import { NftType } from "../entities/nft.type";
3030
import { EventsFilter } from "src/endpoints/events/entities/events.filter";
3131
import { Events } from "../entities/events";
32+
import { EsCircuitBreakerProxy } from "./circuit-breaker/circuit.breaker.proxy.service";
3233

3334
@Injectable()
3435
export class ElasticIndexerService implements IndexerInterface {
@@ -38,7 +39,7 @@ export class ElasticIndexerService implements IndexerInterface {
3839

3940
constructor(
4041
private readonly apiConfigService: ApiConfigService,
41-
private readonly elasticService: ElasticService,
42+
private readonly elasticService: EsCircuitBreakerProxy,
4243
private readonly indexerHelper: ElasticIndexerHelper,
4344
) { }
4445

src/test/unit/controllers/services.mock/account.services.mock.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ export const mockApiConfigService = () => ({
7676
getProcessTtl: jest.fn().mockReturnValue(''),
7777
getExternalMediaUrl: jest.fn().mockReturnValue(''),
7878
getMediaUrl: jest.fn().mockReturnValue(''),
79+
isElasticCircuitBreakerEnabled: jest.fn().mockReturnValue(false),
80+
getElasticCircuitBreakerConfig: jest.fn().mockReturnValue({}),
7981
getConfig: jest.fn(),
8082
});
8183

0 commit comments

Comments
 (0)