Merged
Original file line number Diff line number Diff line change
@@ -17,12 +17,13 @@

package org.apache.hadoop.ozone.util;

import java.io.Closeable;
import java.util.Iterator;

/**
* An {@link Iterator} that may hold resources until it is closed.
*/
- public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
+ public interface ClosableIterator<E> extends Iterator<E>, Closeable {
Contributor

This change is acceptable. Closeable is a subinterface of AutoCloseable, and the only difference is that its close() is declared to throw IOException instead of Exception. ClosableIterator is not a public API.

@Override
void close();
}
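A minimal sketch of the Closeable point made in the review comment above. QuietClosableIterator and ListBacked are hypothetical names used only for illustration and are not part of the PR. Because an override may declare fewer checked exceptions than the method it overrides, an interface extending Closeable can re-declare close() with no throws clause, and try-with-resources on that type then needs no catch block.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for ClosableIterator: close() is re-declared without
// a throws clause, narrowing Closeable.close() throws IOException.
interface QuietClosableIterator<E> extends Iterator<E>, Closeable {
  @Override
  void close();
}

final class CloseableDemo {
  // Hypothetical helper: iterates over a list and records whether close() ran.
  static final class ListBacked<E> implements QuietClosableIterator<E> {
    private final Iterator<E> delegate;
    private boolean closed;

    ListBacked(List<E> items) {
      this.delegate = items.iterator();
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public E next() {
      return delegate.next();
    }

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  // No catch block and no throws clause: the declared resource type's close()
  // throws no checked exception.
  static int drain(ListBacked<String> it) {
    int count = 0;
    try (QuietClosableIterator<String> resource = it) {
      while (resource.hasNext()) {
        resource.next();
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    ListBacked<String> it = new ListBacked<>(Arrays.asList("a", "b", "c"));
    System.out.println(drain(it) + " elements, closed=" + it.isClosed());
  }
}
```

Callers that still hold the iterator as a plain Closeable or AutoCloseable would have to handle the broader declared exception, so the narrowing only helps code that uses the more specific type.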
Original file line number Diff line number Diff line change
@@ -17,13 +17,23 @@

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.ozone.util.ClosableIterator;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.UncheckedAutoCloseable;

@@ -170,4 +180,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
boolean isClosed();

String getSnapshotsParentDir();

/**
* Creates an iterator that merges multiple tables into a single iterator,
* grouping values with the same key across the tables.
*
* @param <KEY> the type of keys for the tables
* @param keyComparator the comparator used to compare keys from different tables
* @param prefix the prefix used to filter entries of each table
* @param table one or more tables to merge
* @return a closable iterator over merged key-value pairs, where each key corresponds
* to a collection of values from the tables
*/
default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
Copilot AI Dec 4, 2025

The varargs parameter table is not validated for null or empty. If an empty array or null is passed, this will cause issues: an empty array will create an iterator that never returns elements but won't fail gracefully, and null will cause a NullPointerException. Consider adding validation at the start of the method.

Suggested change
- Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+ Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+ if (table == null || table.length == 0) {
+ throw new IllegalArgumentException("At least one table must be provided to getMergeIterator.");
+ }

List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
Copilot AI Dec 4, 2025

The MinHeapMergeIterator constructor sizes its PriorityQueue with Math.max(numberOfIterators, 1), but the call on line 202 passes table.length + 1, one more than the number of tables. The same argument also sizes the iterators list, so initHeap() would call getIterator(table.length), and table[idx] would then index past the end of the array. The size should be table.length, not table.length + 1.

Suggested change
- KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
+ KeyValue<KEY, Collection<Object>>>(table.length, comparator) {

@Override
protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
return table[idx].iterator(prefix);
}

@Override
protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
KEY key = keysToMerge.values().stream().findAny()
.orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
for (int i = 0; i < tableValues.size(); i++) {
tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
}
return newKeyValue(key, tableValues);
Copilot AI Dec 4, 2025

The tableValues list is reused and mutated on every call to merge(). This means all returned Collection<Object> values from the iterator share the same underlying list, and subsequent calls to next() will modify the contents of previously returned collections. This can lead to subtle bugs if callers retain references to these collections. Consider creating a new list for each merge result: return newKeyValue(key, new ArrayList<>(tableValues));

Suggested change
- return newKeyValue(key, tableValues);
+ return newKeyValue(key, new java.util.ArrayList<>(tableValues));

}
};
}
}
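The aliasing concern raised for merge() above can be reproduced in isolation. The following is a sketch only: SharedListDemo and its collect method are hypothetical and merely mimic the pattern of mutating one backing list and handing it out on every call. It does not use any Ozone classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SharedListDemo {
  // Fills one reused two-slot list per row and collects what is "returned".
  // With copy == false every collected element is the same list instance;
  // with copy == true each element is a defensive snapshot.
  static List<List<String>> collect(List<String[]> rows, boolean copy) {
    List<String> reused = new ArrayList<>(Collections.nCopies(2, (String) null));
    List<List<String>> out = new ArrayList<>();
    for (String[] row : rows) {
      reused.set(0, row[0]);
      reused.set(1, row[1]);
      out.add(copy ? new ArrayList<>(reused) : reused);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> rows =
        Arrays.asList(new String[] {"a", "b"}, new String[] {"c", "d"});
    System.out.println(collect(rows, false)); // prints [[c, d], [c, d]]
    System.out.println(collect(rows, true));  // prints [[a, b], [c, d]]
  }
}
```

Whether the copy is worth its allocation cost depends on whether callers retain a result across calls to next(); a caller that consumes each result immediately never observes the aliasing.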
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import jakarta.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.ozone.util.ClosableIterator;

/**
* An abstract class that provides functionality to merge elements from multiple sorted iterators
* using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
* by repeatedly polling the smallest element from the heap of iterators.
*
* @param <K> the type of keys being merged, must be {@link Comparable}
Copilot AI Dec 4, 2025

The class javadoc states that K "must be {@link Comparable}" but this is not accurate. The type parameter K doesn't have a Comparable bound, and the class accepts any type with a Comparator. The documentation should be updated to reflect that K needs a Comparator provided via the constructor.

Suggested change
- * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <K> the type of keys being merged; a {@link Comparator} for K must be provided via the constructor

* @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable}
* @param <V> the type of the final merged output
*/
public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V>
Contributor

Encapsulates several methods from SstFileSetReader.MultipleSstFileIterator, to be used by DBStore.

implements ClosableIterator<V> {
private final PriorityQueue<HeapEntry<K>> minHeap;
private final Map<Integer, K> keys;
private final List<I> iterators;
private boolean initialized;
private final Comparator<K> comparator;

public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
keys = new HashMap<>(numberOfIterators);
iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
this.initialized = false;
this.comparator = comparator;
}

protected abstract I getIterator(int idx) throws IOException;

private boolean initHeap() throws IOException {
if (initialized) {
return false;
}
initialized = true;
int count = 0;
try {
for (int idx = 0; idx < iterators.size(); idx++) {
I itr = getIterator(idx);
iterators.set(idx, itr);
HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
if (entry.getCurrentKey() != null) {
minHeap.add(entry);
count++;
} else {
// No valid entries, close the iterator.
closeItrAtIndex(idx);
}
}
} catch (IOException e) {
close();
throw e;
}
return count > 0;
}

@Override
public boolean hasNext() {
try {
return !minHeap.isEmpty() || initHeap();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected abstract V merge(Map<Integer, K> keysToMerge);

@Override
public V next() {
Contributor

Moved from SstFileSetReader.MultipleSstFileIterator.next(), except that it also manages the keys map.

if (!hasNext()) {
throw new NoSuchElementException("No more elements found.");
}

assert minHeap.peek() != null;
// Get current key from heap
K currentKey = minHeap.peek().getCurrentKey();
// Clear the keys list by setting all entries to null.
Copilot AI Dec 4, 2025

The comment on line 113 says "Clear the keys list by setting all entries to null" but the code actually calls keys.clear() which removes all entries, not sets them to null. The comment should be updated to: "Clear the keys map from the previous iteration."

Suggested change
- // Clear the keys list by setting all entries to null.
+ // Clear the keys map from the previous iteration.

keys.clear();
// Advance all entries with the same key (from different files)
while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
HeapEntry<K> entry = minHeap.poll();
int idx = entry.index;
// Set the key for the current entry in the keys list.
keys.put(idx, entry.getCurrentKey());
if (entry.advance()) {
minHeap.offer(entry);
} else {
// Iterator is exhausted, close it to prevent resource leak
try {
closeItrAtIndex(idx);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
return merge(keys);
}

private void closeItrAtIndex(int idx) throws IOException {
if (iterators.get(idx) != null) {
iterators.get(idx).close();
iterators.set(idx, null);
}
}

Copilot AI Dec 4, 2025

This method overrides ClosableIterator.close; it is advisable to add an Override annotation.

Suggested change
+ @Override

@Override
public void close() {
IOException exception = null;
for (int idx = 0; idx < iterators.size(); idx++) {
try {
closeItrAtIndex(idx);
} catch (IOException e) {
exception = e;
}
}
if (exception != null) {
throw new UncheckedIOException(exception);
Comment on lines +140 to +149
Copilot AI Dec 4, 2025

The close() method only preserves the last IOException that occurred during cleanup. If multiple iterators throw exceptions during close(), only the last one is thrown, potentially hiding other important errors. Consider collecting all exceptions or using a suppressed exception pattern to preserve all error information.

Suggested change
- IOException exception = null;
- for (int idx = 0; idx < iterators.size(); idx++) {
- try {
- closeItrAtIndex(idx);
- } catch (IOException e) {
- exception = e;
- }
- }
- if (exception != null) {
- throw new UncheckedIOException(exception);
+ IOException firstException = null;
+ for (int idx = 0; idx < iterators.size(); idx++) {
+ try {
+ closeItrAtIndex(idx);
+ } catch (IOException e) {
+ if (firstException == null) {
+ firstException = e;
+ } else {
+ firstException.addSuppressed(e);
+ }
+ }
+ }
+ if (firstException != null) {
+ throw new UncheckedIOException(firstException);

}
}
Comment on lines +139 to +151
Copilot AI Dec 4, 2025

The close() method doesn't clear the minHeap after closing all iterators. While the iterators themselves are closed and set to null in the list, HeapEntry objects remain in the minHeap, preventing proper garbage collection and potentially causing issues if the iterator is accidentally used after close().

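The suppressed-exception pattern suggested for close() above can be exercised on its own. This is a sketch under stated assumptions: CloseAllDemo, closeAll, and failing are hypothetical names, and the resources are plain java.io.Closeable lambdas rather than the PR's table iterators.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

final class CloseAllDemo {
  // Attempts to close every resource. The first IOException becomes the
  // primary failure; later ones are attached to it via addSuppressed.
  static void closeAll(List<? extends Closeable> resources) {
    IOException first = null;
    for (Closeable resource : resources) {
      try {
        resource.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw new UncheckedIOException(first);
    }
  }

  // Hypothetical resource whose close() always fails.
  static Closeable failing(String name) {
    return () -> {
      throw new IOException("close failed: " + name);
    };
  }

  public static void main(String[] args) {
    try {
      closeAll(Arrays.asList(failing("a"), failing("b")));
    } catch (UncheckedIOException e) {
      System.out.println(e.getCause().getMessage());           // prints close failed: a
      System.out.println(e.getCause().getSuppressed().length); // prints 1
    }
  }
}
```

Keeping the first exception as the primary one matches how try-with-resources reports failures, so stack traces from this pattern read the same way as those produced by the language construct.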

/**
* A wrapper class that holds an iterator and its current value for heap operations.
*/
private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
Contributor

This is moved from SstFileSetReader.HeapEntry, with index and comparator added.
index is not used.
comparator is customizable.

private final int index;
Contributor

index is not used.

Contributor Author

HeapEntry<K> entry = minHeap.poll();
int idx = entry.index;
// Set the key for the current entry in the keys list.
keys.put(idx, entry.getCurrentKey());
if (entry.advance()) {

Contributor Author

It is used when setting the value in the returned Map.

private final Iterator<T> iterator;
private T currentKey;
private Comparator<T> comparator;

private HeapEntry(int index, Iterator<T> iterator, Comparator<T> comparator) {
this.iterator = iterator;
this.index = index;
this.comparator = comparator;
advance();
}

private boolean advance() {
if (iterator.hasNext()) {
currentKey = iterator.next();
return true;
} else {
currentKey = null;
return false;
}
}

private T getCurrentKey() {
return currentKey;
}

@Override
public int compareTo(@Nonnull HeapEntry<T> other) {
return Comparator.comparing(HeapEntry<T>::getCurrentKey, this.comparator).compare(this, other);
}

@Override
@SuppressWarnings("unchecked")
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}

HeapEntry<T> other = (HeapEntry<T>) obj;
return this.compareTo(other) == 0;
}

@Override
public int hashCode() {
return currentKey.hashCode();
}
}
}
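For readers who want the merge technique described in the class javadoc without the resource handling, here is a reduced sketch. KWayMergeDemo, Cursor, and mergeDistinct are hypothetical names and none of this is the PR's code. It merges in-memory sorted lists, orders heap entries with a caller-supplied Comparator (so the element type does not need to be Comparable), and collapses equal heads into one output element. Unlike next() in the PR, which groups entries with Objects.equals, this sketch treats two elements as equal when the comparator returns 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMergeDemo {
  // Heap entry: one input iterator plus the element currently at its head.
  private static final class Cursor<T> {
    private final Iterator<T> source;
    private T head;

    Cursor(Iterator<T> source) {
      this.source = source;
    }

    // Moves to the next element; returns false once the input is exhausted.
    boolean advance() {
      if (source.hasNext()) {
        head = source.next();
        return true;
      }
      head = null;
      return false;
    }
  }

  // Merges sorted inputs into one sorted list, emitting each distinct
  // element (as judged by the comparator) exactly once.
  static <T> List<T> mergeDistinct(List<List<T>> sortedInputs, Comparator<T> order) {
    PriorityQueue<Cursor<T>> heap = new PriorityQueue<>(
        Math.max(sortedInputs.size(), 1),
        Comparator.comparing((Cursor<T> c) -> c.head, order));
    for (List<T> input : sortedInputs) {
      Cursor<T> cursor = new Cursor<>(input.iterator());
      if (cursor.advance()) {
        heap.add(cursor); // only non-empty inputs enter the heap
      }
    }
    List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      T smallest = heap.peek().head;
      merged.add(smallest);
      // Advance every cursor whose head equals the element just emitted.
      while (!heap.isEmpty() && order.compare(heap.peek().head, smallest) == 0) {
        Cursor<T> cursor = heap.poll();
        if (cursor.advance()) {
          heap.add(cursor);
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 4, 7), Arrays.asList(2, 4, 8), Arrays.asList(4, 9));
    System.out.println(mergeDistinct(inputs, Comparator.naturalOrder())); // prints [1, 2, 4, 7, 8, 9]
  }
}
```

Each poll and re-insert costs O(log k) for k inputs, which is why a heap is preferred over rescanning all iterator heads for every output element.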