Skip to content

Commit 76b146c

Browse files
authored
HDDS-13967. OM bootstrap lock should acquire a lock on SnapshotLocalDataManager before copying the local data files (#9335)
1 parent 40304b6 commit 76b146c

5 files changed

Lines changed: 92 additions & 62 deletions

File tree

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@
3535
import static org.junit.jupiter.api.Assertions.assertNull;
3636
import static org.junit.jupiter.api.Assertions.assertTrue;
3737
import static org.junit.jupiter.api.Assertions.fail;
38+
import static org.mockito.ArgumentMatchers.anyCollection;
3839
import static org.mockito.ArgumentMatchers.anyInt;
3940
import static org.mockito.ArgumentMatchers.anyMap;
4041
import static org.mockito.ArgumentMatchers.anySet;
4142
import static org.mockito.Mockito.any;
4243
import static org.mockito.Mockito.anyBoolean;
44+
import static org.mockito.Mockito.doAnswer;
4345
import static org.mockito.Mockito.doCallRealMethod;
4446
import static org.mockito.Mockito.doNothing;
4547
import static org.mockito.Mockito.eq;
@@ -109,6 +111,7 @@
109111
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
110112
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
111113
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
114+
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
112115
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
113116
import org.apache.hadoop.security.UserGroupInformation;
114117
import org.apache.ozone.test.GenericTestUtils;
@@ -169,7 +172,8 @@ private void setupCluster() throws Exception {
169172
conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD);
170173
cluster.waitForClusterToBeReady();
171174
client = cluster.newClient();
172-
om = cluster.getOzoneManager();
175+
OzoneManager normalOm = cluster.getOzoneManager();
176+
om = spy(normalOm);
173177
}
174178

