Skip to content

Commit ffb08af

Browse files
authored
HDDS-14053. Extract generic MinHeapMergeIterator from SstFileSetReader (#9409)
1 parent efcb778 commit ffb08af

7 files changed

Lines changed: 574 additions & 136 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
package org.apache.hadoop.ozone.util;
1919

20+
import java.io.Closeable;
2021
import java.util.Iterator;
2122

2223
/**
2324
* An {@link Iterator} that may hold resources until it is closed.
2425
*/
25-
public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
26+
public interface ClosableIterator<E> extends Iterator<E>, Closeable {
2627
@Override
2728
void close();
2829
}

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,23 @@
1717

1818
package org.apache.hadoop.hdds.utils.db;
1919

20+
import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;
21+
2022
import java.io.File;
23+
import java.io.IOException;
24+
import java.util.Collection;
25+
import java.util.Comparator;
2126
import java.util.List;
2227
import java.util.Map;
28+
import java.util.NoSuchElementException;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.IntStream;
2331
import org.apache.hadoop.hdds.annotation.InterfaceStability;
32+
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
2433
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
2534
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
2635
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
36+
import org.apache.hadoop.ozone.util.ClosableIterator;
2737
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
2838
import org.apache.ratis.util.UncheckedAutoCloseable;
2939

@@ -170,4 +180,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
170180
boolean isClosed();
171181

172182
String getSnapshotsParentDir();
183+
184+
/**
185+
* Creates an iterator that merges multiple tables into a single iterator,
186+
* grouping values with the same key across the tables.
187+
*
188+
* @param <KEY> the type of keys for the tables
189+
* @param keyComparator the comparator used to compare keys from different tables
190+
* @param prefix the prefix used to filter entries of each table
191+
* @param table one or more tables to merge
192+
* @return a closable iterator over merged key-value pairs, where each key corresponds
193+
* to a collection of values from the tables
194+
*/
195+
default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
196+
Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
197+
List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
198+
KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
199+
Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
200+
return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
201+
KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
202+
@Override
203+
protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
204+
return table[idx].iterator(prefix);
205+
}
206+
207+
@Override
208+
protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
209+
KEY key = keysToMerge.values().stream().findAny()
210+
.orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
211+
for (int i = 0; i < tableValues.size(); i++) {
212+
tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
213+
}
214+
return newKeyValue(key, tableValues);
215+
}
216+
};
217+
}
173218
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.utils.db;
19+
20+
import jakarta.annotation.Nonnull;
21+
import java.io.Closeable;
22+
import java.io.IOException;
23+
import java.io.UncheckedIOException;
24+
import java.util.Comparator;
25+
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.NoSuchElementException;
30+
import java.util.Objects;
31+
import java.util.PriorityQueue;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.IntStream;
34+
import org.apache.hadoop.ozone.util.ClosableIterator;
35+
36+
/**
37+
* An abstract class that provides functionality to merge elements from multiple sorted iterators
38+
* using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
39+
* by repeatedly polling the smallest element from the heap of iterators.
40+
*
41+
* @param <K> the type of keys being merged, must be {@link Comparable}
42+
* @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable}
43+
* @param <V> the type of the final merged output
44+
*/
45+
public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V>
46+
implements ClosableIterator<V> {
47+
private final PriorityQueue<HeapEntry<K>> minHeap;
48+
private final Map<Integer, K> keys;
49+
private final List<I> iterators;
50+
private boolean initialized;
51+
private final Comparator<K> comparator;
52+
53+
public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
54+
this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
55+
keys = new HashMap<>(numberOfIterators);
56+
iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
57+
this.initialized = false;
58+
this.comparator = comparator;
59+
}
60+
61+
protected abstract I getIterator(int idx) throws IOException;
62+
63+
private boolean initHeap() throws IOException {
64+
if (initialized) {
65+
return false;
66+
}
67+
initialized = true;
68+
int count = 0;
69+
try {
70+
for (int idx = 0; idx < iterators.size(); idx++) {
71+
I itr = getIterator(idx);
72+
iterators.set(idx, itr);
73+
HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
74+
if (entry.getCurrentKey() != null) {
75+
minHeap.add(entry);
76+
count++;
77+
} else {
78+
// No valid entries, close the iterator.
79+
closeItrAtIndex(idx);
80+
}
81+
}
82+
} catch (IOException e) {
83+
close();
84+
throw e;
85+
}
86+
return count > 0;
87+
}
88+
89+
@Override
90+
public boolean hasNext() {
91+
try {
92+
return !minHeap.isEmpty() || initHeap();
93+
} catch (IOException e) {
94+
throw new UncheckedIOException(e);
95+
}
96+
}
97+
98+
protected abstract V merge(Map<Integer, K> keysToMerge);
99+
100+
@Override
101+
public V next() {
102+
if (!hasNext()) {
103+
throw new NoSuchElementException("No more elements found.");
104+
}
105+
106+
assert minHeap.peek() != null;
107+
// Get current key from heap
108+
K currentKey = minHeap.peek().getCurrentKey();
109+
// Clear the keys list by setting all entries to null.
110+
keys.clear();
111+
// Advance all entries with the same key (from different files)
112+
while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
113+
HeapEntry<K> entry = minHeap.poll();
114+
int idx = entry.index;
115+
// Set the key for the current entry in the keys list.
116+
keys.put(idx, entry.getCurrentKey());
117+
if (entry.advance()) {
118+
minHeap.offer(entry);
119+
} else {
120+
// Iterator is exhausted, close it to prevent resource leak
121+
try {
122+
closeItrAtIndex(idx);
123+
} catch (IOException e) {
124+
throw new UncheckedIOException(e);
125+
}
126+
}
127+
}
128+
return merge(keys);
129+
}
130+
131+
private void closeItrAtIndex(int idx) throws IOException {
132+
if (iterators.get(idx) != null) {
133+
iterators.get(idx).close();
134+
iterators.set(idx, null);
135+
}
136+
}
137+
138+
@Override
139+
public void close() {
140+
IOException exception = null;
141+
for (int idx = 0; idx < iterators.size(); idx++) {
142+
try {
143+
closeItrAtIndex(idx);
144+
} catch (IOException e) {
145+
exception = e;
146+
}
147+
}
148+
if (exception != null) {
149+
throw new UncheckedIOException(exception);
150+
}
151+
}
152+
153+
/**
154+
* A wrapper class that holds an iterator and its current value for heap operations.
155+
*/
156+
private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
157+
private final int index;
158+
private final Iterator<T> iterator;
159+
private T currentKey;
160+
private Comparator<T> comparator;
161+
162+
private HeapEntry(int index, Iterator<T> iterator, Comparator<T> comparator) {
163+
this.iterator = iterator;
164+
this.index = index;
165+
this.comparator = comparator;
166+
advance();
167+
}
168+
169+
private boolean advance() {
170+
if (iterator.hasNext()) {
171+
currentKey = iterator.next();
172+
return true;
173+
} else {
174+
currentKey = null;
175+
return false;
176+
}
177+
}
178+
179+
private T getCurrentKey() {
180+
return currentKey;
181+
}
182+
183+
@Override
184+
public int compareTo(@Nonnull HeapEntry<T> other) {
185+
return Comparator.comparing(HeapEntry<T>::getCurrentKey, this.comparator).compare(this, other);
186+
}
187+
188+
@Override
189+
@SuppressWarnings("unchecked")
190+
public boolean equals(Object obj) {
191+
if (this == obj) {
192+
return true;
193+
}
194+
if (obj == null || getClass() != obj.getClass()) {
195+
return false;
196+
}
197+
198+
HeapEntry<T> other = (HeapEntry<T>) obj;
199+
return this.compareTo(other) == 0;
200+
}
201+
202+
@Override
203+
public int hashCode() {
204+
return currentKey.hashCode();
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)