Skip to content

Commit 337768c

Browse files
author
Sumedh Wale
committed
fixed race condition in old entries cleaner thread deleting in-use snapshot entries
- use a lock for entire duration of getting the active transactions as well as checking if an entry is in-use by a transaction instead of just taking a snapshot of the active transactions; this fixes a race condition where the old entries cleaner thread deleting in-use entries by an active transaction started just after the active transaction list was obtained - fixed assertion error in changePassword when the given password or old password is null - fixed occasional failures in BugsDUnit, ClientServer2DUnit, MVCCDUnit and HeapThresholdDUnit - fixed occasional failures in Bugs3Test when run in the full parallel suite - workaround occasional failure in StatementStatsDUnit since those stats are never used by SnappyData
1 parent 5043783 commit 337768c

13 files changed

Lines changed: 153 additions & 65 deletions

File tree

gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -888,18 +888,20 @@ class OldEntriesCleanerThread implements Runnable {
888888
*/
889889
private long refreshRunningTXs(ArrayList<TXState> runningTXs, long txListVersion) {
890890
final TXManagerImpl txMgr = getTxManager();
891-
final long newTxListVersion = txMgr.hostTXStatesVersion.get();
892-
if (txListVersion != newTxListVersion) {
893-
if (txListVersion >= 0) runningTXs.clear();
894-
Collection<TXStateProxy> txProxies = getTxManager().getHostedTransactionsInProgress();
895-
for (TXStateProxy txProxy : txProxies) {
896-
TXState txState = txProxy.getLocalTXState();
897-
if (txState != null && txState.isSnapshot() && !txState.isClosed()) {
898-
runningTXs.add(txState);
891+
synchronized (txMgr.hostedTXStatesLock) {
892+
final long newTxListVersion = txMgr.getHostedTXStateVersion();
893+
if (txListVersion != newTxListVersion) {
894+
if (txListVersion >= 0) runningTXs.clear();
895+
Collection<TXStateProxy> txProxies = txMgr.getHostedTransactionsInProgress();
896+
for (TXStateProxy txProxy : txProxies) {
897+
TXState txState = txProxy.getLocalTXState();
898+
if (txState != null && txState.isSnapshot() && !txState.isClosed()) {
899+
runningTXs.add(txState);
900+
}
899901
}
900902
}
903+
return newTxListVersion;
901904
}
902-
return newTxListVersion;
903905
}
904906

905907
public void run() {
@@ -931,11 +933,16 @@ public void run() {
931933
} else {
932934
txListVersion = refreshRunningTXs(runningTXs, txListVersion);
933935
if (notRequired(region, re, null, runningTXs)) {
936+
if (SNAPSHOT_DEBUG || getLoggerI18n().fineEnabled()) {
937+
getLoggerI18n().info(LocalizedStrings.DEBUG,
938+
"OldEntriesCleanerThread: Removing the entry " + re);
939+
}
934940
removeEntry(regionEntryMap, re, region);
935941
}
936942
}
937943
} else {
938-
BlockingQueue<RegionEntry> oldEntriesQueue = (BlockingQueue<RegionEntry>) oldEntries;
944+
@SuppressWarnings("unchecked")
945+
BlockingQueue<RegionEntry> oldEntriesQueue = (BlockingQueue<RegionEntry>)oldEntries;
939946
for (RegionEntry re : oldEntriesQueue) {
940947
// update in progress guards against the race where oldEntry and
941948
// entry in region have same version for brief period
@@ -946,7 +953,7 @@ public void run() {
946953
if (notRequired(region, re, oldEntriesQueue, runningTXs)) {
947954
if (SNAPSHOT_DEBUG || getLoggerI18n().fineEnabled()) {
948955
getLoggerI18n().info(LocalizedStrings.DEBUG,
949-
"OldEntriesCleanerThread : Removing the entry " + re);
956+
"OldEntriesCleanerThread: Removing the entry from queue " + re);
950957
}
951958
// continue if some explicit call removed the entry
952959
if (!oldEntriesQueue.remove(re)) continue;
@@ -1160,8 +1167,9 @@ public Object removeValue(Object key, Object value, Object existingValue,
11601167
if (value != null
11611168
&& (value == NO_OBJECT_TOKEN || ((existingValue == value)
11621169
&& ((BlockingQueue)existingValue).isEmpty()))) {
1163-
if (getInstance().getLoggerI18n().fineEnabled()) {
1164-
getInstance().getLoggerI18n().info(LocalizedStrings.DEBUG, "Removing queue for key " + key);
1170+
final GemFireCacheImpl cache = getInstance();
1171+
if (cache != null && cache.getLoggerI18n().fineEnabled()) {
1172+
cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "Removing queue for key " + key);
11651173
}
11661174
return null;
11671175
}

gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java

Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.ConcurrentLinkedQueue;
2424
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.atomic.AtomicLong;
2625
import java.util.concurrent.atomic.AtomicReference;
2726
import java.util.concurrent.locks.LockSupport;
2827
import javax.transaction.Transaction;
@@ -116,9 +115,10 @@ public final class TXManagerImpl implements CacheTransactionManager,
116115

117116
private boolean closed = false;
118117

119-
// whenever the hostedTXStates is changed, the hostTXStatesVersion must be incremented
118+
// whenever the hostedTXStates is changed, the hostedTXStatesVersion must be incremented
120119
private final CustomEntryConcurrentHashMap<TXId, TXStateProxy> hostedTXStates;
121-
final AtomicLong hostTXStatesVersion;
120+
final Object hostedTXStatesLock = new Object();
121+
private long hostedTXStatesVersion;
122122

123123
private static final ConcurrentTHashSet<TXContext> hostedTXContexts =
124124
new ConcurrentTHashSet<>(16, 128);
@@ -465,10 +465,9 @@ private final TXStateProxy newTXStateProxy(final TXId txId,
465465
final IsolationLevel isolationLevel, final boolean isJTA,
466466
final EnumSet<TransactionFlag> flags, final boolean initLocalTXState,
467467
final MapResult result) {
468-
TXStateProxy proxy = GemFireCacheImpl.FactoryStatics.txStateProxyFactory.newTXStateProxy(
468+
incrementHostedTXStateVersion();
469+
return GemFireCacheImpl.FactoryStatics.txStateProxyFactory.newTXStateProxy(
469470
this, txId, isolationLevel, isJTA, flags, initLocalTXState);
470-
this.hostTXStatesVersion.getAndIncrement();
471-
return proxy;
472471
}
473472

474473
/**
@@ -553,7 +552,7 @@ public final Object removeValue(final Object key, Object value,
553552
txId.shortToString() + ": removing from hosted list with commit="
554553
+ commit);
555554
}
556-
hostTXStatesVersion.getAndIncrement();
555+
incrementHostedTXStateVersion();
557556
// always remove from map at this point; callback is only to atomically
558557
// add to finishedTXStates in postRemove
559558
return null;
@@ -830,7 +829,7 @@ public TXManagerImpl(CachePerfStats cachePerfStats, LogWriterI18n logWriter,
830829
this.hostedTXStates = new CustomEntryConcurrentHashMap<>(
831830
128, CustomEntryConcurrentHashMap.DEFAULT_LOAD_FACTOR,
832831
TXMAP_CONCURRENCY);
833-
this.hostTXStatesVersion = new AtomicLong();
832+
this.hostedTXStatesVersion = 0;
834833
this.suspendedTXs = new ConcurrentHashMap<TXId, TXStateInterface>();
835834
this.finishedTXStates = new TXFinishedMap(cache.getDistributedSystem(),
836835
cache.getCancelCriterion());
@@ -993,12 +992,15 @@ public final TXStateProxy beginTX(final TXManagerImpl.TXContext context,
993992
txId = TXId.newTXId(this.cache);
994993
}
995994

996-
final TXStateProxy txState = this.hostedTXStates.create(txId,
997-
txStateProxyCreator, isolationLevel, txFlags, false);
998-
context.setTXState(txState);
999-
// For snapshot isolation, create tx state at the beginning
1000-
if (txState.isSnapshot()) {
1001-
txState.getTXStateForRead();
995+
final TXStateProxy txState;
996+
synchronized (this.hostedTXStatesLock) {
997+
txState = this.hostedTXStates.create(txId,
998+
txStateProxyCreator, isolationLevel, txFlags, false);
999+
context.setTXState(txState);
1000+
// For snapshot isolation, create tx state at the beginning
1001+
if (txState.isSnapshot()) {
1002+
txState.getTXStateForRead();
1003+
}
10021004
}
10031005

10041006
return txState;
@@ -1025,8 +1027,10 @@ public final TXStateProxy resumeTX(final TXManagerImpl.TXContext context,
10251027
if (txState != null) {
10261028
return txState;
10271029
}
1028-
txState = this.hostedTXStates.create(txId,
1029-
txStateProxyCreator, isolationLevel, txFlags, false);
1030+
synchronized (this.hostedTXStatesLock) {
1031+
txState = this.hostedTXStates.create(txId,
1032+
txStateProxyCreator, isolationLevel, txFlags, false);
1033+
}
10301034
// context.setTXState(txState);
10311035
return txState;
10321036
}
@@ -1041,8 +1045,11 @@ public TXStateProxy beginJTA() {
10411045
checkClosed();
10421046
final TXContext context = getOrCreateTXContext();
10431047
final TXId txId = TXId.newTXId(this.cache);
1044-
final TXStateProxy txState = this.hostedTXStates.create(txId,
1045-
txStateJTACreator, IsolationLevel.DEFAULT, null, false);
1048+
final TXStateProxy txState;
1049+
synchronized (this.hostedTXStatesLock) {
1050+
txState = this.hostedTXStates.create(txId,
1051+
txStateJTACreator, IsolationLevel.DEFAULT, null, false);
1052+
}
10461053

10471054
context.setTXState(txState);
10481055
return txState;
@@ -1690,6 +1697,24 @@ public final void unmasquerade(final TXContext context,
16901697
}
16911698
}
16921699

1700+
/**
1701+
* For package-internal use only.
1702+
* Caller should hold the lock on {@link #hostedTXStatesLock}.
1703+
*/
1704+
final void incrementHostedTXStateVersion() {
1705+
assert Thread.holdsLock(this.hostedTXStatesLock);
1706+
this.hostedTXStatesVersion++;
1707+
}
1708+
1709+
/**
1710+
* For package-internal use only.
1711+
* Caller should hold the lock on {@link #hostedTXStatesLock}.
1712+
*/
1713+
final long getHostedTXStateVersion() {
1714+
assert Thread.holdsLock(this.hostedTXStatesLock);
1715+
return this.hostedTXStatesVersion;
1716+
}
1717+
16931718
/**
16941719
* Cleanup the remote txState after commit and rollback
16951720
* @param txId
@@ -1703,15 +1728,18 @@ public final TXStateProxy removeHostedTXState(TXId txId, Boolean commit) {
17031728
* Cleanup the remote txState subject to given condition atomically during
17041729
* remove.
17051730
*
1706-
* NOTE: the provided MapCallback should increment the {@link TXManagerImpl#hostTXStatesVersion}
1731+
* NOTE: the provided MapCallback should increment the {@link TXManagerImpl#hostedTXStatesVersion}
17071732
* when removal is given a go ahead (i.e. just before removeValue returns null).
17081733
*/
17091734
public final <C, P> TXStateProxy removeHostedTXState(TXId txId,
17101735
MapCallback<TXId, TXStateProxy, C, P> condition, C context,
17111736
P removeParams) {
1712-
final TXStateProxy tx = this.hostedTXStates.remove(txId, condition,
1713-
context, removeParams);
1714-
getCache().removeTXId(txId);
1737+
final TXStateProxy tx;
1738+
synchronized (this.hostedTXStatesLock) {
1739+
tx = this.hostedTXStates.remove(txId, condition,
1740+
context, removeParams);
1741+
getCache().removeTXId(txId);
1742+
}
17151743
if (tx != null) {
17161744
if (TXStateProxy.LOG_FINE) {
17171745
getLogger().info(LocalizedStrings.DEBUG, "TX removed: " + tx);
@@ -2013,8 +2041,12 @@ public TXStateProxy getHostedTXState(TXId txId) {
20132041

20142042
public TXStateProxy getOrCreateHostedTXState(final TXId txId,
20152043
final LockingPolicy lockingPolicy, final boolean checkFinishTX) {
2016-
return this.hostedTXStates.create(txId,
2017-
txStateCreator, lockingPolicy, checkFinishTX, true);
2044+
final TXStateProxy proxy;
2045+
synchronized (this.hostedTXStatesLock) {
2046+
proxy = this.hostedTXStates.create(txId,
2047+
txStateCreator, lockingPolicy, checkFinishTX, true);
2048+
}
2049+
return proxy;
20182050
}
20192051

20202052
/**

gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ public final Object removeValue(final Object key, Object value,
562562
txProxy.txId.shortToString()
563563
+ ": removing from hosted list since it is empty");
564564
}
565-
txProxy.getTxMgr().hostTXStatesVersion.getAndIncrement();
565+
txProxy.getTxMgr().incrementHostedTXStateVersion();
566566
return null;
567567
}
568568
else {
@@ -3478,7 +3478,6 @@ public final TXState getTXStateForRead() {
34783478
}
34793479
}
34803480

3481-
34823481
private final TXState createTXState(boolean checkTX) {
34833482
final TXState localState;
34843483
this.lock.lock();

gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/db/FabricDatabase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -543,9 +543,9 @@ synchronized public void postCreate(
543543
}
544544
} catch (Throwable t) {
545545
try {
546-
547546
if (logger != null) {
548-
logger.warning("got throwable: " + t.getMessage() + " calling shut down", t);
547+
logger.warning("FabricDatabase.postCreate got throwable: " + t.getMessage() +
548+
" calling shut down", t);
549549
}
550550
Monitor.getMonitor().shutdown();
551551
} catch (CancelException ce) {
@@ -575,7 +575,6 @@ synchronized public void postCreate(
575575
Attribute.GFXD_DBNAME);
576576
}
577577

578-
579578
for (GfxdSystemProcedureMessage msg : postMsgs) {
580579
if (msg.getSysProcMethod().isOffHeapMethod()
581580
&& this.memStore.getGemFireCache().getOffHeapStore() == null) {

gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/ddl/catalog/messages/GfxdSystemProcedureMessage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,8 @@ public void processMessage(Object[] params, DistributedMember sender)
315315
throws StandardException {
316316

317317
assert String.class.isInstance(params[0])
318-
&& String.class.isInstance(params[1])
319-
&& String.class.isInstance(params[2]);
318+
&& (String.class.isInstance(params[1]) || params[1] == null)
319+
&& (String.class.isInstance(params[2]) || params[2] == null);
320320

321321
String key = (String)params[0];
322322
String value = (String)params[2];
@@ -1929,6 +1929,8 @@ String getSQLStatement(Object[] params) throws StandardException {
19291929
},
19301930
;
19311931

1932+
static final SysProcMethod[] values = values();
1933+
19321934
static final int[][] tableActions = new int[][] {
19331935
{ TablePrivilegeInfo.SELECT_ACTION, Authorizer.SELECT_PRIV },
19341936
{ TablePrivilegeInfo.INSERT_ACTION, Authorizer.INSERT_PRIV },
@@ -2195,7 +2197,7 @@ public void fromData(DataInput in)
21952197
this.connId = in.readLong();
21962198
this.ddlId = in.readLong();
21972199
int ordinal = in.readByte();
2198-
this.procMethod = SysProcMethod.values()[ordinal];
2200+
this.procMethod = SysProcMethod.values[ordinal];
21992201
this.params = this.procMethod.readParams(in, flags);
22002202
this.initialDDLReplayInProgress =
22012203
(flags & INITIAL_DDL_REPLAY_IN_PROGRESS) != 0;

gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/BackwardCompatabilityTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.gemstone.gemfire.internal.shared.NativeCalls;
3535
import com.pivotal.gemfirexd.BackwardCompatabilityDUnit.ClientRun;
3636
import com.pivotal.gemfirexd.BackwardCompatabilityDUnit.ProductClient;
37+
import com.pivotal.gemfirexd.internal.engine.Misc;
3738
import com.pivotal.gemfirexd.tools.GfxdUtilLauncher;
3839
import io.snappydata.test.dunit.Host;
3940
import io.snappydata.test.dunit.SerializableCallable;
@@ -122,7 +123,7 @@ public void setUp() throws Exception {
122123
@Override
123124
public void tearDown2() throws Exception {
124125
System.clearProperty("gemfirexd.thrift-default");
125-
ClientSharedUtils.setThriftDefault(true);
126+
ClientSharedUtils.setThriftDefault(false);
126127
super.tearDown2();
127128
final String workingDir = getSysDirName();
128129
if (currentListIdx >= 0 && currentVersIdx >= 0) {

gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/ClientServer2DUnit.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public void testInitialScripts() throws Exception {
266266
} catch (RMIException ex) {
267267
if (ex.getCause() instanceof SQLException) {
268268
SQLException sqlEx = (SQLException)ex.getCause();
269-
if (!"XJ040".equals(sqlEx.getSQLState())) {
269+
if (!"XJ040".equals(sqlEx.getSQLState()) && !"42X05".equals(sqlEx.getSQLState())) {
270270
throw ex;
271271
} else {
272272
// Explicitly delete the newly timestamped persistent file.
@@ -303,7 +303,7 @@ public void testInitialScripts() throws Exception {
303303
} catch (RMIException ex) {
304304
if (ex.getCause() instanceof SQLException) {
305305
SQLException sqlEx = (SQLException)ex.getCause();
306-
if (!"XJ040".equals(sqlEx.getSQLState())) {
306+
if (!"XJ040".equals(sqlEx.getSQLState()) && !"42X05".equals(sqlEx.getSQLState())) {
307307
throw ex;
308308
} else {
309309
// Explicitly delete the newly timestamped persistent file.

gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/ClientServerDUnit.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverAdapter;
4949
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverHolder;
5050
import com.pivotal.gemfirexd.internal.engine.GfxdConstants;
51+
import com.pivotal.gemfirexd.internal.engine.Misc;
5152
import com.pivotal.gemfirexd.internal.engine.ddl.resolver.GfxdPartitionByExpressionResolver;
5253
import com.pivotal.gemfirexd.internal.engine.jdbc.GemFireXDRuntimeException;
5354
import com.pivotal.gemfirexd.internal.engine.store.GemFireStore;
@@ -1834,6 +1835,7 @@ public void testNetworkClientFailoverWithCurrentSchemaSetting() throws Exception
18341835
*/
18351836
public void testNetworkClientLoadBalancing() throws Exception {
18361837
// always use thrift for this test
1838+
final int numVMs = Host.getHost(0).getVMCount();
18371839
SerializableRunnable setThrift = new SerializableRunnable() {
18381840
@Override
18391841
public void run() {
@@ -1843,7 +1845,7 @@ public void run() {
18431845
}
18441846
};
18451847
setThrift.run();
1846-
for (int i = 0; i <= 3; i++) {
1848+
for (int i = 0; i < numVMs; i++) {
18471849
Host.getHost(0).getVM(i).invoke(setThrift);
18481850
}
18491851
try {
@@ -1858,7 +1860,7 @@ public void run() {
18581860
}
18591861
};
18601862
resetThrift.run();
1861-
for (int i = 0; i <= 3; i++) {
1863+
for (int i = 0; i < numVMs; i++) {
18621864
Host.getHost(0).getVM(i).invoke(resetThrift);
18631865
}
18641866
}

gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/ddl/BugsDUnit.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2907,7 +2907,6 @@ public void testBug47289() throws Exception {
29072907
for (int i = 1; i < 20; ++i) {
29082908
psSec.setInt(1, i);
29092909
psSec.setString(2, getSymbol(1, 6) + "_" + i);
2910-
psSec.setString(2, getSymbol(1, 8));
29112910
psSec.setString(3, exchanges[i % 7]);
29122911
psSec.setInt(4, 50);
29132912
psSec.executeUpdate();
@@ -2935,20 +2934,18 @@ public void testBug47289() throws Exception {
29352934
st.execute("alter table trade.companies add constraint comp_fk foreign key (symbol, exchange) "
29362935
+ "references trade.securities (symbol, exchange) on delete restrict");
29372936
fail("FK constraint addition should fail");
2938-
}catch(SQLException sqle) {
2939-
if(sqle.getSQLState().equals("0A000")) {
2937+
} catch (SQLException sqle) {
2938+
if (sqle.getSQLState().equals("0A000") || sqle.getSQLState().equals("23505")) {
29402939
//Ok
2941-
}else {
2940+
} else {
29422941
throw sqle;
29432942
}
29442943
}
29452944
} finally {
29462945
TestUtil.shutDown();
29472946
}
2948-
29492947
}
29502948

2951-
29522949
public void testBug47289_1() throws Exception {
29532950

29542951
// Start two server VMs

0 commit comments

Comments
 (0)