Skip to content

Commit 9140021

Browse files
author
Egor Kuts
committed
IGNITE-28505 prevent log flooding caused by TX operations when a node is stopping
1 parent 784e098 commit 9140021

21 files changed

Lines changed: 244 additions & 23 deletions

File tree

modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.ignite.internal.sql.engine.QueryProcessor;
4848
import org.apache.ignite.internal.table.IgniteTablesInternal;
4949
import org.apache.ignite.internal.tx.TxManager;
50+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
5051
import org.apache.ignite.network.NetworkAddress;
5152
import org.jetbrains.annotations.Nullable;
5253
import org.junit.jupiter.api.TestInfo;
@@ -144,6 +145,7 @@ ClientHandlerModule start(TestInfo testInfo) {
144145
EventLog.NOOP,
145146
new TestLowWatermark(),
146147
Runnable::run,
148+
new PartitionOperationInFlightLimiter(0),
147149
() -> true
148150
);
149151

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.ignite.internal.table.IgniteTablesInternal;
7575
import org.apache.ignite.internal.tx.TxManager;
7676
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
77+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
7778
import org.apache.ignite.lang.IgniteException;
7879
import org.jetbrains.annotations.Nullable;
7980
import org.jetbrains.annotations.TestOnly;
@@ -163,6 +164,8 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
163164

164165
private final Executor partitionOperationsExecutor;
165166

167+
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
168+
166169
private final ConcurrentHashMap<String, CompletableFuture<PlatformComputeConnection>> computeExecutors = new ConcurrentHashMap<>();
167170

