Skip to content

Commit 3ada8fe

Browse files
committed
GEODE-10403: Fix distributed deadlock with stop gw sender (#7830)
There is a distributed deadlock that can appear when stopping the gateway sender if a race condition happens in which the stop gateway sender command gets blocked indefinitely trying to get the size of the queue from remote peers (ParallelGatewaySenderQueue.size() call) and also one call to store one event in the queue tries to get the lifecycle lock (acquired by the gateway sender command). These two calls could get into a deadlock under heavy load and make the system unresponsive for any traffic request (get, put, ...). In order to avoid it, in the storage of the event in the gateway sender queue (AbstractGatewaySender.distribute() call), instead to trying to get the lifecycle lock without any timeout, a try with a timeout is added. If the try returns false it is checked if the gateway sender is running. If it is not running, the event is dropped and there is no need to get the lock. Otherwise, the lifecycle lock acquire is retried until it succeeds or the gateway sender is stopped.
1 parent 5ee9444 commit 3ada8fe

4 files changed

Lines changed: 133 additions & 21 deletions

File tree

geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterOperationExecutors.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
import java.util.concurrent.SynchronousQueue;
2626
import java.util.concurrent.ThreadPoolExecutor;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicInteger;
2829

2930
import org.apache.logging.log4j.Logger;
3031

3132
import org.apache.geode.CancelException;
3233
import org.apache.geode.InternalGemFireError;
3334
import org.apache.geode.SystemFailure;
35+
import org.apache.geode.annotations.internal.MutableForTesting;
3436
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
3537
import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
3638
import org.apache.geode.internal.logging.CoreLoggingExecutors;
@@ -167,6 +169,8 @@ public class ClusterOperationExecutors implements OperationExecutors {
167169

168170
private SerialQueuedExecutorPool serialQueuedExecutorPool;
169171

172+
@MutableForTesting
173+
public static final AtomicInteger maxPrThreadsForTest = new AtomicInteger(-1);
170174

171175
ClusterOperationExecutors(DistributionStats stats,
172176
InternalDistributedSystem system) {
@@ -252,10 +256,11 @@ public class ClusterOperationExecutors implements OperationExecutors {
252256
this::doWaitingThread, stats.getWaitingPoolHelper(),
253257
threadMonitor);
254258

255-
if (MAX_PR_THREADS > 1) {
259+
int maxPrThreads = maxPrThreadsForTest.get() > 0 ? maxPrThreadsForTest.get() : MAX_PR_THREADS;
260+
if (maxPrThreads > 1) {
256261
partitionedRegionPool =
257262
CoreLoggingExecutors.newThreadPoolWithFeedStatistics(
258-
MAX_PR_THREADS, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
263+
maxPrThreads, INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper(),
259264
"PartitionedRegion Message Processor",
260265
thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
261266
stats.getPartitionedRegionPoolHelper(),

geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Set;
2626
import java.util.concurrent.ConcurrentLinkedQueue;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.locks.ReentrantReadWriteLock;
2930

3031
import org.apache.logging.log4j.Logger;
@@ -238,6 +239,9 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di
238239

239240
protected boolean enforceThreadsConnectSameReceiver;
240241

242+
@MutableForTesting
243+
public static final AtomicBoolean doSleepForTestingInDistribute = new AtomicBoolean(false);
244+
241245
protected AbstractGatewaySender() {
242246
statisticsClock = disabledClock();
243247
}
@@ -1122,16 +1126,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
11221126
}
11231127

11241128
// If this gateway is not running, return
1125-
if (!isRunning()) {
1126-
if (isPrimary()) {
1127-
recordDroppedEvent(clonedEvent);
1128-
}
1129-
if (isDebugEnabled) {
1130-
logger.debug("Returning back without putting into the gateway sender queue:" + event);
1131-
}
1129+
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
11321130
return;
11331131
}
11341132

1133+
if (AbstractGatewaySender.doSleepForTestingInDistribute.get()) {
1134+
try {
1135+
Thread.sleep(5);
1136+
} catch (InterruptedException e) {
1137+
e.printStackTrace();
1138+
}
1139+
}
11351140
if (!getLifeCycleLock().readLock().tryLock()) {
11361141
synchronized (queuedEventsSync) {
11371142
if (!enqueuedAllTempQueueEvents) {
@@ -1148,19 +1153,22 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
11481153
}
11491154
}
11501155
if (enqueuedAllTempQueueEvents) {
1151-
getLifeCycleLock().readLock().lock();
1156+
try {
1157+
while (!getLifeCycleLock().readLock().tryLock(10, TimeUnit.MILLISECONDS)) {
1158+
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
1159+
return;
1160+
}
1161+
}
1162+
} catch (InterruptedException e) {
1163+
Thread.currentThread().interrupt();
1164+
return;
1165+
}
11521166
}
11531167
}
11541168
try {
11551169
// If this gateway is not running, return
11561170
// The sender may have stopped, after we have checked the status in the beginning.
1157-
if (!isRunning()) {
1158-
if (isDebugEnabled) {
1159-
logger.debug("Returning back without putting into the gateway sender queue:" + event);
1160-
}
1161-
if (isPrimary()) {
1162-
recordDroppedEvent(clonedEvent);
1163-
}
1171+
if (!getIsRunningAndDropEventIfNotRunning(event, isDebugEnabled, clonedEvent)) {
11641172
return;
11651173
}
11661174

@@ -1205,6 +1213,20 @@ this, getId(), operation, clonedEvent),
12051213
}
12061214
}
12071215

1216+
private boolean getIsRunningAndDropEventIfNotRunning(EntryEventImpl event, boolean isDebugEnabled,
1217+
EntryEventImpl clonedEvent) {
1218+
if (isRunning()) {
1219+
return true;
1220+
}
1221+
if (isPrimary()) {
1222+
recordDroppedEvent(clonedEvent);
1223+
}
1224+
if (isDebugEnabled) {
1225+
logger.debug("Returning back without putting into the gateway sender queue:" + event);
1226+
}
1227+
return false;
1228+
}
1229+
12081230
private void recordDroppedEvent(EntryEventImpl event) {
12091231
if (eventProcessor != null) {
12101232
eventProcessor.registerEventDroppedInPrimaryQueue(event);

geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -917,6 +917,13 @@ public static void createManagementCache(Integer locPort) {
917917
createCache(true, locPort);
918918
}
919919

920+
public static void createCacheConserveSocketsInVMs(Boolean conserveSockets, Integer locPort,
921+
VM... vms) {
922+
for (VM vm : vms) {
923+
vm.invoke(() -> createCacheConserveSockets(conserveSockets, locPort));
924+
}
925+
}
926+
920927
public static void createCacheConserveSockets(Boolean conserveSockets, Integer locPort) {
921928
WANTestBase test = new WANTestBase();
922929
Properties props = test.getDistributedSystemProperties();

geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDistributedTest.java

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.geode.cache.wan.GatewayEventFilter;
6060
import org.apache.geode.cache.wan.GatewaySender;
6161
import org.apache.geode.distributed.internal.ClusterDistributionManager;
62+
import org.apache.geode.distributed.internal.ClusterOperationExecutors;
6263
import org.apache.geode.distributed.internal.DistributionMessage;
6364
import org.apache.geode.distributed.internal.DistributionMessageObserver;
6465
import org.apache.geode.internal.cache.BucketRegion;
@@ -346,6 +347,66 @@ public void testParallelPropagationSenderStop() {
346347
vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 100));
347348
}
348349

350+
/**
351+
* Verifies that no distributed deadlock occurs when stopping a gateway sender while receiving
352+
* traffic.
353+
* The distributed deadlock may occur when the gateway sender tries to get the
354+
* size of the gateway sender queue (sending a size message to other members) while holding the
355+
* lifeCycleLock lock. This lock is also taken when an event is to be distributed by the gateway
356+
* sender.
357+
* As this issue has only been observed in the field with a lot of traffic, in order to reproduce
358+
* it in a test case, conserve-sockets is set to true (although the deadlock has also
359+
* been seen with conserve-sockets=false), the size of the PartitionedRegion thread pool is set
360+
* to a small value and an artificial timeout is added at a point in the distribute() call
361+
* of the AbstractGatewaySeder class.
362+
*/
363+
@Test
364+
public void testNoDistributedDeadlockWithGatewaySenderStop() throws Exception {
365+
addIgnoredException("Broken pipe");
366+
Integer[] locatorPorts = createLNAndNYLocators();
367+
Integer lnPort = locatorPorts[0];
368+
Integer nyPort = locatorPorts[1];
369+
VM[] senders = {vm4, vm5, vm6, vm7};
370+
try {
371+
for (VM sender : senders) {
372+
sender.invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(true));
373+
sender.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
374+
}
375+
vm2.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
376+
vm3.invoke(() -> ClusterOperationExecutors.maxPrThreadsForTest.set(2));
377+
378+
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true, true);
379+
380+
// make sure all the senders are running before doing any puts
381+
waitForSendersRunning();
382+
383+
// Send a fairly big amount of operations to provoke the deadlock
384+
int invocationsPerServer = 4;
385+
AsyncInvocation[] invocations = new AsyncInvocation[senders.length * invocationsPerServer];
386+
for (int i = 0; i < senders.length; i++) {
387+
for (int j = 0; j < invocationsPerServer; j++) {
388+
invocations[i + (j * invocationsPerServer)] =
389+
senders[i].invokeAsync(() -> doPuts(getUniqueName() + "_PR", 100));
390+
}
391+
}
392+
393+
// Wait for some elements to be replicated before stopping the senders
394+
for (int i = 0; i < senders.length; i++) {
395+
senders[i].invoke(() -> await()
396+
.untilAsserted(() -> assertThat(getSenderStats("ln", -1).get(3)).isGreaterThan(1)));
397+
}
398+
399+
stopSendersAsync();
400+
for (int i = 0; i < invocations.length; i++) {
401+
invocations[i].await();
402+
}
403+
} finally {
404+
for (int i = 0; i < senders.length; i++) {
405+
senders[i].invoke(() -> AbstractGatewaySender.doSleepForTestingInDistribute.set(false));
406+
}
407+
}
408+
}
409+
349410
/**
350411
* Normal scenario in which a sender is stopped and then started again.
351412
*/
@@ -1271,7 +1332,13 @@ private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
12711332

12721333
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
12731334
boolean createAccessors, boolean startSenders) {
1274-
createSendersAndReceivers(lnPort, nyPort);
1335+
createSendersReceiversAndPartitionedRegion(lnPort, nyPort, createAccessors, startSenders,
1336+
false);
1337+
}
1338+
1339+
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
1340+
boolean createAccessors, boolean startSenders, boolean conserveSockets) {
1341+
createSendersAndReceivers(lnPort, nyPort, conserveSockets);
12751342

12761343
createPartitionedRegions(createAccessors);
12771344

@@ -1280,11 +1347,11 @@ private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer
12801347
}
12811348
}
12821349

