Skip to content

Commit 8b75180

Browse files
committed
GEODE-10417: Fix NullPointerException in WAN replication (#7845)
* GEODE-10417: Fix NullPointerException in WAN replication When the WAN group-transa$ction-events feature is enabled in a parallel gateway sender, it is possible to get a NullPointerException when retrieving events from the queue to complete a transaction if the event in the queue is null. If this situation is reached then the gateway sender dispatcher will not dispatch queue events anymore and therefore the WAN replication will not progress. This happens because the predicates that check if elements in the queue contain a transactionId are not protected against the event being null. A null check has been added before the predicates are invoked so that in case of a null event, the predicate is not invoked and the event is skipped from the checking. * GEODE-10417: Change assertEquals to assertThat
1 parent 3ada8fe commit 8b75180

3 files changed

Lines changed: 66 additions & 22 deletions

File tree

geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,10 @@ public List<Object> getElementsMatching(Predicate matchingPredicate, Predicate e
466466
List<Object> elementsMatching = new ArrayList<>();
467467
for (final Object key : eventSeqNumDeque) {
468468
Object object = optimalGet(key);
469+
if (object == null) {
470+
continue;
471+
}
472+
469473
if (matchingPredicate.test(object)) {
470474
elementsMatching.add(object);
471475
eventSeqNumDeque.remove(key);

geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueJUnitTest.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static org.apache.geode.cache.Region.SEPARATOR;
1818
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.junit.Assert.assertEquals;
2120
import static org.mockito.Mockito.doReturn;
2221
import static org.mockito.Mockito.mock;
2322
import static org.mockito.Mockito.spy;
@@ -175,22 +174,65 @@ public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSo
175174
List<Object> objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
176175
isLastEventInTransactionPredicate);
177176

178-
assertEquals(2, objects.size());
179-
assertEquals(objects, Arrays.asList(event1, event3));
177+
assertThat(objects.size()).isEqualTo(2);
178+
assertThat(objects).isEqualTo(Arrays.asList(event1, event3));
180179

181180
objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
182181
isLastEventInTransactionPredicate);
183-
assertEquals(1, objects.size());
184-
assertEquals(objects, Arrays.asList(event7));
182+
assertThat(objects.size()).isEqualTo(1);
183+
assertThat(objects).isEqualTo(Arrays.asList(event7));
185184

186185
hasTransactionIdPredicate =
187186
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
188187
objects = bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
189188
isLastEventInTransactionPredicate);
190-
assertEquals(2, objects.size());
191-
assertEquals(objects, Arrays.asList(event2, event4));
189+
assertThat(objects.size()).isEqualTo(2);
190+
assertThat(objects).isEqualTo(Arrays.asList(event2, event4));
192191
}
193192

193+
@Test
194+
public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesObjectReadNullDoesNotThrowException()
195+
throws ForceReattemptException {
196+
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
197+
198+
TransactionId tx1 = new TXId(null, 1);
199+
TransactionId tx2 = new TXId(null, 2);
200+
TransactionId tx3 = new TXId(null, 3);
201+
202+
GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false);
203+
GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false);
204+
GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false);
205+
GatewaySenderEventImpl event3 = null; // createMockGatewaySenderEvent(4, tx1, true);
206+
GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
207+
GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false);
208+
GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false);
209+
GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);
210+
211+
this.bucketRegionQueue
212+
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
213+
214+
this.bucketRegionQueue.addToQueue(1L, event1);
215+
this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
216+
this.bucketRegionQueue.addToQueue(3L, event2);
217+
this.bucketRegionQueue.addToQueue(4L, event3);
218+
this.bucketRegionQueue.addToQueue(5L, event4);
219+
this.bucketRegionQueue.addToQueue(6L, event5);
220+
this.bucketRegionQueue.addToQueue(7L, event6);
221+
this.bucketRegionQueue.addToQueue(8L, event7);
222+
223+
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
224+
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
225+
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
226+
ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
227+
when(bucketRegionQueue.getValueInVMOrDiskWithoutFaultIn(4L)).thenReturn(null);
228+
List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
229+
isLastEventInTransactionPredicate);
230+
231+
assertThat(objects.size()).isEqualTo(2);
232+
assertThat(objects).isEqualTo(Arrays.asList(new Object[] {event1, event7}));
233+
}
234+
235+
194236
@Test
195237
public void testPeekedElementsArePossibleDuplicate()
196238
throws Exception {

geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueJUnitTest.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package org.apache.geode.internal.cache.wan.serial;
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
18-
import static org.junit.Assert.assertEquals;
1918
import static org.mockito.ArgumentMatchers.anyBoolean;
2019
import static org.mockito.ArgumentMatchers.anyLong;
2120
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -111,9 +110,9 @@ public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForT
111110
queue.setGroupTransactionEvents(true);
112111

113112
List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
114-
assertEquals(4, peeked.size());
113+
assertThat(peeked.size()).isEqualTo(4);
115114
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
116-
assertEquals(3, peekedAfter.size());
115+
assertThat(peekedAfter.size()).isEqualTo(3);
117116
}
118117