168171
@TestOnly
@@ -186,6 +189,7 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
186189
* @param eventLog Event log.
187190
* @param lowWatermark Low watermark.
188191
* @param partitionOperationsExecutor Executor for a partition operation.
192+
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
189193
* @param ddlBatchingSuggestionEnabled Boolean supplier indicates whether the suggestion related DDL batching is enabled.
190194
*/
191195
public ClientHandlerModule(
@@ -207,6 +211,7 @@ public ClientHandlerModule(
207211
EventLog eventLog,
208212
LowWatermark lowWatermark,
209213
Executor partitionOperationsExecutor,
214+
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
210215
Supplier<Boolean> ddlBatchingSuggestionEnabled
211216
) {
212217
assert igniteTables != null;
@@ -252,6 +257,7 @@ public ClientHandlerModule(
252257
this.clientConnectorConfiguration = clientConnectorConfiguration;
253258
this.ddlBatchingSuggestionEnabled = ddlBatchingSuggestionEnabled;
254259
this.partitionOperationsExecutor = partitionOperationsExecutor;
260+
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
255261
}
256262

257263
/** {@inheritDoc} */
@@ -471,6 +477,7 @@ private ClientInboundMessageHandler createInboundMessageHandler(
471477
connectionId,
472478
primaryReplicaTracker,
473479
partitionOperationsExecutor,
480+
partitionOperationInFlightLimiter,
474481
SUPPORTED_FEATURES,
475482
Map.of(),
476483
computeExecutors::remove,

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
154154
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
155155
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
156+
import org.apache.ignite.internal.lang.ReplicaOverloadedException;
156157
import org.apache.ignite.internal.logger.IgniteLogger;
157158
import org.apache.ignite.internal.logger.Loggers;
158159
import org.apache.ignite.internal.network.ClusterService;
@@ -180,6 +181,7 @@
180181
import org.apache.ignite.internal.tx.TransactionKilledException;
181182
import org.apache.ignite.internal.tx.TxManager;
182183
import org.apache.ignite.internal.util.ExceptionUtils;
184+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
183185
import org.apache.ignite.lang.CancelHandle;
184186
import org.apache.ignite.lang.ErrorGroups.Compute;
185187
import org.apache.ignite.lang.ErrorGroups.Sql;
@@ -278,6 +280,8 @@ public class ClientInboundMessageHandler
278280

279281
private final Executor partitionOperationsExecutor;
280282

283+
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
284+
281285
private final BitSet features;
282286

283287
private final Map<HandshakeExtension, Object> extensions;
@@ -309,6 +313,7 @@ public class ClientInboundMessageHandler
309313
* @param connectionId Connection ID.
310314
* @param primaryReplicaTracker Primary replica tracker.
311315
* @param partitionOperationsExecutor Partition operations executor.
316+
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
312317
* @param features Features.
313318
* @param extensions Extensions.
314319
* @param eventLog Event log.
@@ -330,6 +335,7 @@ public ClientInboundMessageHandler(
330335
long connectionId,
331336
ClientPrimaryReplicaTracker primaryReplicaTracker,
332337
Executor partitionOperationsExecutor,
338+
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
333339
BitSet features,
334340
Map<HandshakeExtension, Object> extensions,
335341
Function<String, CompletableFuture<PlatformComputeConnection>> computeConnectionFunc,
@@ -373,6 +379,7 @@ public ClientInboundMessageHandler(
373379
this.eventLog = eventLog;
374380
this.primaryReplicaTracker = primaryReplicaTracker;
375381
this.partitionOperationsExecutor = partitionOperationsExecutor;
382+
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
376383
this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
377384

378385
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
@@ -882,19 +889,28 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
882889
if (ClientOp.isPartitionOperation(opCode)) {
883890
long requestId0 = requestId;
884891
int opCode0 = opCode;
892+
if (!partitionOperationInFlightLimiter.tryAcquire()) {
893+
in.close();
885894

886-
partitionOperationsExecutor.execute(() -> {
887-
try {
888-
processOperationInternal(ctx, in, requestId0, opCode0, guard);
889-
} catch (Throwable t) {
890-
in.close();
891-
892-
writeError(requestId0, opCode0, t, ctx, false, guard);
895+
writeError(requestId0, opCode0, new ReplicaOverloadedException(), ctx, false, guard);
893896

894-
metrics.requestsFailedIncrement();
895-
metrics.requestsActiveDecrement();
896-
}
897-
});
897+
metrics.requestsFailedIncrement();
898+
} else {
899+
partitionOperationsExecutor.execute(() -> {
900+
try {
901+
processOperationInternal(ctx, in, requestId0, opCode0, guard);
902+
} catch (Throwable t) {
903+
in.close();
904+
905+
writeError(requestId0, opCode0, t, ctx, false, guard);
906+
907+
metrics.requestsFailedIncrement();
908+
metrics.requestsActiveDecrement();
909+
} finally {
910+
partitionOperationInFlightLimiter.release();
911+
}
912+
});
913+
}
898914
} else {
899915
processOperationInternal(ctx, in, requestId, opCode, guard);
900916
}

modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
6666
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
6767
import org.apache.ignite.internal.table.IgniteTablesInternal;
68+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
6869
import org.apache.ignite.lang.IgniteException;
6970
import org.jetbrains.annotations.Nullable;
7071

@@ -272,6 +273,7 @@ protected void initChannel(Channel ch) {
272273
new TestLowWatermark()
273274
),
274275
Runnable::run,
276+
new PartitionOperationInFlightLimiter(0),
275277
features,
276278
randomExtensions(),
277279
unused -> null,

modules/client/src/test/java/org/apache/ignite/client/TestServer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
8080
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
8181
import org.apache.ignite.internal.table.IgniteTablesInternal;
82+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
8283
import org.apache.ignite.network.NetworkAddress;
8384
import org.jetbrains.annotations.Nullable;
8485
import org.mockito.Mockito;
@@ -290,6 +291,7 @@ public void log(String type, Supplier<Event> eventProvider) {
290291
EventLog.NOOP,
291292
new TestLowWatermark(),
292293
Runnable::run,
294+
new PartitionOperationInFlightLimiter(0),
293295
() -> true
294296
);
295297

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.lang;
19+
20+
import static org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
21+
22+
/**
23+
* Thrown when the node has reached the maximum number of in-flight partition operations
24+
* ({@code replication.maxInFlightPartitionOperations}) and cannot accept new requests.
25+
*/
26+
public class ReplicaOverloadedException extends IgniteInternalException {
27+
private static final long serialVersionUID = -6023736883539658779L;
28+
29+
/** Constructor. */
30+
public ReplicaOverloadedException() {
31+
super(GROUP_OVERLOADED_ERR, "Node is overloaded: max in-flight partition operations limit reached.");
32+
}
33+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.util;
19+
20+
import java.util.concurrent.Semaphore;
21+
import java.util.function.IntSupplier;
22+
import org.jetbrains.annotations.Nullable;
23+
24+
/**
25+
* Limits the number of in-flight partition operations (queued or executing) across the replica manager and thin-client connector.
26+
*
27+
* <p>When the limit is zero or less, all operations are permitted unconditionally.
28+
* When positive, {@link #tryAcquire()} returns {@code false} once the limit is reached and the caller should reject the request.
29+
* A permit must be released via {@link #release()} upon operation completes.
30+
*/
31+
public class PartitionOperationInFlightLimiter {
32+
private volatile Semaphore semaphore;
33+
34+
private final @Nullable IntSupplier limitSupplier;
35+
36+
private volatile boolean initialized;
37+
38+
/**
39+
* Constructor.
40+
*
41+
* @param maxInFlightPartitionOperations Max number of in-flight partition operations, or <= 0 to disable the limit.
42+
*/
43+
public PartitionOperationInFlightLimiter(int maxInFlightPartitionOperations) {
44+
this.semaphore = maxInFlightPartitionOperations <= 0 ? null : new Semaphore(maxInFlightPartitionOperations);
45+
this.limitSupplier = null;
46+
this.initialized = true;
47+
}
48+
49+
/**
50+
* Constructor that initializes the limit lazily from the given supplier on first use.
51+
* Allows lazy init on first usage.
52+
*
53+
* @param maxInFlightPartitionOperationsSupplier Supplier of the maximum number of in-flight partition operations, or 0 to disable.
54+
*/
55+
public PartitionOperationInFlightLimiter(@Nullable IntSupplier maxInFlightPartitionOperationsSupplier) {
56+
this.limitSupplier = maxInFlightPartitionOperationsSupplier;
57+
this.initialized = false;
58+
}
59+
60+
/**
61+
* Attempts to acquire a permit.
62+
*
63+
* @return {@code true} if a permit was acquired or the limit is disabled; {@code false} if the limit is reached.
64+
*/
65+
public boolean tryAcquire() {
66+
Semaphore s = resolvedSemaphore();
67+
return s == null || s.tryAcquire();
68+
}
69+
70+
/**
71+
* Releases a previously acquired permit.
72+
* Must only be called after a successful {@link #tryAcquire()} when the limit is enabled.
73+
*/
74+
public void release() {
75+
Semaphore s = resolvedSemaphore();
76+
77+
if (s != null) {
78+
s.release();
79+
}
80+
}
81+
82+
private @Nullable Semaphore resolvedSemaphore() {
83+
if (initialized) {
84+
return semaphore;
85+
}
86+
synchronized (this) {
87+
if (initialized) {
88+
return semaphore;
89+
}
90+
if (limitSupplier != null) {
91+
int limit = limitSupplier.getAsInt();
92+
93+
if (limit != 0) {
94+
this.semaphore = new Semaphore(limit);
95+
}
96+
}
97+
this.initialized = true;
98+
}
99+
return semaphore;
100+
}
101+
}

modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
import org.apache.ignite.internal.tx.message.TxMessageGroup;
259259
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
260260
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
261+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
261262
import org.apache.ignite.internal.vault.VaultManager;
262263
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
263264
import org.apache.ignite.network.NetworkAddress;
@@ -1476,6 +1477,7 @@ private class Node {
14761477
Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
14771478
placementDriver,
14781479
threadPoolsManager.partitionOperationsExecutor(),
1480+
new PartitionOperationInFlightLimiter(0),
14791481
partitionIdleSafeTimePropagationPeriodMsSupplier,
14801482
new NoOpFailureManager(),
14811483
new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@
195195
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
196196
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
197197
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
198+
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
198199
import org.apache.ignite.internal.vault.VaultManager;
199200
import org.apache.ignite.network.NetworkAddress;
200201
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -668,6 +669,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,
668669
Set.of(PartitionReplicationMessageGroup.class, TxMessageGroup.class),
669670
placementDriverManager.placementDriver(),
670671
threadPoolsManager.partitionOperationsExecutor(),
672+
new PartitionOperationInFlightLimiter(0),
671673
partitionIdleSafeTimePropagationPeriodMsSupplier,
672674
new NoOpFailureManager(),
673675
new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),

modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxCleanupRecoveryRequestHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.ignite.internal.failure.FailureProcessor;
3434
import org.apache.ignite.internal.lang.IgniteBiTuple;
3535
import org.apache.ignite.internal.lang.IgniteInternalException;
36+
import org.apache.ignite.internal.lang.NodeStoppingException;
3637
import org.apache.ignite.internal.logger.IgniteLogger;
3738
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
3839
import org.apache.ignite.internal.logger.Loggers;
@@ -156,13 +157,14 @@ private CompletableFuture<?> callCleanup(TxMeta txMeta, UUID txId) {
156157
txMeta.commitTimestamp(),
157158
txId
158159
).exceptionally(throwable -> {
159-
throttledLog.warn(
160-
"Failed to cleanup transaction",
161-
"Failed to cleanup transaction {}.",
162-
throwable,
163-
formatTxInfo(txId, txManager)
164-
);
165-
160+
if (!hasCause(throwable, NodeStoppingException.class)) {
161+
throttledLog.warn(
162+
"Failed to cleanup transaction",
163+
"Failed to cleanup transaction {}.",
164+
throwable,
165+
formatTxInfo(txId, txManager)
166+
);
167+
}
166168
return null;
167169
});
168170
}

0 commit comments

Comments
 (0)