Skip to content

Commit 34ec6a4

Browse files
authored
[Fix] ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException (apache#4771)
* fix ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException When concurrent read write access the map, The key array and value array are not publish at the same time when shrink or expand. This fix encapsulate the key and value in the same field to avoid this happen * fix ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException When concurrent read write access the map, The key array and value array are not publish at the same time when shrink or expand. This fix encapsulate the key and value in the same field to avoid this happen * fix ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException When concurrent read write access the map, The key array and value array are not publish at the same time when shrink or expand. This fix encapsulate the key and value in the same field to avoid this happen * fix ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException When concurrent read write access the map, The key array and value array are not publish at the same time when shrink or expand. This fix encapsulate the key and value in the same field to avoid this happen * fix ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException When concurrent read write access the map, The key array and value array are not publish at the same time when shrink or expand. This fix encapsulate the key and value in the same field to avoid this happen
1 parent 697bd59 commit 34ec6a4

4 files changed

Lines changed: 893 additions & 81 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java

Lines changed: 62 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.google.common.base.Preconditions.checkNotNull;
2525

2626
import com.google.common.collect.Lists;
27+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2728
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.concurrent.locks.StampedLock;
@@ -197,7 +198,7 @@ long getUsedBucketCount() {
197198
public long capacity() {
198199
long capacity = 0;
199200
for (Section<V> s : sections) {
200-
capacity += s.capacity;
201+
capacity += s.table.capacity();
201202
}
202203
return capacity;
203204
}
@@ -310,13 +311,18 @@ public interface EntryProcessor<V> {
310311
void accept(long key, V value);
311312
}
312313

313-
// A section is a portion of the hash map that is covered by a single
314+
// A section is a portion of the hash map that is covered by a single lock. The keys, values
315+
// and capacity arrays are bundled into an immutable Table snapshot so that readers always see
316+
// a consistent (key, value, length) triple, eliminating the partial-publish race that the
317+
// previous design had to paper over with Math.min(keys.length, values.length).
314318
@SuppressWarnings("serial")
315319
private static final class Section<V> extends StampedLock {
316-
private volatile long[] keys;
317-
private volatile V[] values;
320+
private record Table<V>(long[] keys, V[] values, int capacity) { }
321+
322+
// Section is Serializable only by inheritance from StampedLock; never actually serialized.
323+
@SuppressFBWarnings("SE_BAD_FIELD")
324+
private volatile Table<V> table;
318325

319-
private volatile int capacity;
320326
private final int initCapacity;
321327
private volatile int size;
322328
private int usedBuckets;
@@ -330,31 +336,29 @@ private static final class Section<V> extends StampedLock {
330336

331337
Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
332338
float expandFactor, float shrinkFactor) {
333-
this.capacity = alignToPowerOfTwo(capacity);
334-
this.initCapacity = this.capacity;
335-
this.keys = new long[this.capacity];
336-
this.values = (V[]) new Object[this.capacity];
339+
int initial = alignToPowerOfTwo(capacity);
340+
this.initCapacity = initial;
341+
this.table = new Table<>(new long[initial], (V[]) new Object[initial], initial);
337342
this.size = 0;
338343
this.usedBuckets = 0;
339344
this.autoShrink = autoShrink;
340345
this.mapFillFactor = mapFillFactor;
341346
this.mapIdleFactor = mapIdleFactor;
342347
this.expandFactor = expandFactor;
343348
this.shrinkFactor = shrinkFactor;
344-
this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
345-
this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
349+
this.resizeThresholdUp = (int) (initial * mapFillFactor);
350+
this.resizeThresholdBelow = (int) (initial * mapIdleFactor);
346351
}
347352

348353
V get(long key, int keyHash) {
349354

350355
long stamp = tryOptimisticRead();
351356
boolean acquiredLock = false;
352357

353-
// add local variable here, so OutOfBound won't happen
354-
long[] keys = this.keys;
355-
V[] values = this.values;
356-
// calculate table.length as capacity to avoid rehash changing capacity
357-
int bucket = signSafeMod(keyHash, values.length);
358+
Table<V> table = this.table;
359+
long[] keys = table.keys();
360+
V[] values = table.values();
361+
int bucket = signSafeMod(keyHash, table.capacity());
358362

359363
try {
360364
while (true) {
@@ -377,10 +381,10 @@ V get(long key, int keyHash) {
377381
stamp = readLock();
378382
acquiredLock = true;
379383

380-
// update local variable
381-
keys = this.keys;
382-
values = this.values;
383-
bucket = signSafeMod(keyHash, values.length);
384+
table = this.table;
385+
keys = table.keys();
386+
values = table.values();
387+
bucket = signSafeMod(keyHash, table.capacity());
384388
storedKey = keys[bucket];
385389
storedValue = values[bucket];
386390
}
@@ -393,7 +397,7 @@ V get(long key, int keyHash) {
393397
}
394398
}
395399

396-
bucket = (bucket + 1) & (values.length - 1);
400+
bucket = (bucket + 1) & (table.capacity() - 1);
397401
}
398402
} finally {
399403
if (acquiredLock) {
@@ -406,7 +410,10 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valu
406410
int bucket = keyHash;
407411

408412
long stamp = writeLock();
409-
int capacity = this.capacity;
413+
Table<V> table = this.table;
414+
long[] keys = table.keys();
415+
V[] values = table.values();
416+
int capacity = table.capacity();
410417

411418
// Remember where we find the first available spot
412419
int firstDeletedKey = -1;
@@ -474,6 +481,9 @@ V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valu
474481
private void cleanDeletedStatus(int startBucket) {
475482
// Cleanup all the buckets that were in `DeletedValue` state,
476483
// so that we can reduce unnecessary expansions
484+
Table<V> table = this.table;
485+
V[] values = table.values();
486+
int capacity = table.capacity();
477487
int lastBucket = signSafeMod(startBucket - 1, capacity);
478488
while (values[lastBucket] == DeletedValue) {
479489
values[lastBucket] = (V) EmptyValue;
@@ -486,10 +496,13 @@ private void cleanDeletedStatus(int startBucket) {
486496
private V remove(long key, Object value, int keyHash) {
487497
int bucket = keyHash;
488498
long stamp = writeLock();
499+
Table<V> table = this.table;
500+
long[] keys = table.keys();
501+
V[] values = table.values();
502+
int capacity = table.capacity();
489503

490504
try {
491505
while (true) {
492-
int capacity = this.capacity;
493506
bucket = signSafeMod(bucket, capacity);
494507

495508
long storedKey = keys[bucket];
@@ -551,7 +564,10 @@ int removeIf(LongObjectPredicate<V> filter) {
551564
int removedCount = 0;
552565
try {
553566
// Go through all the buckets for this section
554-
int capacity = this.capacity;
567+
Table<V> table = this.table;
568+
long[] keys = table.keys();
569+
V[] values = table.values();
570+
int capacity = table.capacity();
555571
for (int bucket = 0; size > 0 && bucket < capacity; bucket++) {
556572
long storedKey = keys[bucket];
557573
V storedValue = values[bucket];
@@ -583,6 +599,7 @@ int removeIf(LongObjectPredicate<V> filter) {
583599
// so as to avoid frequent shrinking and expansion near initCapacity,
584600
// frequent shrinking and expansion,
585601
// additionally opened arrays will consume more memory and affect GC
602+
int capacity = this.table.capacity();
586603
int newCapacity = Math.max(alignToPowerOfTwo((int) (capacity / shrinkFactor)), initCapacity);
587604
int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
588605
if (newCapacity < capacity && newResizeThresholdUp > size) {
@@ -602,11 +619,12 @@ void clear() {
602619
long stamp = writeLock();
603620

604621
try {
605-
if (autoShrink && capacity > initCapacity) {
622+
Table<V> table = this.table;
623+
if (autoShrink && table.capacity() > initCapacity) {
606624
shrinkToInitCapacity();
607625
} else {
608-
Arrays.fill(keys, 0);
609-
Arrays.fill(values, EmptyValue);
626+
Arrays.fill(table.keys(), 0);
627+
Arrays.fill(table.values(), EmptyValue);
610628
this.size = 0;
611629
this.usedBuckets = 0;
612630
}
@@ -618,9 +636,10 @@ void clear() {
618636
public void forEach(EntryProcessor<V> processor) {
619637
long stamp = tryOptimisticRead();
620638

621-
int capacity = this.capacity;
622-
long[] keys = this.keys;
623-
V[] values = this.values;
639+
Table<V> table = this.table;
640+
int capacity = table.capacity();
641+
long[] keys = table.keys();
642+
V[] values = table.values();
624643

625644
boolean acquiredReadLock = false;
626645

@@ -632,9 +651,10 @@ public void forEach(EntryProcessor<V> processor) {
632651
stamp = readLock();
633652
acquiredReadLock = true;
634653

635-
capacity = this.capacity;
636-
keys = this.keys;
637-
values = this.values;
654+
table = this.table;
655+
capacity = table.capacity();
656+
keys = table.keys();
657+
values = table.values();
638658
}
639659

640660
// Go through all the buckets for this section
@@ -666,6 +686,9 @@ private void rehash(int newCapacity) {
666686
// Expand the hashmap
667687
long[] newKeys = new long[newCapacity];
668688
V[] newValues = (V[]) new Object[newCapacity];
689+
Table<V> table = this.table;
690+
long[] keys = table.keys();
691+
V[] values = table.values();
669692

670693
// Re-hash table
671694
for (int i = 0; i < keys.length; i++) {
@@ -676,29 +699,21 @@ private void rehash(int newCapacity) {
676699
}
677700
}
678701

679-
keys = newKeys;
680-
values = newValues;
702+
this.table = new Table<>(newKeys, newValues, newCapacity);
681703
usedBuckets = size;
682-
// Capacity needs to be updated after the values, so that we won't see
683-
// a capacity value bigger than the actual array size
684-
capacity = newCapacity;
685-
resizeThresholdUp = (int) (capacity * mapFillFactor);
686-
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
704+
resizeThresholdUp = (int) (newCapacity * mapFillFactor);
705+
resizeThresholdBelow = (int) (newCapacity * mapIdleFactor);
687706
}
688707

689708
private void shrinkToInitCapacity() {
690709
long[] newKeys = new long[initCapacity];
691710
V[] newValues = (V[]) new Object[initCapacity];
692711

693-
keys = newKeys;
694-
values = newValues;
712+
table = new Table<>(newKeys, newValues, initCapacity);
695713
size = 0;
696714
usedBuckets = 0;
697-
// Capacity needs to be updated after the values, so that we won't see
698-
// a capacity value bigger than the actual array size
699-
capacity = initCapacity;
700-
resizeThresholdUp = (int) (capacity * mapFillFactor);
701-
resizeThresholdBelow = (int) (capacity * mapIdleFactor);
715+
resizeThresholdUp = (int) (initCapacity * mapFillFactor);
716+
resizeThresholdBelow = (int) (initCapacity * mapIdleFactor);
702717
}
703718

704719
private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {

0 commit comments

Comments
 (0)