119118
@Test
@@ -146,7 +145,7 @@ public void peekGetsExtraEventsWhenMustGroupTransactionEventsAndNotAllEventsForT
146145
.when(queue).getElementsMatching(any(), any(), anyLong());
147146

148147
List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
149-
assertEquals(4, peeked.size());
148+
assertThat(peeked.size()).isEqualTo(4);
150149
}
151150

152151
@Test
@@ -155,11 +154,11 @@ public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllE
155154
QUEUE_REGION, metaRegionFactory);
156155

157156
List<AsyncEvent<?, ?>> peeked = queue.peek(3, 100);
158-
assertEquals(3, peeked.size());
157+
assertThat(peeked.size()).isEqualTo(3);
159158
List<AsyncEvent<?, ?>> peekedAfter = queue.peek(3, 100);
160-
assertEquals(3, peekedAfter.size());
159+
assertThat(peekedAfter.size()).isEqualTo(3);
161160
peekedAfter = queue.peek(1, 100);
162-
assertEquals(1, peekedAfter.size());
161+
assertThat(peekedAfter.size()).isEqualTo(1);
163162
}
164163

165164
@Test
@@ -192,7 +191,7 @@ public void peekDoesNotGetExtraEventsWhenNotMustGroupTransactionEventsAndNotAllE
192191
.when(queue).getElementsMatching(any(), any(), anyLong());
193192

194193
List<AsyncEvent<?, ?>> peeked = queue.peek(-1, 1);
195-
assertEquals(3, peeked.size());
194+
assertThat(peeked.size()).isEqualTo(3);
196195
}
197196

198197
@Test
@@ -216,24 +215,23 @@ public void removeExtraPeekedEventDoesNotRemoveFromExtraPeekedIdsUntilPreviousEv
216215
QUEUE_REGION, metaRegionFactory);
217216
queue.setGroupTransactionEvents(true);
218217
List<AsyncEvent<?, ?>> peeked = queue.peek(3, -1);
219-
assertEquals(4, peeked.size());
218+
assertThat(peeked.size()).isEqualTo(4);
220219
assertThat(queue.getLastPeekedId()).isEqualTo(2);
221-
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
222-
220+
assertThat(queue.getExtraPeekedIds()).contains(5L);
223221

224222
for (Object ignored : peeked) {
225223
queue.remove();
226224
}
227-
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
225+
assertThat(queue.getExtraPeekedIds()).contains(5L);
228226

229227
peeked = queue.peek(3, -1);
230-
assertEquals(3, peeked.size());
231-
assertThat(queue.getExtraPeekedIds().contains(5L)).isTrue();
228+
assertThat(peeked.size()).isEqualTo(3);
229+
assertThat(queue.getExtraPeekedIds()).contains(5L);
232230

233231
for (Object ignored : peeked) {
234232
queue.remove();
235233
}
236-
assertThat(queue.getExtraPeekedIds().contains(5L)).isFalse();
234+
assertThat(queue.getExtraPeekedIds()).doesNotContain(5L);
237235
}
238236

239237
private GatewaySenderEventImpl createMockGatewaySenderEventImpl(int transactionId,

0 commit comments

Comments
 (0)