diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java
index ec8ee8a70a7e..d358bd91d8ca 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/util/ClosableIterator.java
@@ -17,12 +17,13 @@
 
 package org.apache.hadoop.ozone.util;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
 /**
  * An {@link Iterator} that may hold resources until it is closed.
  */
-public interface ClosableIterator<T> extends Iterator<T>, AutoCloseable {
+public interface ClosableIterator<T> extends Iterator<T>, Closeable {
   @Override
   void close();
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index a269ebc56b9b..298260efdf9e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -17,13 +17,23 @@
 
 package org.apache.hadoop.hdds.utils.db;
 
+import static org.apache.hadoop.hdds.utils.db.Table.newKeyValue;
+
 import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
+import org.apache.hadoop.ozone.util.ClosableIterator;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.apache.ratis.util.UncheckedAutoCloseable;
 
@@ -170,4 +180,39 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
   boolean isClosed();
 
   String getSnapshotsParentDir();
+
+  /**
+   * Creates an iterator that merges multiple tables into a single iterator,
+   * grouping values with the same key across the tables.
+   *
+   * @param <KEY> the type of keys for the tables
+   * @param keyComparator the comparator used to compare keys from different tables
+   * @param prefix the prefix used to filter entries of each table
+   * @param table one or more tables to merge
+   * @return a closable iterator over merged key-value pairs, where each key corresponds
+   *         to a collection of values from the tables
+   */
+  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
+      Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
+    List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
+    KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
+    Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
+    return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
+        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
+      @Override
+      protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
+        return table[idx].iterator(prefix);
+      }
+
+      @Override
+      protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
+        KEY key = keysToMerge.values().stream().findAny()
+            .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
+        for (int i = 0; i < tableValues.size(); i++) {
+          tableValues.set(i, keysToMerge.getOrDefault(i, defaultNullValue).getValue());
+        }
+        return newKeyValue(key, tableValues);
+      }
+    };
+  }
 }
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
new file mode 100644
index 000000000000..8326b9e9ca1f
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/MinHeapMergeIterator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import jakarta.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+
+/**
+ * An abstract class that provides functionality to merge elements from multiple sorted iterators
+ * using a min-heap. The {@code MinHeapMergeIterator} ensures the merged output is in sorted order
+ * by repeatedly polling the smallest element from the heap of iterators.
+ *
+ * @param <K> the type of keys being merged, ordered by the supplied {@link Comparator}
+ * @param <I> the type of iterators being used, must extend {@link Iterator} and implement {@link Closeable}
+ * @param <V> the type of the final merged output
+ */
+public abstract class MinHeapMergeIterator<K, I extends Iterator<K> & Closeable, V>
+    implements ClosableIterator<V> {
+  private final PriorityQueue<HeapEntry<K>> minHeap;
+  private final Map<Integer, K> keys;
+  private final List<I> iterators;
+  private boolean initialized;
+  private final Comparator<K> comparator;
+
+  public MinHeapMergeIterator(int numberOfIterators, Comparator<K> comparator) {
+    this.minHeap = new PriorityQueue<>(Math.max(numberOfIterators, 1));
+    keys = new HashMap<>(numberOfIterators);
+    iterators = IntStream.range(0, numberOfIterators).mapToObj(i -> (I) null).collect(Collectors.toList());
+    this.initialized = false;
+    this.comparator = comparator;
+  }
+
+  protected abstract I getIterator(int idx) throws IOException;
+
+  private boolean initHeap() throws IOException {
+    if (initialized) {
+      return false;
+    }
+    initialized = true;
+    int count = 0;
+    try {
+      for (int idx = 0; idx < iterators.size(); idx++) {
+        I itr = getIterator(idx);
+        iterators.set(idx, itr);
+        HeapEntry<K> entry = new HeapEntry<>(idx, itr, comparator);
+        if (entry.getCurrentKey() != null) {
+          minHeap.add(entry);
+          count++;
+        } else {
+          // No valid entries, close the iterator.
+          closeItrAtIndex(idx);
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw e;
+    }
+    return count > 0;
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      return !minHeap.isEmpty() || initHeap();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  protected abstract V merge(Map<Integer, K> keysToMerge);
+
+  @Override
+  public V next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more elements found.");
+    }
+
+    assert minHeap.peek() != null;
+    // Get current key from heap
+    K currentKey = minHeap.peek().getCurrentKey();
+    // Clear the keys gathered for the previous merge.
+    keys.clear();
+    // Advance all entries with the same key (from different files)
+    while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) {
+      HeapEntry<K> entry = minHeap.poll();
+      int idx = entry.index;
+      // Record the key contributed by this iterator in the keys map.
+      keys.put(idx, entry.getCurrentKey());
+      if (entry.advance()) {
+        minHeap.offer(entry);
+      } else {
+        // Iterator is exhausted, close it to prevent resource leak
+        try {
+          closeItrAtIndex(idx);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    }
+    return merge(keys);
+  }
+
+  private void closeItrAtIndex(int idx) throws IOException {
+    if (iterators.get(idx) != null) {
+      iterators.get(idx).close();
+      iterators.set(idx, null);
+    }
+  }
+
+  @Override
+  public void close() {
+    IOException exception = null;
+    for (int idx = 0; idx < iterators.size(); idx++) {
+      try {
+        closeItrAtIndex(idx);
+      } catch (IOException e) {
+        exception = e;
+      }
+    }
+    if (exception != null) {
+      throw new UncheckedIOException(exception);
+    }
+  }
+
+  /**
+   * A wrapper class that holds an iterator and its current value for heap operations.
+   */
+  private static final class HeapEntry<T> implements Comparable<HeapEntry<T>> {
+    private final int index;
+    private final Iterator<T> iterator;
+    private T currentKey;
+    private Comparator<T> comparator;
+
+    private HeapEntry(int index, Iterator<T> iterator, Comparator<T> comparator) {
+      this.iterator = iterator;
+      this.index = index;
+      this.comparator = comparator;
+      advance();
+    }
+
+    private boolean advance() {
+      if (iterator.hasNext()) {
+        currentKey = iterator.next();
+        return true;
+      } else {
+        currentKey = null;
+        return false;
+      }
+    }
+
+    private T getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public int compareTo(@Nonnull HeapEntry<T> other) {
+      return Comparator.comparing(HeapEntry<T>::getCurrentKey, this.comparator).compare(this, other);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+
+      HeapEntry<T> other = (HeapEntry<T>) obj;
+      return this.compareTo(other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return currentKey.hashCode();
+    }
+  }
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java
index 4ad74d08f18a..243720c4b4ed 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java
@@ -19,19 +19,20 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import jakarta.annotation.Nonnull;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.PriorityQueue;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.MinHeapMergeIterator;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileIterator;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
@@ -56,7 +57,7 @@ public SstFileSetReader(final Collection<Path> sstFiles) {
     this.sstFiles = sstFiles;
   }
 
-  public long getEstimatedTotalKeys() throws RocksDBException {
+  public long getEstimatedTotalKeys() throws RocksDatabaseException {
     if (estimatedTotalKeys != -1) {
       return estimatedTotalKeys;
     }
@@ -72,6 +73,8 @@ public long getEstimatedTotalKeys() throws RocksDBException {
       try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
         fileReader.open(sstFile.toAbsolutePath().toString());
         estimatedSize += fileReader.getTableProperties().getNumEntries();
+      } catch (RocksDBException e) {
+        throw new RocksDatabaseException("Failed to open SST file: " + sstFile, e);
       }
     }
   }
@@ -81,8 +84,7 @@
     return estimatedTotalKeys;
   }
 
-  public ClosableIterator<String> getKeyStream(String lowerBound,
-      String upperBound) throws RocksDBException {
+  public ClosableIterator<String> getKeyStream(String lowerBound, String upperBound) {
     // TODO: [SNAPSHOT] Check if default Options and
ReadOptions is enough. final MultipleSstFileIterator itr = new MultipleSstFileIterator(sstFiles) { private ManagedOptions options; @@ -110,7 +112,7 @@ protected void init() { } @Override - protected ClosableIterator getKeyIteratorForFile(String file) throws RocksDBException { + protected ClosableIterator getKeyIteratorForFile(String file) throws RocksDatabaseException { return new ManagedSstFileIterator(file, options, readOptions) { @Override protected String getIteratorValue(ManagedSstFileReaderIterator iterator) { @@ -130,8 +132,7 @@ public void close() throws UncheckedIOException { return itr; } - public ClosableIterator getKeyStreamWithTombstone(String lowerBound, String upperBound) - throws RocksDBException { + public ClosableIterator getKeyStreamWithTombstone(String lowerBound, String upperBound) { final MultipleSstFileIterator itr = new MultipleSstFileIterator(sstFiles) { //TODO: [SNAPSHOT] Check if default Options is enough. private ManagedOptions options; @@ -172,11 +173,15 @@ private abstract static class ManagedSstFileIterator implements ClosableIterator private final ManagedSstFileReaderIterator fileReaderIterator; ManagedSstFileIterator(String path, ManagedOptions options, ManagedReadOptions readOptions) - throws RocksDBException { - this.fileReader = new ManagedSstFileReader(options); - this.fileReader.open(path); - this.fileReaderIterator = ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions)); - fileReaderIterator.get().seekToFirst(); + throws RocksDatabaseException { + try { + this.fileReader = new ManagedSstFileReader(options); + this.fileReader.open(path); + this.fileReaderIterator = ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions)); + fileReaderIterator.get().seekToFirst(); + } catch (RocksDBException e) { + throw new RocksDatabaseException("Failed to open SST file: " + path, e); + } } @Override @@ -228,63 +233,6 @@ public String next() { } } - /** - * A wrapper class that holds an iterator and its current value for heap operations. - */ - private static class HeapEntry> - implements Comparable>, Closeable { - private final ClosableIterator iterator; - private T currentKey; - - HeapEntry(ClosableIterator iterator) { - this.iterator = iterator; - advance(); - } - - @Override - public void close() { - iterator.close(); - } - - boolean advance() { - if (iterator.hasNext()) { - currentKey = iterator.next(); - return true; - } else { - currentKey = null; - return false; - } - } - - T getCurrentKey() { - return currentKey; - } - - @Override - public int compareTo(@Nonnull HeapEntry other) { - return Comparator.comparing(HeapEntry::getCurrentKey).compare(this, other); - } - - @Override - @SuppressWarnings("unchecked") - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - HeapEntry other = (HeapEntry) obj; - return this.compareTo(other) == 0; - } - - @Override - public int hashCode() { - return currentKey.hashCode(); - } - } - /** * The MultipleSstFileIterator class is an abstract base for iterating over multiple SST files. * It uses a PriorityQueue to merge keys from all files in sorted order. @@ -292,73 +240,29 @@ public int hashCode() { * which ensures stable ordering for identical keys by considering the file index. 
* @param */ - private abstract static class MultipleSstFileIterator> implements ClosableIterator { - private final PriorityQueue> minHeap; + private abstract static class MultipleSstFileIterator> + extends MinHeapMergeIterator, T> { + private final List sstFiles; private MultipleSstFileIterator(Collection sstFiles) { - this.minHeap = new PriorityQueue<>(); + super(sstFiles.size(), Comparable::compareTo); init(); - initMinHeap(sstFiles); + this.sstFiles = sstFiles.stream().map(Path::toAbsolutePath).collect(Collectors.toList()); } protected abstract void init(); - protected abstract ClosableIterator getKeyIteratorForFile(String file) throws RocksDBException, IOException; - - private void initMinHeap(Collection files) { - try { - for (Path file : files) { - ClosableIterator iterator = getKeyIteratorForFile(file.toAbsolutePath().toString()); - HeapEntry entry = new HeapEntry<>(iterator); - - if (entry.getCurrentKey() != null) { - minHeap.offer(entry); - } else { - // No valid entries, close the iterator - entry.close(); - } - } - } catch (IOException | RocksDBException e) { - // Clean up any opened iterators - close(); - throw new RuntimeException("Failed to initialize SST file iterators", e); - } - } - - @Override - public boolean hasNext() { - return !minHeap.isEmpty(); - } + protected abstract ClosableIterator getKeyIteratorForFile(String file) throws IOException; @Override - public T next() { - if (!hasNext()) { - throw new NoSuchElementException("No more elements found."); - } - - assert minHeap.peek() != null; - // Get current key from heap - T currentKey = minHeap.peek().getCurrentKey(); - - // Advance all entries with the same key (from different files) - while (!minHeap.isEmpty() && Objects.equals(minHeap.peek().getCurrentKey(), currentKey)) { - HeapEntry entry = minHeap.poll(); - if (entry.advance()) { - minHeap.offer(entry); - } else { - // Iterator is exhausted, close it to prevent resource leak - entry.close(); - } - } - - return currentKey; + protected ClosableIterator getIterator(int idx) throws IOException { + return getKeyIteratorForFile(sstFiles.get(idx).toString()); } @Override - public void close() { - while (!minHeap.isEmpty()) { - minHeap.poll().close(); - } + protected T merge(Map keys) { + return keys.values().stream().findAny() + .orElseThrow(() -> new NoSuchElementException("All values are null from all iterators.")); } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java new file mode 100644 index 000000000000..1ac177c646b6 --- /dev/null +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/hadoop/hdds/utils/db/TestMinHeapMergeIterator.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link MinHeapMergeIterator}. + */ +class TestMinHeapMergeIterator { + + private static final Comparator STRING_COMPARATOR = String::compareTo; + + /** + * A closeable iterator which tracks close() calls. + */ + private static final class TrackingCloseableIterator + implements Iterator, Closeable { + private final List data; + private int idx = 0; + private int closeCount = 0; + + private TrackingCloseableIterator(List data) { + this.data = data; + } + + @Override + public boolean hasNext() { + return idx < data.size(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); + } + return data.get(idx++); + } + + @Override + public void close() { + closeCount++; + } + + int getCloseCount() { + return closeCount; + } + } + + private static final class MergeResult { + private final String key; + private final Set sources; + + private MergeResult(String key, Set sources) { + this.key = key; + this.sources = sources; + } + + String getKey() { + return key; + } + + Set getSources() { + return sources; + } + } + + /** + * Concrete implementation for tests. + */ + private static final class TestIterator extends MinHeapMergeIterator, MergeResult> { + + private final List> itrs; + private final List merged = new ArrayList<>(); + + private IOException ioExceptionAtIndex; + private int exceptionIndex = -1; + + private TestIterator(List> itrs) { + super(itrs.size(), STRING_COMPARATOR); + this.itrs = itrs; + } + + private TestIterator withGetIteratorIOException(int index, IOException ex) { + this.exceptionIndex = index; + this.ioExceptionAtIndex = ex; + return this; + } + + @Override + protected TrackingCloseableIterator getIterator(int idx) + throws IOException { + if (idx == exceptionIndex) { + if (ioExceptionAtIndex != null) { + throw ioExceptionAtIndex; + } + } + return itrs.get(idx); + } + + @Override + protected MergeResult merge(Map keysToMerge) { + // All values in keysToMerge are expected to be equal (same key across iterators). 
+ String key = keysToMerge.values().iterator().next(); + MergeResult r = new MergeResult(key, new HashSet<>(keysToMerge.keySet())); + merged.add(r); + return r; + } + + List getMerged() { + return merged; + } + } + + @Test + void testMergedOrderAndDuplicateGroupingAndAutoCloseOnExhaustion() { + TrackingCloseableIterator itr0 = + new TrackingCloseableIterator<>(ImmutableList.of("a", "c", "e", "g")); + TrackingCloseableIterator itr1 = + new TrackingCloseableIterator<>(ImmutableList.of("b", "c", "d", "g", "h")); + TrackingCloseableIterator itr2 = + new TrackingCloseableIterator<>(ImmutableList.of("c", "e", "f", "h")); + + List keys = new ArrayList<>(); + try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1, itr2))) { + while (mergeItr.hasNext()) { + keys.add(mergeItr.next().getKey()); + } + + assertEquals(ImmutableList.of("a", "b", "c", "d", "e", "f", "g", "h"), keys); + + // Validate sources for every merged key. + java.util.Map> expectedSources = + ImmutableMap.>builder() + .put("a", ImmutableSet.of(0)) + .put("b", ImmutableSet.of(1)) + .put("c", ImmutableSet.of(0, 1, 2)) + .put("d", ImmutableSet.of(1)) + .put("e", ImmutableSet.of(0, 2)) + .put("f", ImmutableSet.of(2)) + .put("g", ImmutableSet.of(0, 1)) + .put("h", ImmutableSet.of(1, 2)) + .build(); + + ImmutableMap.Builder> actualSourcesBuilder = ImmutableMap.builder(); + for (MergeResult r : mergeItr.getMerged()) { + actualSourcesBuilder.put(r.getKey(), r.getSources()); + } + java.util.Map> actualSources = actualSourcesBuilder.build(); + assertEquals(expectedSources, actualSources); + } + + // All iterators should have been auto-closed when they became exhausted. + assertEquals(1, itr0.getCloseCount()); + assertEquals(1, itr1.getCloseCount()); + assertEquals(1, itr2.getCloseCount()); + } + + @Test + void testInitClosesEmptyIterators() { + TrackingCloseableIterator empty = + new TrackingCloseableIterator<>(Collections.emptyList()); + TrackingCloseableIterator nonEmpty = + new TrackingCloseableIterator<>(ImmutableList.of("a")); + + try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty, nonEmpty))) { + assertTrue(mergeItr.hasNext()); // triggers init + assertEquals(1, empty.getCloseCount(), "Empty iterator should be closed during init"); + + assertEquals("a", mergeItr.next().getKey()); + assertFalse(mergeItr.hasNext()); + } + assertEquals(1, nonEmpty.getCloseCount(), "Iterator should be closed when exhausted"); + } + + @Test + void testCloseClosesAllIterators() { + TrackingCloseableIterator itr0 = + new TrackingCloseableIterator<>(ImmutableList.of("a", "c")); + TrackingCloseableIterator itr1 = + new TrackingCloseableIterator<>(ImmutableList.of("b", "d")); + + try (TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1))) { + assertTrue(mergeItr.hasNext()); // triggers init + mergeItr.close(); + assertEquals(1, itr0.getCloseCount()); + assertEquals(1, itr1.getCloseCount()); + + // idempotent close + mergeItr.close(); + } + assertEquals(1, itr0.getCloseCount()); + assertEquals(1, itr1.getCloseCount()); + } + + @Test + void testHasNextWrapsIOExceptionFromGetIterator() { + IOException expected = new IOException("boom"); + TrackingCloseableIterator itr0 = + new TrackingCloseableIterator<>(ImmutableList.of("a")); + TrackingCloseableIterator itr1 = + new TrackingCloseableIterator<>(ImmutableList.of("b")); + TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1)); + mergeItr.withGetIteratorIOException(1, expected); + try (TestIterator ignored = mergeItr) { + UncheckedIOException ex = 
assertThrows(UncheckedIOException.class, mergeItr::hasNext); + assertEquals(expected, ex.getCause()); + } + + // itr0 is registered with MinHeapMergeIterator before idx=1 throws and must be closed by cleanup. + assertEquals(1, itr0.getCloseCount()); + // itr1 was never registered; close explicitly to avoid leaks. + itr1.close(); + assertEquals(1, itr1.getCloseCount()); + } + + @Test + void testHasNextWrapsRocksDBExceptionFromGetIteratorAndClosesOpenedIterators() throws Exception { + TrackingCloseableIterator itr0 = + new TrackingCloseableIterator<>(ImmutableList.of("a", "b")); + TrackingCloseableIterator itr1 = + new TrackingCloseableIterator<>(ImmutableList.of("c")); + RocksDatabaseException rdbEx = new RocksDatabaseException("rocks"); + TestIterator mergeItr = new TestIterator(ImmutableList.of(itr0, itr1)); + mergeItr.withGetIteratorIOException(1, rdbEx); + try (TestIterator ignored = mergeItr) { + UncheckedIOException ex = assertThrows(UncheckedIOException.class, mergeItr::hasNext); + assertInstanceOf(RocksDatabaseException.class, ex.getCause()); + assertEquals(rdbEx, ex.getCause()); + } + + // itr0 was created before the exception and should have been closed via initHeap() cleanup. + assertEquals(1, itr0.getCloseCount()); + // itr1 was never registered; close explicitly to avoid leaks. + itr1.close(); + assertEquals(1, itr1.getCloseCount()); + } + + @Test + void testNextWhenEmptyThrowsNoSuchElement() { + TrackingCloseableIterator empty = + new TrackingCloseableIterator<>(Collections.emptyList()); + try (TestIterator mergeItr = new TestIterator(ImmutableList.of(empty))) { + assertFalse(mergeItr.hasNext()); + assertThrows(NoSuchElementException.class, mergeItr::next); + } + assertEquals(1, empty.getCloseCount(), "Empty iterator should be closed during init"); + } +} + + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 4eeec3dc4c62..0c764f948860 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -1081,16 +1081,12 @@ void addToObjectIdMap(Table fsTable, throw new RuntimeException(e); } } - } catch (RocksDBException rocksDBException) { - // TODO: [SNAPSHOT] Gracefully handle exception - // e.g. when input files do not exist - throw new RuntimeException(rocksDBException); } } private void validateEstimatedKeyChangesAreInLimits( SstFileSetReader sstFileReader - ) throws RocksDBException, IOException { + ) throws IOException { if (sstFileReader.getEstimatedTotalKeys() > maxAllowedKeyChangesForASnapDiff) { // TODO: [SNAPSHOT] HDDS-8202: Change it to custom snapshot exception. 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java index 2b72e05115bc..daca37095334 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java @@ -91,7 +91,6 @@ import org.apache.ozone.rocksdb.util.SstFileSetReader; import org.apache.ratis.util.UncheckedAutoCloseable; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; -import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -354,8 +353,6 @@ private Pair spillTableDiffIntoSstFile(List deltaFilePaths, deltaEntriesCount++; } } - } catch (RocksDBException e) { - throw new RocksDatabaseException("Error while reading sst files.", e); } // If there are no delta entries then delete the delta file. No need to ingest the file as a diff. return Pair.of(fileToBeIngested, deltaEntriesCount != 0);
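Reviewer note: the sketch below shows how a concrete MinHeapMergeIterator subclass introduced in this patch is expected to be wired up: two already-sorted in-memory sources are merged into one sorted stream, entries with the same key are grouped into a single merge() call, and exhausted sources are closed automatically. It assumes the classes added above are on the classpath; the ListBackedIterator helper, the MergeIteratorExample class name, and the sample data are illustrative only and not part of the change.

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.utils.db.MinHeapMergeIterator;

public final class MergeIteratorExample {

  // Hypothetical adapter: an in-memory iterator that also satisfies Closeable,
  // standing in for the RocksDB-backed iterators used by the real callers.
  private static final class ListBackedIterator implements Iterator<String>, Closeable {
    private final Iterator<String> delegate;

    ListBackedIterator(List<String> sortedKeys) {
      this.delegate = sortedKeys.iterator();
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public String next() {
      return delegate.next();
    }

    @Override
    public void close() {
      // Nothing to release here; real implementations close RocksDB resources.
    }
  }

  public static void main(String[] args) {
    // Two already-sorted inputs; "b" appears in both and must be emitted exactly once.
    List<List<String>> sources = Arrays.asList(
        Arrays.asList("a", "b", "d"),
        Arrays.asList("b", "c"));

    try (MinHeapMergeIterator<String, ListBackedIterator, String> merged =
        new MinHeapMergeIterator<String, ListBackedIterator, String>(sources.size(), Comparator.naturalOrder()) {
          @Override
          protected ListBackedIterator getIterator(int idx) throws IOException {
            // Sources are opened lazily on the first hasNext() call.
            return new ListBackedIterator(sources.get(idx));
          }

          @Override
          protected String merge(Map<Integer, String> keysToMerge) {
            // Every value carries the same key here; real callers combine the per-source values.
            return keysToMerge.values().iterator().next();
          }
        }) {
      while (merged.hasNext()) {
        System.out.println(merged.next()); // prints a, b, c, d
      }
    }
  }
}

DBStore#getMergeIterator and the reworked MultipleSstFileIterator rely on this same contract: each table prefix iterator or SST file iterator becomes one heap source, and merge() folds the values gathered for a key into the single element returned by next().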