Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ public class CommonParameter {
@Setter
public double rateLimiterDisconnect; // clearParam: 1.0
@Getter
@Setter
public boolean rateLimiterApiNonBlocking = false;
@Getter
public RocksDbSettings rocksDBCustomSettings;
@Getter
public GenesisBlock genesisBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class RateLimiterConfig {
private P2pRateLimitConfig p2p = new P2pRateLimitConfig();
private List<HttpRateLimitItem> http = new ArrayList<>();
private List<RpcRateLimitItem> rpc = new ArrayList<>();
private boolean apiNonBlocking = false;

@Getter
@Setter
Expand Down
1 change: 1 addition & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ rate.limiter = {
global.qps = 50000
global.ip.qps = 10000
global.api.qps = 1000
apiNonBlocking = false
}

seed.node = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.core.config.args;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -29,6 +30,7 @@ public void testDefaults() {
assertEquals(1.0, rl.getP2p().getDisconnect(), 0.001);
assertTrue(rl.getHttp().isEmpty());
assertTrue(rl.getRpc().isEmpty());
assertFalse(rl.isApiNonBlocking());
}

@Test
Expand All @@ -40,7 +42,8 @@ public void testFromConfig() {
+ " http = [{ component = TestServlet, strategy = QpsRateLimiterAdapter,"
+ " paramString = \"qps=10\" }],"
+ " rpc = [{ component = TestRpc, strategy = GlobalPreemptibleAdapter,"
+ " paramString = \"permit=1\" }]"
+ " paramString = \"permit=1\" }],"
+ " apiNonBlocking = true"
+ "}");
RateLimiterConfig rl = RateLimiterConfig.fromConfig(config);
assertEquals(100, rl.getGlobal().getQps());
Expand All @@ -50,5 +53,6 @@ public void testFromConfig() {
assertEquals("TestServlet", rl.getHttp().get(0).getComponent());
assertEquals(1, rl.getRpc().size());
assertEquals("TestRpc", rl.getRpc().get(0).getComponent());
assertTrue(rl.isApiNonBlocking());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ private static void applyRateLimiterConfig(RateLimiterConfig rl) {
PARAMETER.rateLimiterSyncBlockChain = rl.getP2p().getSyncBlockChain();
PARAMETER.rateLimiterFetchInvData = rl.getP2p().getFetchInvData();
PARAMETER.rateLimiterDisconnect = rl.getP2p().getDisconnect();
PARAMETER.rateLimiterApiNonBlocking = rl.isApiNonBlocking();

// HTTP/RPC rate limiter items: convert bean lists to business objects
RateLimiterInitialization initialization = new RateLimiterInitialization();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ protected void service(HttpServletRequest req, HttpServletResponse resp)
IRateLimiter rateLimiter = container.get(KEY_PREFIX_HTTP, getClass().getSimpleName());

// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);
// that would be rejected by the per-endpoint limiter anyway. acquirePermit()
// chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData);

String contextPath = req.getContextPath();
String url = Strings.isNullOrEmpty(req.getServletPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,43 @@ public class GlobalRateLimiter {
public static boolean tryAcquire(RuntimeData runtimeData) {
String ip = runtimeData.getRemoteAddr();
if (!Strings.isNullOrEmpty(ip)) {
RateLimiter r;
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
r = cache.get(ip, () -> RateLimiter.create(IP_QPS));
} catch (Exception e) {
logger.warn("Failed to load IP rate limiter for {}, denying request: {}",
ip, e.getMessage());
RateLimiter r = loadIpLimiter(ip);
if (r == null || !r.tryAcquire()) {
return false;
}
if (!r.tryAcquire()) {
}
return rateLimiter.tryAcquire();
}

public static boolean acquire(RuntimeData runtimeData) {
String ip = runtimeData.getRemoteAddr();
if (!Strings.isNullOrEmpty(ip)) {
RateLimiter r = loadIpLimiter(ip);
if (r == null) {
return false;
}
r.acquire();
}
rateLimiter.acquire();
return true;
}

public static boolean acquirePermit(RuntimeData runtimeData) {
return Args.getInstance().isRateLimiterApiNonBlocking()
? tryAcquire(runtimeData)
: acquire(runtimeData);
}

private static RateLimiter loadIpLimiter(String ip) {
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
return cache.get(ip, () -> RateLimiter.create(IP_QPS));
} catch (Exception e) {
logger.warn("Failed to load IP rate limiter for {}, denying request: {}",
ip, e.getMessage());
return null;
}
return rateLimiter.tryAcquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,

RuntimeData runtimeData = new RuntimeData(call);
// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);
// that would be rejected by the per-endpoint limiter anyway. acquirePermit()
// chooses blocking or non-blocking semantics based on rate.limiter.apiNonBlocking.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.acquirePermit(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.acquirePermit(runtimeData);

if (!acquireResource) {
// Release the per-endpoint permit when global rejected, to avoid semaphore leak.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ public DefaultBaseQqsAdapter(String paramString) {
public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire(data.getRemoteAddr());
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire(data.getRemoteAddr());
}

}
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package org.tron.core.services.ratelimiter.adapter;

import org.tron.core.config.args.Args;
import org.tron.core.services.ratelimiter.RuntimeData;

public interface IRateLimiter {

boolean tryAcquire(RuntimeData data);

boolean acquire(RuntimeData data);

default boolean acquirePermit(RuntimeData data) {
return Args.getInstance().isRateLimiterApiNonBlocking()
? tryAcquire(data)
: acquire(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GlobalPreemptibleStrategy extends Strategy {

public static final String STRATEGY_PARAM_PERMIT = "permit";
public static final int DEFAULT_PERMIT_NUM = 1;
public static final int DEFAULT_ACQUIRE_TIMEOUT = 2;
private Semaphore sp;

public GlobalPreemptibleStrategy(String paramString) {
Expand All @@ -23,15 +27,25 @@ protected Map<String, ParamItem> defaultParam() {
return map;
}

// Non-blocking: immediately rejects if no permit is available.
// Intentional change from the previous tryAcquire(2, TimeUnit.SECONDS) behaviour:
// blocking the caller for up to 2 s ties up Netty IO / gRPC executor threads and
// masks overload rather than shedding it. All rate-limiting in this stack is now
// non-blocking to keep the thread model consistent with GlobalRateLimiter.
// Non-blocking: immediately rejects if no permit is available. Used when the
// apiNonBlocking switch is on, to shed overload instead of tying up Netty IO /
// gRPC executor threads while waiting for a permit.
public boolean tryAcquire() {
return sp.tryAcquire();
}

public boolean acquire() {
try {
return sp.tryAcquire(DEFAULT_ACQUIRE_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Restore the interrupt flag and reject — caller must not release a permit
// it never acquired.
logger.error("acquire permit with error: {}", e.getMessage());
Thread.currentThread().interrupt();
return false;
}
}

public void release() {
sp.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,29 @@ public IPQpsStrategy(String paramString) {
}

public boolean tryAcquire(String ip) {
RateLimiter limiter;
RateLimiter limiter = loadLimiter(ip);
return limiter != null && limiter.tryAcquire();
}

public boolean acquire(String ip) {
RateLimiter limiter = loadLimiter(ip);
if (limiter == null) {
return false;
}
limiter.acquire();
return true;
}

private RateLimiter loadLimiter(String ip) {
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
limiter = ipLimiter.get(ip, this::newRateLimiter);
return ipLimiter.get(ip, this::newRateLimiter);
} catch (Exception e) {
logger.warn("Failed to load IP rate limiter for {}, denying request: {}",
ip, e.getMessage());
return false;
return null;
}
return limiter.tryAcquire();
}

private RateLimiter newRateLimiter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ protected Map<String, ParamItem> defaultParam() {
public boolean tryAcquire() {
return rateLimiter.tryAcquire();
}

public boolean acquire() {
rateLimiter.acquire();
return true;
}
}
5 changes: 4 additions & 1 deletion framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ node {

## rate limiter config
rate.limiter = {
# Every api could only set a specific rate limit strategy. Three non-blocking strategy are supported:
# Every api could only set a specific rate limit strategy. Three strategy are supported:
# GlobalPreemptibleAdapter: The number of preemptible resource or maximum concurrent requests globally.
# QpsRateLimiterAdapter: qps is the average request count in one second supported by the server, it could be a Double or a Integer.
# IPQPSRateLimiterAdapter: similar to the QpsRateLimiterAdapter, qps could be a Double or a Integer.
Expand Down Expand Up @@ -473,6 +473,9 @@ rate.limiter = {
global.qps = 50000
# IP-based global qps, default 10000
global.ip.qps = 10000
# If true, API rate limiters reject immediately on overload (non-blocking).
# If false (default), callers wait for a permit (blocking, the legacy behaviour).
apiNonBlocking = false
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ public void testBuildsEachWhitelistedAdapter() {
@Test
public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() throws Exception {
IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class);
when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false);
when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false);
container.add(KEY_HTTP, "TestServlet", perEndpoint);

try (MockedStatic<GlobalRateLimiter> globalMock = mockStatic(GlobalRateLimiter.class)) {
servlet.service(request, response);

globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never());
// tryAcquire returned false — no permit was taken, nothing to release
globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never());
// acquirePermit returned false — no permit was taken, nothing to release
verify(perEndpoint, never()).release();
}
}
Expand All @@ -186,13 +186,13 @@ public void testPerEndpointRejectedDoesNotConsumeGlobalQuota() throws Exception
@Test
public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() throws Exception {
IRateLimiter perEndpoint = Mockito.mock(IRateLimiter.class);
when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(false);
when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(false);
container.add(KEY_HTTP, "TestServlet", perEndpoint);

try (MockedStatic<GlobalRateLimiter> globalMock = mockStatic(GlobalRateLimiter.class)) {
servlet.service(request, response);

globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), never());
globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), never());
}
}

Expand All @@ -203,11 +203,11 @@ public void testNonPreemptiblePerEndpointRejectedDoesNotConsumeGlobal() throws E
@Test
public void testGlobalRejectedReleasesPreemptiblePermit() throws Exception {
IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class);
when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true);
when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true);
container.add(KEY_HTTP, "TestServlet", perEndpoint);

try (MockedStatic<GlobalRateLimiter> globalMock = mockStatic(GlobalRateLimiter.class)) {
globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(false);
globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(false);

servlet.service(request, response);

Expand All @@ -223,11 +223,11 @@ public void testGlobalRejectedReleasesPreemptiblePermit() throws Exception {
@Test
public void testBothPassPermitReleasedAfterRequest() throws Exception {
IPreemptibleRateLimiter perEndpoint = Mockito.mock(IPreemptibleRateLimiter.class);
when(perEndpoint.tryAcquire(any(RuntimeData.class))).thenReturn(true);
when(perEndpoint.acquirePermit(any(RuntimeData.class))).thenReturn(true);
container.add(KEY_HTTP, "TestServlet", perEndpoint);

try (MockedStatic<GlobalRateLimiter> globalMock = mockStatic(GlobalRateLimiter.class)) {
globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true);
globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true);

servlet.service(request, response);

Expand All @@ -243,11 +243,11 @@ public void testBothPassPermitReleasedAfterRequest() throws Exception {
public void testNullRateLimiterConsultsOnlyGlobal() throws Exception {
// No entry added to container — container.get() returns null
try (MockedStatic<GlobalRateLimiter> globalMock = mockStatic(GlobalRateLimiter.class)) {
globalMock.when(() -> GlobalRateLimiter.tryAcquire(any())).thenReturn(true);
globalMock.when(() -> GlobalRateLimiter.acquirePermit(any())).thenReturn(true);

servlet.service(request, response);

globalMock.verify(() -> GlobalRateLimiter.tryAcquire(any()), times(1));
globalMock.verify(() -> GlobalRateLimiter.acquirePermit(any()), times(1));
}
}
}
Loading
Loading