Skip to content

Commit 1fd234b

Browse files
authored
Merge pull request #1567 from cshannon/AMQ-9824
AMQ-9824 - Cleanup code in KahaDB classes
2 parents 433c395 + b7184b4 commit 1fd234b

7 files changed

Lines changed: 683 additions & 927 deletions

File tree

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Lines changed: 365 additions & 462 deletions
Large diffs are not rendered by default.

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Lines changed: 285 additions & 426 deletions
Large diffs are not rendered by default.

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -442,31 +442,28 @@ private void corruptBatchEndEof(int id) throws Exception{
442442
private void corruptOrderIndex(final int num, final int size) throws Exception {
443443
//This is because of AMQ-6097, now that the MessageOrderIndex stores the size in the Location,
444444
//we need to corrupt that value as well
445-
final KahaDBStore kahaDbStore = (KahaDBStore) ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
446-
kahaDbStore.indexLock.writeLock().lock();
445+
final KahaDBStore kahaDbStore = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
446+
kahaDbStore.indexLock.lock();
447447
try {
448-
kahaDbStore.pageFile.tx().execute(new Transaction.Closure<IOException>() {
449-
@Override
450-
public void execute(Transaction tx) throws IOException {
451-
StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(
452-
(ActiveMQQueue)destination), tx);
453-
int i = 1;
454-
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
455-
Entry<Long, MessageKeys> entry = iterator.next();
456-
if (i == num) {
457-
//change the size value to the wrong size
458-
sd.orderIndex.get(tx, entry.getKey());
459-
MessageKeys messageKeys = entry.getValue();
460-
messageKeys.location.setSize(size);
461-
sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys);
462-
break;
463-
}
464-
i++;
448+
kahaDbStore.pageFile.tx().execute(tx -> {
449+
StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(
450+
(ActiveMQQueue)destination), tx);
451+
int i = 1;
452+
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
453+
Entry<Long, MessageKeys> entry = iterator.next();
454+
if (i == num) {
455+
//change the size value to the wrong size
456+
sd.orderIndex.get(tx, entry.getKey());
457+
MessageKeys messageKeys = entry.getValue();
458+
messageKeys.location.setSize(size);
459+
sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys);
460+
break;
465461
}
462+
i++;
466463
}
467464
});
468465
} finally {
469-
kahaDbStore.indexLock.writeLock().unlock();
466+
kahaDbStore.indexLock.unlock();
470467
}
471468
}
472469

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,22 +164,19 @@ private void corruptIndex() throws IOException {
164164

165165
//blow up the index
166166
try {
167-
store.indexLock.writeLock().lock();
168-
pageFile.tx().execute(new Transaction.Closure<IOException>() {
169-
@Override
170-
public void execute(Transaction tx) throws IOException {
171-
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
172-
.hasNext();) {
173-
Entry<String, StoredDestination> entry = iterator.next();
174-
entry.getValue().orderIndex.nextMessageId = -100;
175-
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
176-
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
177-
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
178-
}
167+
store.indexLock.lock();
168+
pageFile.tx().execute(tx -> {
169+
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
170+
.hasNext();) {
171+
Entry<String, StoredDestination> entry = iterator.next();
172+
entry.getValue().orderIndex.nextMessageId = -100;
173+
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
174+
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
175+
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
179176
}
180177
});
181178
} finally {
182-
store.indexLock.writeLock().unlock();
179+
store.indexLock.unlock();
183180
}
184181
}
185182

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public void forceCleanup() throws IOException {
6262

6363
public int getFileMapSize() throws IOException {
6464
// ensure save memory publishing, use the right lock
65-
indexLock.readLock().lock();
65+
indexLock.lock();
6666
try {
6767
return getJournal().getFileMap().size();
6868
} finally {
69-
indexLock.readLock().unlock();
69+
indexLock.unlock();
7070
}
7171
}
7272
}

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ public void forceCleanup() throws IOException {
6565

6666
public int getFileMapSize() throws IOException {
6767
// ensure save memory publishing, use the right lock
68-
indexLock.readLock().lock();
68+
indexLock.lock();
6969
try {
7070
return getJournal().getFileMap().size();
7171
} finally {
72-
indexLock.readLock().unlock();
72+
indexLock.unlock();
7373
}
7474
}
7575
}

activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testLocationIndexMatchesOrderIndex() throws Exception {
6161

6262
//Iterate over the order index and add up the size of the messages to compare
6363
//to the location index
64-
kahaDbStore.indexLock.readLock().lock();
64+
kahaDbStore.indexLock.lock();
6565
try {
6666
long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
6767
@Override
@@ -79,7 +79,7 @@ public Long execute(Transaction tx) throws IOException {
7979
assertEquals("Order index size values don't match message size",
8080
size, messageStore.getMessageSize());
8181
} finally {
82-
kahaDbStore.indexLock.readLock().unlock();
82+
kahaDbStore.indexLock.unlock();
8383
}
8484
}
8585

0 commit comments

Comments
 (0)