1283-
private void createSendersAndReceivers(Integer lnPort, Integer nyPort) {
1284-
createCacheInVMs(nyPort, vm2, vm3);
1350+
private void createSendersAndReceivers(Integer lnPort, Integer nyPort, boolean conserveSockets) {
1351+
createCacheConserveSocketsInVMs(conserveSockets, nyPort, vm2, vm3);
12851352
createReceiverInVMs(vm2, vm3);
12861353

1287-
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
1354+
createCacheConserveSocketsInVMs(conserveSockets, lnPort, vm4, vm5, vm6, vm7);
12881355

12891356
vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
12901357
vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
@@ -1578,6 +1645,17 @@ private void stopSenders() {
15781645
vm7.invoke(() -> stopSender("ln"));
15791646
}
15801647

1648+
private void stopSendersAsync() throws InterruptedException {
1649+
AsyncInvocation inv1 = vm4.invokeAsync(() -> stopSender("ln"));
1650+
AsyncInvocation inv2 = vm5.invokeAsync(() -> stopSender("ln"));
1651+
AsyncInvocation inv3 = vm6.invokeAsync(() -> stopSender("ln"));
1652+
AsyncInvocation inv4 = vm7.invokeAsync(() -> stopSender("ln"));
1653+
inv1.await();
1654+
inv2.await();
1655+
inv3.await();
1656+
inv4.await();
1657+
}
1658+
15811659
private void waitForSendersRunning() {
15821660
vm4.invoke(() -> waitForSenderRunningState("ln"));
15831661
vm5.invoke(() -> waitForSenderRunningState("ln"));

0 commit comments

Comments
 (0)