175179
private void setupMocks() throws Exception {
@@ -244,7 +248,7 @@ public void write(int b) throws IOException {
244248
doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir();
245249
doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir();
246250
doCallRealMethod().when(omDbCheckpointServletMock)
247-
.transferSnapshotData(anySet(), any(), anySet(), any(), any(), anyMap());
251+
.transferSnapshotData(anySet(), any(), anyCollection(), anyCollection(), any(), any(), anyMap());
248252
doCallRealMethod().when(omDbCheckpointServletMock).createAndPrepareCheckpoint(anyBoolean());
249253
doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirsFromDB(any(), any(), any());
250254
}
@@ -767,27 +771,33 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception {
767771
.filter(snap -> snap.getName().equals("snapshot1"))
768772
.findFirst()
769773
.orElseThrow(() -> new RuntimeException("snapshot1 not found"));
770-
771774
// Setup servlet mocks for checkpoint processing
772775
setupMocks();
773776
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
774777

775778
// Create a checkpoint that captures current state (S1)
776-
DBStore dbStore = om.getMetadataManager().getStore();
777-
DBStore spyDbStore = spy(dbStore);
778-
AtomicReference<DBCheckpoint> capturedCheckpoint = new AtomicReference<>();
779+
DBStore spyDbStore = spy(om.getMetadataManager().getStore());
779780

780-
when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> {
781-
// Purge snapshot2 before checkpoint
781+
AtomicReference<DBCheckpoint> capturedCheckpoint = new AtomicReference<>();
782+
SnapshotCache spySnapshotCache = spy(om.getOmSnapshotManager().getSnapshotCache());
783+
OmSnapshotManager spySnapshotManager = spy(om.getOmSnapshotManager());
784+
when(om.getOmSnapshotManager()).thenReturn(spySnapshotManager);
785+
when(spySnapshotManager.getSnapshotCache()).thenReturn(spySnapshotCache);
786+
// Mock the snapshot cache to create a snapshot2 just after taking a snapshot cache lock.
787+
doAnswer(invocationOnMock -> {
788+
Object ret = invocationOnMock.callRealMethod();
782789
// create snapshot 3 before checkpoint
783790
client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2");
784791
// Also wait for double buffer to flush to ensure all transactions are committed
785792
om.awaitDoubleBufferFlush();
786-
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
793+
return ret;
794+
}).when(spySnapshotCache).lock();
795+
doAnswer(invocation -> {
796+
DBCheckpoint checkpoint = (DBCheckpoint) spy(invocation.callRealMethod());
787797
doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
788798
capturedCheckpoint.set(checkpoint);
789799
return checkpoint;
790-
});
800+
}).when(spyDbStore).getCheckpoint(eq(true));
791801

792802
// Initialize servlet
793803
doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import java.util.List;
5454
import java.util.Map;
5555
import java.util.Set;
56+
import java.util.UUID;
5657
import java.util.concurrent.atomic.AtomicLong;
58+
import java.util.stream.Collectors;
5759
import java.util.stream.Stream;
5860
import javax.servlet.ServletException;
5961
import javax.servlet.http.HttpServletRequest;
@@ -70,6 +72,7 @@
7072
import org.apache.hadoop.ozone.OzoneConsts;
7173
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
7274
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
75+
import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
7376
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
7477
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
7578
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
@@ -208,7 +211,6 @@ Path getCompactionLogDir() {
208211
* @param tmpdir Temporary directory for staging files during archiving.
209212
* @throws IOException if an I/O error occurs during processing or streaming.
210213
*/
211-
212214
public void writeDbDataToStream(HttpServletRequest request, OutputStream destination,
213215
Set<String> sstFilesToExclude, Path tmpdir) throws IOException {
214216
DBCheckpoint checkpoint = null;
@@ -219,12 +221,12 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
219221
AtomicLong maxTotalSstSize = new AtomicLong(getConf().getLong(OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY,
220222
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT));
221223

222-
Set<Path> snapshotPaths = Collections.emptySet();
224+
Collection<Path> snapshotPaths = Collections.emptySet();
223225

224226
if (!includeSnapshotData) {
225227
maxTotalSstSize.set(Long.MAX_VALUE);
226228
} else {
227-
snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, omMetadataManager, snapshotLocalDataManager);
229+
snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, omMetadataManager, snapshotLocalDataManager).values();
228230
}
229231

230232
if (sstFilesToExclude.isEmpty()) {
@@ -260,6 +262,7 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
260262
// this is the last step where we transfer the active om.db contents
261263
Map<String, String> hardLinkFileMap = new HashMap<>();
262264
SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
265+
OmSnapshotLocalDataManager localDataManager = om.getOmSnapshotManager().getSnapshotLocalDataManager();
263266
/*
264267
* Acquire snapshot cache lock when includeSnapshotData is true to prevent race conditions
265268
* between checkpoint operations and snapshot purge operations. Without this lock, a purge
@@ -272,7 +275,9 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
272275
* In this case, the try-with-resources block does not call close() on any resource,
273276
* which is intentional because snapshot consistency is not required.
274277
*/
275-
try (UncheckedAutoCloseableSupplier<OMLockDetails> lock = includeSnapshotData ? snapshotCache.lock() : null) {
278+
try (UncheckedAutoCloseableSupplier<OMLockDetails> snapshotDBLock =
279+
includeSnapshotData ? snapshotCache.lock() : null;
280+
HierarchicalResourceLock snapshotLocalDataLock = includeSnapshotData ? localDataManager.lock() : null) {
276281
// get the list of sst files of the checkpoint.
277282
checkpoint = createAndPrepareCheckpoint(true);
278283
// unlimited files as we want the Active DB contents to be transferred in a single batch
@@ -283,18 +288,21 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
283288
if (includeSnapshotData) {
284289
List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
285290
// get the list of snapshots from the checkpoint
291+
Map<UUID, Path> snapshotInCheckpoint;
286292
try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl
287293
.createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) {
288-
snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
294+
snapshotInCheckpoint = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
289295
snapshotLocalDataManager);
290296
}
291297
writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
292298
hardLinkFileMap, false);
293299
writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(), maxTotalSstSize, archiveOutputStream, tmpdir,
294300
hardLinkFileMap, false);
301+
Collection<Path> snapshotLocalPropertyFiles = getSnapshotLocalDataPaths(localDataManager,
302+
snapshotInCheckpoint.keySet());
295303
// This is done to ensure all data to be copied correctly is flushed in the snapshot DB
296-
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize, archiveOutputStream,
297-
hardLinkFileMap);
304+
transferSnapshotData(sstFilesToExclude, tmpdir, snapshotInCheckpoint.values(), snapshotLocalPropertyFiles,
305+
maxTotalSstSize, archiveOutputStream, hardLinkFileMap);
298306
}
299307
}
300308
writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
@@ -309,6 +317,36 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
309317
}
310318
}
311319

