Skip to content

Commit 891711f

Browse files
committed
HDDS-14239. Optimize delete range search using prefix count index structure
Change-Id: I704dab8380d1dd35faa2b9453ef90b439937e95b
1 parent 78e1c8d commit 891711f

2 files changed

Lines changed: 231 additions & 41 deletions

File tree

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
import com.google.common.base.Preconditions;
2121
import java.io.Closeable;
22+
import java.io.IOException;
2223
import java.nio.ByteBuffer;
23-
import java.util.Collection;
2424
import java.util.Comparator;
2525
import java.util.HashMap;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Objects;
29-
import java.util.TreeMap;
30+
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.function.Supplier;
@@ -185,6 +186,11 @@ public void close() {
185186
keyBytes.close();
186187
}
187188
}
189+
190+
@Override
191+
public String toString() {
192+
return getOpType() + ", key=" + keyBytes;
193+
}
188194
}
189195

190196
/**
@@ -278,15 +284,13 @@ public void close() {
278284
private final class DeleteRangeOperation extends Operation {
279285
private final byte[] startKey;
280286
private final byte[] endKey;
281-
private final Bytes startKeyBytes;
282-
private final Bytes endKeyBytes;
287+
private final RangeQueryIndex.Range<Bytes> rangeEntry;
283288

284289
private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
285290
super(null);
286291
this.startKey = Objects.requireNonNull(startKey, "startKey == null");
287292
this.endKey = Objects.requireNonNull(endKey, "endKey == null");
288-
this.startKeyBytes = new Bytes(startKey);
289-
this.endKeyBytes = new Bytes(endKey);
293+
this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey));
290294
}
291295

292296
@Override
@@ -312,12 +316,13 @@ public String getOpType() {
312316
@Override
313317
public void close() {
314318
super.close();
315-
startKeyBytes.close();
316-
endKeyBytes.close();
319+
rangeEntry.getStartInclusive().close();
320+
rangeEntry.getEndExclusive().close();
317321
}
318322

319-
private boolean contains(Bytes key) {
320-
return startKeyBytes.compareTo(key) <= 0 && endKeyBytes.compareTo(key) > 0;
323+
@Override
324+
public String toString() {
325+
return getOpType() + ", rangeEntry=" + rangeEntry;
321326
}
322327
}
323328

@@ -379,16 +384,6 @@ private class FamilyCache {
379384
this.opIndex = new AtomicInteger(0);
380385
}
381386

382-
private DeleteRangeOperation findFirstDeleteRangeMatchingRange(Collection<DeleteRangeOperation> deleteRangeOps,
383-
Bytes key) {
384-
for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) {
385-
if (deleteRangeOp.contains(key)) {
386-
return deleteRangeOp;
387-
}
388-
}
389-
return null;
390-
}
391-
392387
/**
393388
* Prepares a batch write operation for a RocksDB-backed system.
394389
*
@@ -420,37 +415,41 @@ void prepareBatchWrite() throws RocksDatabaseException {
420415
// Sort Entries based on opIndex and flush the operation to the batch in the same order.
421416
List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
422417
.map(Map.Entry::getValue).collect(Collectors.toList());
423-
TreeMap<Integer, DeleteRangeOperation> deleteRangeIndices = new TreeMap<>();
424-
int index = 0;
418+
Set<RangeQueryIndex.Range<Bytes>> deleteRangeEntries = new HashSet<>();
425419
for (Operation op : ops) {
426420
if (DELETE_RANGE_OP.equals(op.getOpType())) {
427421
DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
428-
deleteRangeIndices.put(index, deleteRangeOp);
422+
deleteRangeEntries.add(deleteRangeOp.rangeEntry);
429423
}
430-
index++;
431424
}
432-
433-
for (int idx = 0; idx < ops.size(); idx++) {
434-
Operation op = ops.get(idx);
435-
if (DELETE_RANGE_OP.equals(op.getOpType())) {
436-
op.apply(family, writeBatch);
437-
} else {
438-
// Find the first delete range op matching which would contain the key after the
439-
// operation has occurred. If there is no such operation then perform the operation otherwise discard the
440-
// op.
441-
DeleteRangeOperation deleteRangeOp = findFirstDeleteRangeMatchingRange(
442-
deleteRangeIndices.tailMap(idx, false).values(), op.getKey());
443-
if (deleteRangeOp == null) {
425+
try {
426+
RangeQueryIndex<Bytes> rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries);
427+
for (Operation op : ops) {
428+
if (DELETE_RANGE_OP.equals(op.getOpType())) {
429+
DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
430+
rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry);
444431
op.apply(family, writeBatch);
445432
} else {
446-
debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
447-
op.getKey(), deleteRangeOp.startKeyBytes, deleteRangeOp.endKeyBytes));
448-
discardedCount++;
449-
discardedSize += op.totalLength();
433+
// Find a delete range op matching which would contain the key after the
434+
// operation has occurred. If there is no such operation then perform the operation otherwise discard the
435+
// op.
436+
if (!rangeQueryIdx.containsIntersectingRange(op.getKey())) {
437+
op.apply(family, writeBatch);
438+
} else {
439+
debug(() -> {
440+
RangeQueryIndex.Range<Bytes> deleteRangeOp = rangeQueryIdx.getFirstIntersectingRange(op.getKey());
441+
return String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
442+
op.getKey(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive());
443+
});
444+
discardedCount++;
445+
discardedSize += op.totalLength();
446+
}
450447
}
451448
}
449+
debug(this::summary);
450+
} catch (IOException e) {
451+
throw new RocksDatabaseException("Failed to prepare batch write", e);
452452
}
453-
debug(this::summary);
454453
}
455454

456455
private String summary() {
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.utils.db;
19+
20+
import java.io.IOException;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
import java.util.Set;
24+
import java.util.TreeMap;
25+
26+
/**
27+
* An index for answering "does this point fall within any of these ranges?" efficiently.
28+
*
29+
* <p>The indexed ranges are <em>half-open</em> intervals of the form
30+
* {@code [startInclusive, endExclusive)}.
31+
*
32+
* <p><strong>Core idea (sweep-line / prefix-sum over range boundaries):</strong>
33+
* Instead of scanning every range on each query, this index stores a sorted map from
34+
* boundary points to a running count of "active" ranges at that point.
35+
*
36+
* <ul>
37+
* <li>For each range {@code [s, e)}, we add a delta {@code +1} at {@code s} and a delta
38+
* {@code -1} at {@code e}.</li>
39+
* <li>We then convert the deltas into a prefix sum in key order, so every boundary key
40+
* stores the number of ranges active at that coordinate.</li>
41+
* <li>For any query point {@code k}, the active count is {@code floorEntry(k).value}.
42+
* If it is {@code > 0}, then {@code k} intersects at least one range.</li>
43+
* </ul>
44+
*
45+
* <p><strong>Update model:</strong> this index supports only removing ranges that were part of the
46+
* initial set. Removal updates the prefix sums for keys in {@code [startInclusive, endExclusive)}
47+
* (net effect of removing {@code +1} at start and {@code -1} at end).
48+
*
49+
* <p><strong>Complexities:</strong>
50+
* <ul>
51+
* <li>Build: {@code O(R log B)} where {@code R} is #ranges and {@code B} is #distinct boundaries.</li>
52+
* <li>{@link #containsIntersectingRange(Object)}: {@code O(log B)}.</li>
53+
* <li>{@link #removeRange(Range)}: {@code O(log B + K)} where {@code K} is #boundaries in the range.</li>
54+
* </ul>
55+
*
56+
* @param <T> boundary type (must be {@link Comparable} to be stored in a {@link TreeMap})
57+
*/
58+
class RangeQueryIndex<T extends Comparable<T>> {
59+
60+
private final TreeMap<T, Integer> rangeCountIndexMap;
61+
private final Set<Range<T>> ranges;
62+
63+
RangeQueryIndex(Set<Range<T>> ranges) {
64+
this.rangeCountIndexMap = new TreeMap<>();
65+
this.ranges = ranges;
66+
init();
67+
}
68+
69+
private void init() {
70+
// Phase 1: store boundary deltas (+1 at start, -1 at end).
71+
for (Range<T> range : ranges) {
72+
rangeCountIndexMap.compute(range.startInclusive, (k, v) -> v == null ? 1 : v + 1);
73+
rangeCountIndexMap.compute(range.endExclusive, (k, v) -> v == null ? -1 : v - 1);
74+
}
75+
76+
// Phase 2: convert deltas to prefix sums so each key holds the active range count at that coordinate.
77+
int totalCount = 0;
78+
for (Map.Entry<T, Integer> entry : rangeCountIndexMap.entrySet()) {
79+
totalCount += entry.getValue();
80+
entry.setValue(totalCount);
81+
}
82+
}
83+
84+
/**
85+
* Remove a range from the index.
86+
*
87+
* <p>This method assumes the range set is "popped" over time (ranges are removed but not added).
88+
* Internally, removing {@code [s, e)} decreases the active count by 1 for all boundary keys in
89+
* {@code [s, e)} and leaves counts outside the range unchanged.
90+
*
91+
* @throws IOException if the given {@code range} is not part of the indexed set
92+
*/
93+
void removeRange(Range<T> range) throws IOException {
94+
if (!ranges.contains(range)) {
95+
throw new IOException(String.format("Range %s not found in index structure : %s", range, ranges));
96+
}
97+
ranges.remove(range);
98+
for (Map.Entry<T, Integer> entry : rangeCountIndexMap.subMap(range.startInclusive, true,
99+
range.endExclusive, false).entrySet()) {
100+
entry.setValue(entry.getValue() - 1);
101+
}
102+
}
103+
104+
/**
105+
* @return true iff {@code key} is contained in at least one indexed range.
106+
*
107+
* <p>Implementation detail: uses {@link TreeMap#floorEntry(Object)} to find the last boundary
108+
* at or before {@code key}, and checks the prefix-summed active count at that point.</p>
109+
*/
110+
boolean containsIntersectingRange(T key) {
111+
Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
112+
if (countEntry == null) {
113+
return false;
114+
}
115+
return countEntry.getValue() > 0;
116+
}
117+
118+
/**
119+
* Returns an intersecting range containing {@code key}, if any.
120+
*
121+
* <p>This method first checks {@link #containsIntersectingRange(Comparable)} using the index;
122+
* if the count indicates an intersection exists, it then scans the backing {@link #ranges}
123+
* set to find a concrete {@link Range} that contains {@code key}.</p>
124+
*
125+
* <p>Note that because {@link #ranges} is a {@link Set}, "first" refers to whatever iteration
126+
* order that set provides (it is not guaranteed to be deterministic unless the provided set is).</p>
127+
*
128+
* @return a containing range, or null if none intersect
129+
*/
130+
Range<T> getFirstIntersectingRange(T key) {
131+
Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
132+
if (countEntry == null) {
133+
return null;
134+
}
135+
for (Range<T> range : ranges) {
136+
if (range.contains(key)) {
137+
return range;
138+
}
139+
}
140+
return null;
141+
}
142+
143+
/**
144+
* A half-open interval {@code [startInclusive, endExclusive)}.
145+
*
146+
* <p>For a value {@code k} to be contained, it must satisfy:
147+
* {@code startInclusive <= k < endExclusive} (according to {@link Comparable#compareTo(Object)}).</p>
148+
*/
149+
static final class Range<T extends Comparable<T>> {
150+
private final T startInclusive;
151+
private final T endExclusive;
152+
153+
Range(T startInclusive, T endExclusive) {
154+
this.startInclusive = Objects.requireNonNull(startInclusive, "start == null");
155+
this.endExclusive = Objects.requireNonNull(endExclusive, "end == null");
156+
}
157+
158+
@Override
159+
public boolean equals(Object o) {
160+
return this == o;
161+
}
162+
163+
@Override
164+
public int hashCode() {
165+
return Objects.hash(startInclusive, endExclusive);
166+
}
167+
168+
T getStartInclusive() {
169+
return startInclusive;
170+
}
171+
172+
T getEndExclusive() {
173+
return endExclusive;
174+
}
175+
176+
/**
177+
* @return true iff {@code key} is within {@code [startInclusive, endExclusive)}.
178+
*/
179+
public boolean contains(T key) {
180+
return startInclusive.compareTo(key) <= 0 && key.compareTo(endExclusive) < 0;
181+
}
182+
183+
@Override
184+
public String toString() {
185+
return "Range{" +
186+
"startInclusive=" + startInclusive +
187+
", endExclusive=" + endExclusive +
188+
'}';
189+
}
190+
}
191+
}

0 commit comments

Comments
 (0)