2525import com .google .common .base .Preconditions ;
2626import java .io .Closeable ;
2727import java .io .IOException ;
28+ import java .util .EnumMap ;
2829import java .util .Map ;
2930import java .util .concurrent .ConcurrentHashMap ;
3031import java .util .concurrent .atomic .AtomicReference ;
3435import java .util .function .Consumer ;
3536import java .util .stream .Collectors ;
3637import java .util .stream .Stream ;
38+ import org .apache .commons .lang3 .tuple .Pair ;
3739import org .apache .commons .pool2 .BasePooledObjectFactory ;
3840import org .apache .commons .pool2 .PooledObject ;
3941import org .apache .commons .pool2 .impl .DefaultPooledObject ;
4042import org .apache .commons .pool2 .impl .GenericObjectPool ;
4143import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
4244import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
45+ import org .apache .ratis .util .UncheckedAutoCloseable ;
4346import org .slf4j .Logger ;
4447import org .slf4j .LoggerFactory ;
4548
@@ -56,7 +59,7 @@ public class PoolBasedHierarchicalResourceLockManager implements HierarchicalRes
5659
5760 private final GenericObjectPool <ReadWriteLock > lockPool ;
5861 private final ResourceLockTracker <DAGLeveledResource > resourceLockTracker ;
59- private final Map <DAGLeveledResource , Map <String , LockReferenceCountPair >> lockMap ;
62+ private final Map <DAGLeveledResource , Pair < ReadWriteLock , Map <String , LockReferenceCountPair > >> lockMap ;
6063
6164 public PoolBasedHierarchicalResourceLockManager (OzoneConfiguration conf ) {
6265 int softLimit = conf .getInt (OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT ,
@@ -68,15 +71,18 @@ public PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
6871 config .setMaxTotal (hardLimit );
6972 config .setBlockWhenExhausted (true );
7073 this .lockPool = new GenericObjectPool <>(new ReadWriteLockFactory (), config );
71- this .lockMap = new ConcurrentHashMap <>();
74+ this .lockMap = new EnumMap <>(DAGLeveledResource .class );
75+ for (DAGLeveledResource resource : DAGLeveledResource .values ()) {
76+ this .lockMap .put (resource , Pair .of (new ReentrantReadWriteLock (), new ConcurrentHashMap <>(0 )));
77+ }
7278 this .resourceLockTracker = DAGResourceLockTracker .get ();
7379 }
7480
7581 private ReadWriteLock operateOnLock (DAGLeveledResource resource , String key ,
7682 Consumer <LockReferenceCountPair > function ) throws IOException {
7783 AtomicReference <IOException > exception = new AtomicReference <>();
78- Map <String , LockReferenceCountPair > resourceLockMap =
79- this . lockMap . computeIfAbsent ( resource , k -> new ConcurrentHashMap <>() );
84+ Pair < ReadWriteLock , Map <String , LockReferenceCountPair >> resourceLockPair = this . lockMap . get ( resource );
85+ Map < String , LockReferenceCountPair > resourceLockMap = resourceLockPair . getValue ( );
8086 LockReferenceCountPair lockRef = resourceLockMap .compute (key , (k , v ) -> {
8187 if (v == null ) {
8288 try {
@@ -111,6 +117,16 @@ public HierarchicalResourceLock acquireWriteLock(DAGLeveledResource resource, St
111117 return acquireLock (resource , key , false );
112118 }
113119
120+ @ Override
121+ public HierarchicalResourceLock acquireResourceWriteLock (DAGLeveledResource resource ) throws IOException {
122+ if (!resourceLockTracker .canLockResource (resource )) {
123+ String errorMessage = getErrorMessage (resource );
124+ LOG .error (errorMessage );
125+ throw new RuntimeException (errorMessage );
126+ }
127+ return new PoolBasedHierarchicalResourceLock (resource );
128+ }
129+
114130 private String getErrorMessage (IOzoneManagerLock .Resource resource ) {
115131 return "Thread '" + Thread .currentThread ().getName () + "' cannot " +
116132 "acquire " + resource .getName () + " lock while holding " +
@@ -135,7 +151,7 @@ private HierarchicalResourceLock acquireLock(DAGLeveledResource resource, String
135151 throw new IOException ("Unable to acquire " + (isReadLock ? "read" : "write" ) + " lock on resource "
136152 + resource + " and key " + key );
137153 }
138- return new PoolBasedHierarchicalResourceLock (resource , key ,
154+ return new PoolBasedHierarchicalResourceKeyLock (resource , key ,
139155 isReadLock ? readWriteLock .readLock () : readWriteLock .writeLock ());
140156 }
141157
@@ -144,6 +160,40 @@ public void close() {
144160 this .lockPool .close ();
145161 }
146162
163+ /**
164+ * The PoolBasedHierachicalResourceLock class implements the HierarchicalResourceLock
165+ * and UncheckedAutoCloseable interfaces to manage hierarchical resource locks from
166+ * a shared pool. It ensures proper lock acquisition and release during its lifecycle.
167+ */
168+ private final class PoolBasedHierarchicalResourceLock implements HierarchicalResourceLock ,
169+ UncheckedAutoCloseable {
170+ private final DAGLeveledResource resource ;
171+ private final Lock resourceLock ;
172+ private boolean lockAcquired ;
173+
174+ private PoolBasedHierarchicalResourceLock (DAGLeveledResource resource ) {
175+ this .resource = resource ;
176+ this .resourceLock = lockMap .get (this .resource ).getKey ().writeLock ();
177+ resourceLock .lock ();
178+ resourceLockTracker .lockResource (this .resource );
179+ lockAcquired = true ;
180+ }
181+
182+ @ Override
183+ public boolean isLockAcquired () {
184+ return lockAcquired ;
185+ }
186+
187+ @ Override
188+ public synchronized void close () {
189+ if (lockAcquired ) {
190+ resourceLock .unlock ();
191+ resourceLockTracker .unlockResource (this .resource );
192+ lockAcquired = false ;
193+ }
194+ }
195+ }
196+
147197 /**
148198 * Represents a hierarchical resource lock mechanism that operates
149199 * using a resource pool for acquiring and releasing locks. This class
@@ -159,20 +209,23 @@ public void close() {
159209 * class, {@code PoolBasedHierarchicalResourceLockManager}, which oversees
160210 * the lifecycle of multiple such locks.
161211 */
162- private final class PoolBasedHierarchicalResourceLock implements HierarchicalResourceLock , Closeable {
212+ private final class PoolBasedHierarchicalResourceKeyLock implements HierarchicalResourceLock , Closeable {
163213
164214 private boolean isLockAcquired ;
165- private final Lock lock ;
215+ private final Lock resourceLock ;
216+ private final Lock keyLock ;
166217 private final DAGLeveledResource resource ;
167218 private final String key ;
168219
169- private PoolBasedHierarchicalResourceLock (DAGLeveledResource resource , String key , Lock lock ) {
170- this .isLockAcquired = true ;
171- this .lock = lock ;
220+ private PoolBasedHierarchicalResourceKeyLock (DAGLeveledResource resource , String key , Lock lock ) {
221+ this .keyLock = lock ;
172222 this .resource = resource ;
173223 this .key = key ;
174- this .lock .lock ();
224+ this .resourceLock = lockMap .get (resource ).getKey ().readLock ();
225+ this .resourceLock .lock ();
226+ this .keyLock .lock ();
175227 resourceLockTracker .lockResource (resource );
228+ this .isLockAcquired = true ;
176229 }
177230
178231 @ Override
@@ -183,7 +236,8 @@ public boolean isLockAcquired() {
183236 @ Override
184237 public synchronized void close () throws IOException {
185238 if (isLockAcquired ) {
186- this .lock .unlock ();
239+ this .keyLock .unlock ();
240+ this .resourceLock .unlock ();
187241 resourceLockTracker .unlockResource (resource );
188242 operateOnLock (resource , key , (LockReferenceCountPair ::decrement ));
189243 isLockAcquired = false ;
@@ -193,7 +247,7 @@ public synchronized void close() throws IOException {
193247
194248 private static final class LockReferenceCountPair {
195249 private int count ;
196- private ReadWriteLock lock ;
250+ private final ReadWriteLock lock ;
197251
198252 private LockReferenceCountPair (ReadWriteLock lock ) {
199253 this .count = 0 ;
@@ -220,7 +274,7 @@ private ReadWriteLock getLock() {
220274 private static class ReadWriteLockFactory extends BasePooledObjectFactory <ReadWriteLock > {
221275
222276 @ Override
223- public ReadWriteLock create () throws Exception {
277+ public ReadWriteLock create () {
224278 return new ReentrantReadWriteLock ();
225279 }
226280
0 commit comments