Skip to content

Commit 0da1807

Browse files
author
Egor Kuts
committed
ignite-28305 backpressure to limit in flight partition operations per node
1 parent 784e098 commit 0da1807

File tree

22 files changed

+371
-23
lines changed

22 files changed

+371
-23
lines changed

modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.ignite.internal.sql.engine.QueryProcessor;
4848
import org.apache.ignite.internal.table.IgniteTablesInternal;
4949
import org.apache.ignite.internal.tx.TxManager;
50+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
5051
import org.apache.ignite.network.NetworkAddress;
5152
import org.jetbrains.annotations.Nullable;
5253
import org.junit.jupiter.api.TestInfo;
@@ -144,6 +145,7 @@ ClientHandlerModule start(TestInfo testInfo) {
144145
EventLog.NOOP,
145146
new TestLowWatermark(),
146147
Runnable::run,
148+
new PartitionOperationInFlightLimiter(0),
147149
() -> true
148150
);
149151

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.ignite.internal.table.IgniteTablesInternal;
7575
import org.apache.ignite.internal.tx.TxManager;
7676
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
77+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
7778
import org.apache.ignite.lang.IgniteException;
7879
import org.jetbrains.annotations.Nullable;
7980
import org.jetbrains.annotations.TestOnly;
@@ -163,6 +164,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
163164

164165
private final Executor partitionOperationsExecutor;
165166

167+
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
168+
166169
private final ConcurrentHashMap<String, CompletableFuture<PlatformComputeConnection>> computeExecutors = new ConcurrentHashMap<>();
167170

