HDDS-14053. Extract generic MinHeapMergeIterator from SstFileSetReader #9409
Changes from all commits
9e3bd7f
df22f21
655bbb0
ac75395
bddf1e3
@@ -17,13 +17,23 @@

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.ozone.util.ClosableIterator;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.UncheckedAutoCloseable;

@@ -170,4 +180,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
  boolean isClosed();

  String getSnapshotsParentDir();

  /**
   * Creates an iterator that merges multiple tables into a single iterator,
   * grouping values with the same key across the tables.
   *
   * @param <KEY> the type of keys for the tables
   * @param keyComparator the comparator used to compare keys from different tables
   * @param prefix the prefix used to filter entries of each table
   * @param table one or more tables to merge
   * @return a closable iterator over merged key-value pairs, where each key corresponds
   *         to a collection of values from the tables
   */
  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
-      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+    if (table == null || table.length == 0) {
+      throw new IllegalArgumentException("At least one table must be provided to getMergeIterator.");
+    }
Copilot
AI
Dec 4, 2025
The constructor creates a PriorityQueue with size Math.max(numberOfIterators, 1) but then creates a MinHeapMergeIterator with table.length + 1 on line 202. This results in a heap size one larger than the number of tables, which is wasteful and inconsistent. The size should be table.length not table.length + 1.
-        KeyValue<KEY, Collection<Object>>>(table.length + 1, comparator) {
+        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
Copilot
AI
Dec 4, 2025
The tableValues list is reused and mutated on every call to merge(). This means all returned Collection<Object> values from the iterator share the same underlying list, and subsequent calls to next() will modify the contents of previously returned collections. This can lead to subtle bugs if callers retain references to these collections. Consider creating a new list for each merge result: return newKeyValue(key, new ArrayList<>(tableValues));
-        return newKeyValue(key, tableValues);
+        return newKeyValue(key, new java.util.ArrayList<>(tableValues));
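The aliasing hazard described in this comment can be reproduced in isolation. The sketch below is not the PR's code: `SharedBufferDemo`, `reusing`, `copying`, and `drain` are hypothetical names, and plain integer lists stand in for the table values. It only illustrates why returning a reused buffer differs from returning a defensive copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the merge() behaviour under review; not the PR's code. */
final class SharedBufferDemo {

  /** Reuses one buffer for every result, so all returned lists are the same object. */
  static Iterator<List<Integer>> reusing(List<List<Integer>> rows) {
    final List<Integer> buffer = new ArrayList<>();
    final Iterator<List<Integer>> src = rows.iterator();
    return new Iterator<List<Integer>>() {
      @Override public boolean hasNext() {
        return src.hasNext();
      }

      @Override public List<Integer> next() {
        buffer.clear();
        buffer.addAll(src.next());
        return buffer; // the caller sees later mutations of this list
      }
    };
  }

  /** Same traversal, but each result is an independent copy of the buffer. */
  static Iterator<List<Integer>> copying(List<List<Integer>> rows) {
    final Iterator<List<Integer>> inner = reusing(rows);
    return new Iterator<List<Integer>>() {
      @Override public boolean hasNext() {
        return inner.hasNext();
      }

      @Override public List<Integer> next() {
        return new ArrayList<>(inner.next());
      }
    };
  }

  /** Collects every returned reference, as a caller that retains results would. */
  static List<List<Integer>> drain(Iterator<List<Integer>> it) {
    List<List<Integer>> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Integer>> rows = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    System.out.println(drain(reusing(rows))); // prints [[3], [3]]
    System.out.println(drain(copying(rows))); // prints [[1, 2], [3]]
  }
}
```

The copy costs one allocation per merged key. Whether that is acceptable depends on how hot the iterator is; documenting that returned collections are only valid until the next call to next() would be the alternative.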
@@ -0,0 +1,207 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.utils.db;

import jakarta.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.ozone.util.ClosableIterator;

/**
 * An abstract class that provides functionality to merge elements from multiple sorted iterators
 * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
 * by repeatedly polling the smallest element from the heap of iterators.
 *
 * @param <K> the type of keys being merged, must be {@link Comparable}
- * @param <K> the type of keys being merged, must be {@link Comparable}
+ * @param <K> the type of keys being merged; a {@link Comparator} for K must be provided via the constructor
Encapsulates several methods from SstFileSetReader.MultipleSstFileIterator, to be used by DBStore.
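For readers unfamiliar with the pattern, here is a minimal sketch of a min-heap merge over sorted iterators, assuming a caller-supplied comparator. `KWayMergeSketch`, `Entry`, and `merge` are hypothetical names; the sketch emits each distinct key once and does not reproduce the value grouping, key map, or resource handling of the actual `MinHeapMergeIterator`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration of a k-way merge; not the class added by this PR. */
final class KWayMergeSketch {

  /** One heap slot: an iterator plus the element it is currently positioned on. */
  private static final class Entry<K> {
    private final Iterator<K> it;
    private K current;

    Entry(Iterator<K> it) {
      this.it = it;
      this.current = it.next(); // caller guarantees hasNext()
    }

    /** Moves to the next element; returns false when the iterator is exhausted. */
    boolean advance() {
      if (!it.hasNext()) {
        return false;
      }
      current = it.next();
      return true;
    }
  }

  /** Merges sorted inputs into one sorted list, emitting each distinct key once. */
  static <K> List<K> merge(Comparator<K> cmp, List<? extends Iterator<K>> inputs) {
    // PriorityQueue rejects an initial capacity below 1, hence the Math.max guard.
    PriorityQueue<Entry<K>> heap = new PriorityQueue<>(
        Math.max(inputs.size(), 1), (a, b) -> cmp.compare(a.current, b.current));
    for (Iterator<K> it : inputs) {
      if (it.hasNext()) {
        heap.add(new Entry<>(it));
      }
    }
    List<K> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Entry<K> smallest = heap.poll();
      K key = smallest.current;
      // Skip keys equal to the one just emitted (same key seen in another input).
      if (out.isEmpty() || cmp.compare(out.get(out.size() - 1), key) != 0) {
        out.add(key);
      }
      if (smallest.advance()) {
        heap.add(smallest); // re-insert with its new current element
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Iterator<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 4, 7).iterator(),
        Arrays.asList(2, 4, 8).iterator());
    System.out.println(merge(Comparator.<Integer>naturalOrder(), inputs)); // prints [1, 2, 4, 7, 8]
  }
}
```

The heap never holds more than one entry per input, so a capacity equal to the number of inputs is sufficient.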
Moved from SstFileSetReader.MultipleSstFileIterator.next(), except that it also manages the keys map.
Copilot
AI
Dec 4, 2025
The comment on line 113 says "Clear the keys list by setting all entries to null" but the code actually calls keys.clear() which removes all entries, not sets them to null. The comment should be updated to: "Clear the keys map from the previous iteration."
-    // Clear the keys list by setting all entries to null.
+    // Clear the keys map from the previous iteration.
Copilot
AI
Dec 4, 2025
This method overrides ClosableIterator.close; it is advisable to add an Override annotation.
+  @Override
Copilot
AI
Dec 4, 2025
The close() method only preserves the last IOException that occurred during cleanup. If multiple iterators throw exceptions during close(), only the last one is thrown, potentially hiding other important errors. Consider collecting all exceptions or using a suppressed exception pattern to preserve all error information.
-    IOException exception = null;
-    for (int idx = 0; idx < iterators.size(); idx++) {
-      try {
-        closeItrAtIndex(idx);
-      } catch (IOException e) {
-        exception = e;
-      }
-    }
-    if (exception != null) {
-      throw new UncheckedIOException(exception);
+    IOException firstException = null;
+    for (int idx = 0; idx < iterators.size(); idx++) {
+      try {
+        closeItrAtIndex(idx);
+      } catch (IOException e) {
+        if (firstException == null) {
+          firstException = e;
+        } else {
+          firstException.addSuppressed(e);
+        }
+      }
+    }
+    if (firstException != null) {
+      throw new UncheckedIOException(firstException);
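The suppressed-exception pattern in this suggestion can be exercised on its own. The sketch below assumes a plain list of `Closeable` resources rather than the PR's `closeItrAtIndex(idx)` helper; `CloseAllDemo` and `closeAll` are hypothetical names used only for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing the first-exception-plus-suppressed pattern. */
final class CloseAllDemo {

  /**
   * Closes every resource even if some fail. The first IOException becomes the
   * cause of the thrown UncheckedIOException; later ones are attached to it
   * via Throwable.addSuppressed so no failure is lost.
   */
  static void closeAll(List<? extends Closeable> resources) {
    IOException first = null;
    for (Closeable resource : resources) {
      try {
        resource.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw new UncheckedIOException(first);
    }
  }

  public static void main(String[] args) {
    Closeable failA = () -> {
      throw new IOException("a");
    };
    Closeable ok = () -> { };
    Closeable failB = () -> {
      throw new IOException("b");
    };
    try {
      closeAll(Arrays.asList(failA, ok, failB));
    } catch (UncheckedIOException e) {
      System.out.println(e.getCause().getMessage());                    // prints a
      System.out.println(e.getCause().getSuppressed()[0].getMessage()); // prints b
    }
  }
}
```

This mirrors what try-with-resources does for multiple resources: the primary failure is reported and secondary close failures remain reachable through getSuppressed().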
Copilot
AI
Dec 4, 2025
The close() method doesn't clear the minHeap after closing all iterators. While the iterators themselves are closed and set to null in the list, HeapEntry objects remain in the minHeap, preventing proper garbage collection and potentially causing issues if the iterator is accidentally used after close().
This is moved from SstFileSetReader.HeapEntry, with index and comparator.
index is not used.
comparator is customizable.
> index is not used.
Lines 117 to 121 in 9e3bd7f
| HeapEntry<K> entry = minHeap.poll(); | |
| int idx = entry.index; | |
| // Set the key for the current entry in the keys list. | |
| keys.put(idx, entry.getCurrentKey()); | |
| if (entry.advance()) { |
It is used when setting the value in the returned Map.
This change is acceptable. Closeable is a subinterface of AutoCloseable; the only difference is that its close() throws IOException instead of Exception. This is not a public API.