320+
/**
321+
* Retrieves the paths to the local property YAML files for the specified snapshot IDs.
322+
* This method resolves the chain of previous snapshot references for each snapshot ID
323+
* and gathers their corresponding local property YAML file paths.
324+
*
325+
* @param localDataManager The OmSnapshotLocalDataManager instance responsible for managing
326+
* snapshot data and metadata.
327+
* @param snapshotIds A set of snapshot IDs for which the local property YAML file
328+
* paths should be resolved.
329+
* @return A collection of paths to the local property YAML files for the specified
330+
* snapshot IDs.
331+
*/
332+
private Collection<Path> getSnapshotLocalDataPaths(OmSnapshotLocalDataManager localDataManager,
333+
Set<UUID> snapshotIds) {
334+
Set<UUID> snapshotLocalDataIds = new HashSet<>();
335+
Map<UUID, OmSnapshotLocalDataManager.SnapshotVersionsMeta> versionNodeMap =
336+
localDataManager.getVersionNodeMapUnmodifiable();
337+
for (UUID snapshot : snapshotIds) {
338+
UUID id = snapshot;
339+
// Get the previous snapshot id for the current snapshot id until we reach null or the first snapshot id which
340+
// is already in the snapshotLocalDataIds set.
341+
while (id != null && !snapshotLocalDataIds.contains(id)) {
342+
snapshotLocalDataIds.add(id);
343+
id = versionNodeMap.get(id).getPreviousSnapshotId();
344+
}
345+
}
346+
return snapshotLocalDataIds.stream().map(localDataManager::getSnapshotLocalPropertyYamlPath)
347+
.map(Paths::get).collect(Collectors.toList());
348+
}
349+
312350
/**
313351
* Transfers the snapshot data from the specified snapshot directories into the archive output stream,
314352
* handling deduplication and managing resource locking.
@@ -321,19 +359,18 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
321359
* @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication.
322360
* @throws IOException if an I/O error occurs during processing.
323361
*/
324-
void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
325-
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
326-
Map<String, String> hardLinkFileMap) throws IOException {
362+
void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Collection<Path> snapshotPaths,
363+
Collection<Path> snapshotLocalPropertyFiles, AtomicLong maxTotalSstSize,
364+
ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Map<String, String> hardLinkFileMap)
365+
throws IOException {
327366
for (Path snapshotDir : snapshotPaths) {
328367
writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
329368
false);
330-
Path snapshotLocalPropertyYaml = Paths.get(
331-
OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
332-
if (Files.exists(snapshotLocalPropertyYaml)) {
333-
File yamlFile = snapshotLocalPropertyYaml.toFile();
334-
hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
335-
linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
336-
}
369+
}
370+
for (Path snapshotLocalPropertyYaml : snapshotLocalPropertyFiles) {
371+
File yamlFile = snapshotLocalPropertyYaml.toFile();
372+
hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
373+
linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
337374
}
338375
}
339376