168171
@TestOnly
@@ -186,6 +189,7 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
186189
* @param eventLog Event log.
187190
* @param lowWatermark Low watermark.
188191
* @param partitionOperationsExecutor Executor for a partition operation.
192+
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
189193
* @param ddlBatchingSuggestionEnabled Boolean supplier indicating whether the DDL batching suggestion is enabled.
190194
*/
191195
public ClientHandlerModule(
@@ -207,6 +211,7 @@ public ClientHandlerModule(
207211
EventLog eventLog,
208212
LowWatermark lowWatermark,
209213
Executor partitionOperationsExecutor,
214+
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
210215
Supplier<Boolean> ddlBatchingSuggestionEnabled
211216
) {
212217
assert igniteTables != null;
@@ -252,6 +257,7 @@ public ClientHandlerModule(
252257
this.clientConnectorConfiguration = clientConnectorConfiguration;
253258
this.ddlBatchingSuggestionEnabled = ddlBatchingSuggestionEnabled;
254259
this.partitionOperationsExecutor = partitionOperationsExecutor;
260+
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
255261
}
256262

257263
/** {@inheritDoc} */
@@ -471,6 +477,7 @@ private ClientInboundMessageHandler createInboundMessageHandler(
471477
connectionId,
472478
primaryReplicaTracker,
473479
partitionOperationsExecutor,
480+
partitionOperationInFlightLimiter,
474481
SUPPORTED_FEATURES,
475482
Map.of(),
476483
computeExecutors::remove,

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
154154
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
155155
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
156+
import org.apache.ignite.internal.lang.ReplicaOverloadedException;
156157
import org.apache.ignite.internal.logger.IgniteLogger;
157158
import org.apache.ignite.internal.logger.Loggers;
158159
import org.apache.ignite.internal.network.ClusterService;
@@ -180,6 +181,7 @@
180181
import org.apache.ignite.internal.tx.TransactionKilledException;
181182
import org.apache.ignite.internal.tx.TxManager;
182183
import org.apache.ignite.internal.util.ExceptionUtils;
184+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
183185
import org.apache.ignite.lang.CancelHandle;
184186
import org.apache.ignite.lang.ErrorGroups.Compute;
185187
import org.apache.ignite.lang.ErrorGroups.Sql;
@@ -278,6 +280,8 @@ public class ClientInboundMessageHandler
278280

279281
private final Executor partitionOperationsExecutor;
280282

283+
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
284+
281285
private final BitSet features;
282286

283287
private final Map<HandshakeExtension, Object> extensions;
@@ -309,6 +313,7 @@ public class ClientInboundMessageHandler
309313
* @param connectionId Connection ID.
310314
* @param primaryReplicaTracker Primary replica tracker.
311315
* @param partitionOperationsExecutor Partition operations executor.
316+
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
312317
* @param features Features.
313318
* @param extensions Extensions.
314319
* @param eventLog Event log.
@@ -330,6 +335,7 @@ public ClientInboundMessageHandler(
330335
long connectionId,
331336
ClientPrimaryReplicaTracker primaryReplicaTracker,
332337
Executor partitionOperationsExecutor,
338+
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
333339
BitSet features,
334340
Map<HandshakeExtension, Object> extensions,
335341
Function<String, CompletableFuture<PlatformComputeConnection>> computeConnectionFunc,
@@ -373,6 +379,7 @@ public ClientInboundMessageHandler(
373379
this.eventLog = eventLog;
374380
this.primaryReplicaTracker = primaryReplicaTracker;
375381
this.partitionOperationsExecutor = partitionOperationsExecutor;
382+
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
376383
this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
377384

378385
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
@@ -882,19 +889,28 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
882889
if (ClientOp.isPartitionOperation(opCode)) {
883890
long requestId0 = requestId;
884891
int opCode0 = opCode;
892+
if (!partitionOperationInFlightLimiter.tryAcquire()) {
893+
in.close();
885894

886-
partitionOperationsExecutor.execute(() -> {
887-
try {
888-
processOperationInternal(ctx, in, requestId0, opCode0, guard);
889-
} catch (Throwable t) {
890-
in.close();
891-
892-
writeError(requestId0, opCode0, t, ctx, false, guard);
895+
writeError(requestId0, opCode0, new ReplicaOverloadedException(), ctx, false, guard);
893896

894-
metrics.requestsFailedIncrement();
895-
metrics.requestsActiveDecrement();
896-
}
897-
});
897+
metrics.requestsFailedIncrement();
898+
} else {
899+
partitionOperationsExecutor.execute(() -> {
900+
try {
901+
processOperationInternal(ctx, in, requestId0, opCode0, guard);
902+
} catch (Throwable t) {
903+
in.close();
904+
905+
writeError(requestId0, opCode0, t, ctx, false, guard);
906+
907+
metrics.requestsFailedIncrement();
908+
metrics.requestsActiveDecrement();
909+
} finally {
910+
partitionOperationInFlightLimiter.release();
911+
}
912+
});
913+
}
898914
} else {
899915
processOperationInternal(ctx, in, requestId, opCode, guard);
900916
}

modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
6666
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
6767
import org.apache.ignite.internal.table.IgniteTablesInternal;
68+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
6869
import org.apache.ignite.lang.IgniteException;
6970
import org.jetbrains.annotations.Nullable;
7071

@@ -272,6 +273,7 @@ protected void initChannel(Channel ch) {
272273
new TestLowWatermark()
273274
),
274275
Runnable::run,
276+
new PartitionOperationInFlightLimiter(0),
275277
features,
276278
randomExtensions(),
277279
unused -> null,

modules/client/src/test/java/org/apache/ignite/client/TestServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
8080
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
8181
import org.apache.ignite.internal.table.IgniteTablesInternal;
82+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
8283
import org.apache.ignite.network.NetworkAddress;
8384
import org.jetbrains.annotations.Nullable;
8485
import org.mockito.Mockito;
@@ -290,6 +291,7 @@ public void log(String type, Supplier<Event> eventProvider) {
290291
EventLog.NOOP,
291292
new TestLowWatermark(),
292293
Runnable::run,
294+
new PartitionOperationInFlightLimiter(0),
293295
() -> true
294296
);
295297

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.lang;
19+
20+
import static org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
21+
22+
/**
23+
* Thrown when the node has reached the maximum number of in-flight partition operations
24+
* ({@code replication.maxInFlightPartitionOperationsPerCore}) and cannot accept new requests.
25+
*/
26+
public class ReplicaOverloadedException extends IgniteInternalException {
27+
private static final long serialVersionUID = -6023736883539658779L;
28+
29+
/** Constructor. */
30+
public ReplicaOverloadedException() {
31+
super(GROUP_OVERLOADED_ERR, "Node is overloaded: max in-flight partition operations limit reached.");
32+
}
33+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util;
19+
20+
import java.util.concurrent.Semaphore;
21+
import java.util.function.IntSupplier;
22+
import org.jetbrains.annotations.Nullable;
23+
24+
/**
25+
* Limits the number of in-flight partition operations (queued or executing) across the replica manager and thin-client connector.
26+
*
27+
* <p>When the limit is zero or less, all operations are permitted unconditionally.
28+
* When positive, {@link #tryAcquire()} returns {@code false} once the limit is reached and the caller should reject the request.
29+
* A permit must be released via {@link #release()} upon operation completes.
30+
*/
31+
public class PartitionOperationInFlightLimiter {
32+
private volatile Semaphore semaphore;
33+
34+
private final @Nullable IntSupplier limitSupplier;
35+
36+
private volatile boolean initialized;
37+
38+
/**
39+
* Constructor.
40+
*
41+
* @param maxInFlightPartitionOperationsPerCore Max number of in-flight partition operations per CPU core, or <= 0 to disable the limit.
42+
* The total limit is {@code maxInFlightPartitionOperationsPerCore * availableProcessors}.
43+
*/
44+
public PartitionOperationInFlightLimiter(int maxInFlightPartitionOperationsPerCore) {
45+
int limit = maxInFlightPartitionOperationsPerCore <= 0 ? 0
46+
: maxInFlightPartitionOperationsPerCore * Runtime.getRuntime().availableProcessors();
47+
this.semaphore = limit <= 0 ? null : new Semaphore(limit);
48+
this.limitSupplier = null;
49+
this.initialized = true;
50+
}
51+
52+
/**
53+
* Constructor.
54+
*
55+
* @param maxInFlightPartitionOperationsPerCoreSupplier Supplier of the max number of in-flight partition operations per CPU core,
56+
* or 0 to disable. The total limit is {@code supplied value * availableProcessors}.
57+
*/
58+
public PartitionOperationInFlightLimiter(@Nullable IntSupplier maxInFlightPartitionOperationsPerCoreSupplier) {
59+
this.limitSupplier = maxInFlightPartitionOperationsPerCoreSupplier;
60+
this.initialized = false;
61+
}
62+
63+
/**
64+
* Attempts to acquire a permit.
65+
*
66+
* @return {@code true} if a permit was acquired or the limit is disabled; {@code false} if the limit is reached.
67+
*/
68+
public boolean tryAcquire() {
69+
Semaphore s = resolvedSemaphore();
70+
return s == null || s.tryAcquire();
71+
}
72+
73+
/**
74+
* Releases a previously acquired permit.
75+
* Must only be called after a successful {@link #tryAcquire()} when the limit is enabled.
76+
*/
77+
public void release() {
78+
Semaphore s = resolvedSemaphore();
79+
80+
if (s != null) {
81+
s.release();
82+
}
83+
}
84+
85+
private @Nullable Semaphore resolvedSemaphore() {
86+
if (initialized) {
87+
return semaphore;
88+
}
89+
synchronized (this) {
90+
if (initialized) {
91+
return semaphore;
92+
}
93+
if (limitSupplier != null) {
94+
int perCore = limitSupplier.getAsInt();
95+
int limit = perCore <= 0 ? 0 : perCore * Runtime.getRuntime().availableProcessors();
96+
97+
if (limit > 0) {
98+
this.semaphore = new Semaphore(limit);
99+
}
100+
}
101+
this.initialized = true;
102+
}
103+
return semaphore;
104+
}
105+
}

0 commit comments

Comments
 (0)