1717
1818package org .apache .hadoop .hdds .scm .node ;
1919
20- import com .google .common .annotations .VisibleForTesting ;
2120import java .util .HashSet ;
2221import java .util .List ;
2322import java .util .Objects ;
2423import java .util .Set ;
25- import java .util .concurrent .ConcurrentHashMap ;
26- import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
2724import org .apache .hadoop .hdds .protocol .DatanodeID ;
2825import org .apache .hadoop .hdds .protocol .proto .StorageContainerDatanodeProtocolProtos .StorageReportProto ;
2926import org .apache .hadoop .hdds .scm .container .ContainerID ;
30- import org .apache .hadoop .hdds .scm .pipeline .Pipeline ;
3127import org .apache .hadoop .ozone .container .common .volume .VolumeUsage ;
3228import org .apache .hadoop .util .Time ;
3329import org .slf4j .Logger ;
@@ -70,8 +66,6 @@ public class PendingContainerTracker {
7066
7167 private static final Logger LOG = LoggerFactory .getLogger (PendingContainerTracker .class );
7268
73- private final DatanodeBuckets datanodeBuckets ;
74-
7569 /**
7670 * Maximum container size in bytes.
7771 */
@@ -86,13 +80,15 @@ public class PendingContainerTracker {
8680 * Two-window bucket for a single DataNode.
8781 * Contains current and previous window sets, plus last roll timestamp.
8882 */
89- private static class TwoWindowBucket {
83+ public static class TwoWindowBucket {
9084 private Set <ContainerID > currentWindow = new HashSet <>();
9185 private Set <ContainerID > previousWindow = new HashSet <>();
9286 private long lastRollTime = Time .monotonicNow ();
9387 private final long rollIntervalMs ;
88+ private final DatanodeID datanodeID ;
9489
95- TwoWindowBucket (long rollIntervalMs ) {
90+ TwoWindowBucket (DatanodeID id , long rollIntervalMs ) {
91+ this .datanodeID = id ;
9692 this .rollIntervalMs = rollIntervalMs ;
9793 }
9894
@@ -127,22 +123,22 @@ synchronized boolean contains(ContainerID containerID) {
127123 /**
128124 * Add container to current window.
129125 */
130- synchronized boolean add (ContainerID containerID , DatanodeID dnID ) {
126+ synchronized boolean add (ContainerID containerID ) {
131127 boolean added = currentWindow .add (containerID );
132128 LOG .debug ("Recorded pending container {} on DataNode {}. Added={}, Total pending={}" ,
133- containerID , dnID , added , getCount ());
129+ containerID , datanodeID , added , getCount ());
134130 return added ;
135131 }
136132
137133 /**
138134 * Remove container from both windows.
139135 */
140- synchronized boolean remove (ContainerID containerID , DatanodeID dnID ) {
136+ synchronized boolean remove (ContainerID containerID ) {
141137 boolean removedFromCurrent = currentWindow .remove (containerID );
142138 boolean removedFromPrevious = previousWindow .remove (containerID );
143139 boolean removed = removedFromCurrent || removedFromPrevious ;
144140 LOG .debug ("Removed pending container {} from DataNode {}. Removed={}, Remaining={}" ,
145- containerID , dnID , removed , getCount ());
141+ containerID , datanodeID , removed , getCount ());
146142 return removed ;
147143 }
148144
@@ -154,63 +150,25 @@ synchronized int getCount() {
154150 }
155151 }
156152
157- /**
158- * Per-datanode two-window buckets.
159- */
160- private static class DatanodeBuckets {
161- private final ConcurrentHashMap <DatanodeID , TwoWindowBucket > map = new ConcurrentHashMap <>();
162- private final long rollIntervalMs ;
163-
164- DatanodeBuckets (long rollIntervalMs ) {
165- this .rollIntervalMs = rollIntervalMs ;
166- }
167-
168- TwoWindowBucket get (DatanodeID id ) {
169- final TwoWindowBucket bucket = map .compute (id , (k , b ) -> b != null ? b : new TwoWindowBucket (rollIntervalMs ));
170- bucket .rollIfNeeded ();
171- return bucket ;
172- }
173-
174- TwoWindowBucket get (DatanodeDetails dn ) {
175- Objects .requireNonNull (dn , "dn == null" );
176- return get (dn .getID ());
177- }
178- }
179-
180- public PendingContainerTracker (long maxContainerSize , long rollIntervalMs ,
181- SCMNodeMetrics metrics ) {
182- this .datanodeBuckets = new DatanodeBuckets (rollIntervalMs );
153+ public PendingContainerTracker (long maxContainerSize , long rollIntervalMs , SCMNodeMetrics metrics ) {
183154 this .maxContainerSize = maxContainerSize ;
184155 this .metrics = metrics ;
185156 LOG .info ("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms" ,
186157 maxContainerSize , rollIntervalMs );
187158 }
188159
189- /**
190- * Advances the two-window tumbling bucket for this datanode when the roll interval has elapsed.
191- * Call on periodic paths (node report) so windows age even when there are no new
192- * allocations or container reports touching this tracker.
193- */
194- public void rollWindowsIfNeeded (DatanodeDetails node ) {
195- Objects .requireNonNull (node , "node == null" );
196- datanodeBuckets .get (node .getID ());
197- }
198-
199160 /**
200161 * Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for
201162 * SCM pending allocations for {@code node} (this tracker) and usable space across volumes on
202- * {@code datanodeInfo}. Pending bytes are {@link #getPendingContainerCount} × {@code maxContainerSize};
163+ * {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize};
203164 * effective allocatable space sums full-container slots per storage report.
204165 *
205- * @param node identity used to look up pending allocations (same DN as {@code datanodeInfo})
206166 * @param datanodeInfo storage reports for the datanode
207167 */
208- public boolean hasEffectiveAllocatableSpaceForNewContainer (
209- DatanodeDetails node , DatanodeInfo datanodeInfo ) {
210- Objects .requireNonNull (node , "node == null" );
168+ public boolean hasEffectiveAllocatableSpaceForNewContainer (DatanodeInfo datanodeInfo ) {
211169 Objects .requireNonNull (datanodeInfo , "datanodeInfo == null" );
212170
213- long pendingAllocationSize = getPendingContainerCount ( node ) * maxContainerSize ;
171+ long pendingAllocationSize = datanodeInfo . getPendingContainerAllocations (). getCount ( ) * maxContainerSize ;
214172 List <StorageReportProto > storageReports = datanodeInfo .getStorageReports ();
215173 Objects .requireNonNull (storageReports , "storageReports == null" );
216174 if (storageReports .isEmpty ()) {
@@ -231,93 +189,37 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(
231189 return false ;
232190 }
233191
234- /**
235- * Record a pending container allocation for all DataNodes in the pipeline.
236- * Container is added to the current window.
237- *
238- * @param pipeline The pipeline where container is allocated
239- * @param containerID The container being allocated
240- */
241- public void recordPendingAllocation (Pipeline pipeline , ContainerID containerID ) {
242- Objects .requireNonNull (pipeline , "pipeline == null" );
243- Objects .requireNonNull (containerID , "containerID == null" );
244-
245- for (DatanodeDetails node : pipeline .getNodes ()) {
246- recordPendingAllocationForDatanode (node , containerID );
247- }
248- }
249-
250192 /**
251193 * Record a pending container allocation for a single DataNode.
252194 * Container is added to the current window.
253195 *
254- * @param node The DataNode where container is being allocated/replicated
255196 * @param containerID The container being allocated/replicated
256197 */
257- public void recordPendingAllocationForDatanode (DatanodeDetails node , ContainerID containerID ) {
258- Objects .requireNonNull (node , "node == null" );
198+ public void recordPendingAllocationForDatanode (DatanodeInfo datanodeInfo , ContainerID containerID ) {
259199 Objects .requireNonNull (containerID , "containerID == null" );
260-
261- DatanodeID dnID = node . getID () ;
262- boolean added = addContainerToBucket ( containerID , dnID );
263-
200+ if ( datanodeInfo == null ) {
201+ return ;
202+ }
203+ final boolean added = datanodeInfo . getPendingContainerAllocations (). add ( containerID );
264204 if (added && metrics != null ) {
265205 metrics .incNumPendingContainersAdded ();
266206 }
267207 }
268208
269- private boolean addContainerToBucket (ContainerID containerID , DatanodeID dnID ) {
270- return datanodeBuckets .get (dnID ).add (containerID , dnID );
271- }
272-
273209 /**
274210 * Remove a pending container allocation from a specific DataNode.
275211 * Removes from both current and previous windows.
276212 * Called when container is confirmed.
277213 *
278- * @param node The DataNode
279214 * @param containerID The container to remove from pending
280215 */
281- public void removePendingAllocation (DatanodeDetails node , ContainerID containerID ) {
282- Objects .requireNonNull (node , "node == null" );
216+ public void removePendingAllocation (TwoWindowBucket bucket , ContainerID containerID ) {
283217 Objects .requireNonNull (containerID , "containerID == null" );
284218
285- DatanodeID dnID = node .getID ();
286- boolean removed = removeContainerFromBucket (containerID , dnID );
219+ boolean removed = bucket .remove (containerID );
287220
288221 if (removed && metrics != null ) {
289222 metrics .incNumPendingContainersRemoved ();
290223 }
291224 }
292-
293- private boolean removeContainerFromBucket (ContainerID containerID , DatanodeID dnID ) {
294- return datanodeBuckets .get (dnID ).remove (containerID , dnID );
295- }
296-
297- /**
298- * Number of pending container allocations for this datanode (union of current and previous
299- * windows). This call may advance the internal tumbling window if the roll interval has elapsed.
300- *
301- * @param node The DataNode
302- * @return Pending container count
303- */
304- public long getPendingContainerCount (DatanodeDetails node ) {
305- Objects .requireNonNull (node , "node == null" );
306- return datanodeBuckets .get (node ).getCount ();
307- }
308-
309- /**
310- * Whether container is in the current or previous window for this datanode.
311- */
312- @ VisibleForTesting
313- public boolean containsPendingContainer (DatanodeDetails node , ContainerID containerID ) {
314- Objects .requireNonNull (node , "node == null" );
315- Objects .requireNonNull (containerID , "containerID == null" );
316- return datanodeBuckets .get (node ).contains (containerID );
317- }
318-
319- @ VisibleForTesting
320- public SCMNodeMetrics getMetrics () {
321- return metrics ;
322- }
323225}
0 commit comments