@@ -399,12 +436,12 @@ private OzoneConfiguration getConf() {
399436
* Collects paths to all snapshot databases from the OM DB.
400437
*
401438
* @param activeOMMetadataManager OMMetadataManager instance
402-
* @return Set of paths to snapshot databases
439+
* @return Map of paths to snapshot databases with snapshot IDs as keys.
403440
* @throws IOException if an I/O error occurs
404441
*/
405-
Set<Path> getSnapshotDirsFromDB(OMMetadataManager activeOMMetadataManager, OMMetadataManager omMetadataManager,
442+
Map<UUID, Path> getSnapshotDirsFromDB(OMMetadataManager activeOMMetadataManager, OMMetadataManager omMetadataManager,
406443
OmSnapshotLocalDataManager localDataManager) throws IOException {
407-
Set<Path> snapshotPaths = new HashSet<>();
444+
Map<UUID, Path> snapshotPaths = new HashMap<>();
408445
try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iter =
409446
omMetadataManager.getSnapshotInfoTable().iterator()) {
410447
while (iter.hasNext()) {
@@ -414,7 +451,7 @@ Set<Path> getSnapshotDirsFromDB(OMMetadataManager activeOMMetadataManager, OMMet
414451
localDataManager.getOmSnapshotLocalDataMeta(snapshotInfo.getSnapshotId())) {
415452
Path snapshotDir = getSnapshotPath(activeOMMetadataManager, snapshotInfo.getSnapshotId(),
416453
snapLocalMeta.getMeta().getVersion());
417-
snapshotPaths.add(snapshotDir);
454+
snapshotPaths.put(snapshotInfo.getSnapshotId(), snapshotDir);
418455
}
419456
}
420457
}

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMDBCheckpointUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.nio.file.Files;
2525
import java.nio.file.Path;
26-
import java.util.Set;
26+
import java.util.Collection;
2727
import javax.servlet.http.HttpServletRequest;
2828
import org.apache.commons.io.IOCase;
2929
import org.apache.commons.io.file.Counters;
@@ -55,7 +55,7 @@ public static boolean includeSnapshotData(HttpServletRequest request) {
5555
return Boolean.parseBoolean(includeParam);
5656
}
5757

58-
public static void logEstimatedTarballSize(Path dbLocation, Set<Path> snapshotPaths) {
58+
public static void logEstimatedTarballSize(Path dbLocation, Collection<Path> snapshotPaths) {
5959
try {
6060
Counters.PathCounters counters = Counters.longPathCounters();
6161
CountingPathVisitor visitor = new CountingPathVisitor(

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotLocalDataManager.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,11 @@
6969
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
7070
import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager;
7171
import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock;
72-
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
7372
import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
7473
import org.apache.hadoop.ozone.util.ObjectSerializer;
7574
import org.apache.hadoop.ozone.util.YamlSerializer;
7675
import org.apache.ratis.util.function.CheckedFunction;
7776
import org.apache.ratis.util.function.CheckedSupplier;
78-
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
7977
import org.rocksdb.LiveFileMetaData;
8078
import org.slf4j.Logger;
8179
import org.slf4j.LoggerFactory;
@@ -111,8 +109,6 @@ public class OmSnapshotLocalDataManager implements AutoCloseable {
111109
private final MutableGraph<LocalDataVersionNode> localDataGraph;
112110
private final Map<UUID, SnapshotVersionsMeta> versionNodeMap;
113111
private final OMMetadataManager omMetadataManager;
114-
// Used for acquiring locks on the entire data structure.
115-
private final ReadWriteLock fullLock;
116112
// Used for taking a lock on internal data structure Map and Graph to ensure thread safety;
117113
private final ReadWriteLock internalLock;
118114
// Locks should be always acquired by iterating through the snapshot chain to avoid deadlocks.
@@ -136,11 +132,14 @@ public void computeAndSetChecksum(Yaml yaml, OmSnapshotLocalData data) throws IO
136132
}
137133
};
138134
this.versionNodeMap = new ConcurrentHashMap<>();
139-
this.fullLock = new ReentrantReadWriteLock();
140135
this.internalLock = new ReentrantReadWriteLock();
141136
init(configuration, snapshotChainManager, omLayoutVersionManager, defaultSnapProvider);
142137
}
143138

139+
public Map<UUID, SnapshotVersionsMeta> getVersionNodeMapUnmodifiable() {
140+
return Collections.unmodifiableMap(versionNodeMap);
141+
}
142+
144143
@VisibleForTesting
145144
Map<UUID, SnapshotVersionsMeta> getVersionNodeMap() {
146145
return versionNodeMap;
@@ -224,8 +223,7 @@ public WritableOmSnapshotLocalDataProvider getWritableOmSnapshotLocalData(UUID s
224223
return new WritableOmSnapshotLocalDataProvider(snapshotId, previousSnapshotId);
225224
}
226225

227-
public WritableOmSnapshotLocalDataProvider getWritableOmSnapshotLocalData(UUID snapshotId)
228-
throws IOException {
226+
public WritableOmSnapshotLocalDataProvider getWritableOmSnapshotLocalData(UUID snapshotId) throws IOException {
229227
return new WritableOmSnapshotLocalDataProvider(snapshotId);
230228
}
231229

@@ -448,19 +446,8 @@ void checkOrphanSnapshotVersions(OMMetadataManager metadataManager, SnapshotChai
448446
* @return an instance of {@code UncheckedAutoCloseableSupplier<OMLockDetails>} representing
449447
* the acquired lock details, where the lock will automatically be released on close.
450448
*/
451-
public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
452-
this.fullLock.writeLock().lock();
453-
return new UncheckedAutoCloseableSupplier<OMLockDetails>() {
454-
@Override
455-
public OMLockDetails get() {
456-
return OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
457-
}
458-
459-
@Override
460-
public void close() {
461-
fullLock.writeLock().unlock();
462-
}
463-
};
449+
public HierarchicalResourceLock lock() throws IOException {
450+
return locks.acquireResourceWriteLock(SNAPSHOT_LOCAL_DATA_LOCK);
464451
}
465452

466453
private void validateVersionRemoval(UUID snapshotId, int version) throws IOException {
@@ -831,18 +818,15 @@ public final class WritableOmSnapshotLocalDataProvider extends ReadableOmSnapsho
831818

832819
private WritableOmSnapshotLocalDataProvider(UUID snapshotId) throws IOException {
833820
super(snapshotId, false);
834-
fullLock.readLock().lock();
835821
}
836822

837823
private WritableOmSnapshotLocalDataProvider(UUID snapshotId, UUID snapshotIdToBeResolved) throws IOException {
838824
super(snapshotId, false, null, snapshotIdToBeResolved, true);
839-
fullLock.readLock().lock();
840825
}
841826

842827
private WritableOmSnapshotLocalDataProvider(UUID snapshotId,
843828
CheckedSupplier<Pair<OmSnapshotLocalData, File>, IOException> snapshotLocalDataSupplier) throws IOException {
844829
super(snapshotId, false, snapshotLocalDataSupplier, null, false);
845-
fullLock.readLock().lock();
846830
}
847831

848832
private SnapshotVersionsMeta validateModification(OmSnapshotLocalData snapshotLocalData)
@@ -1005,7 +989,6 @@ private boolean isDirty() {
1005989
@Override
1006990
public void close() throws IOException {
1007991
super.close();
1008-
fullLock.readLock().unlock();
1009992
}
1010993
}
1011994

0 commit comments

Comments
 (0)