2222import static org .apache .hadoop .hdds .HddsConfigKeys .HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT ;
2323import static org .apache .hadoop .hdds .client .ReplicationFactor .THREE ;
2424import static org .apache .hadoop .hdds .client .ReplicationType .RATIS ;
25+ import static org .apache .hadoop .hdds .scm .ScmConfigKeys .OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL ;
2526import static org .apache .hadoop .hdds .scm .ScmConfigKeys .OZONE_SCM_HA_RATIS_SNAPSHOT_GAP ;
2627import static org .apache .hadoop .hdds .scm .ScmConfigKeys .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL ;
2728import static org .apache .hadoop .ozone .OzoneConfigKeys .OZONE_BLOCK_DELETING_SERVICE_INTERVAL ;
28- import static org .apache .hadoop .ozone .OzoneConfigKeys . OZONE_BLOCK_DELETING_SERVICE_TIMEOUT ;
29+ import static org .apache .hadoop .ozone .om . OMConfigKeys . OZONE_DIR_DELETING_SERVICE_INTERVAL ;
2930import static org .apache .hadoop .ozone .recon .TestReconEndpointUtil .getReconWebAddress ;
3031import static org .junit .jupiter .api .Assertions .assertDoesNotThrow ;
3132import static org .junit .jupiter .api .Assertions .assertEquals ;
3233import static org .junit .jupiter .api .Assertions .assertNotNull ;
34+ import static org .junit .jupiter .api .Assertions .assertTrue ;
3335
3436import com .fasterxml .jackson .databind .ObjectMapper ;
3537import java .time .Duration ;
3638import java .util .Collections ;
3739import java .util .List ;
40+ import java .util .Map ;
41+ import java .util .Objects ;
3842import java .util .concurrent .TimeUnit ;
3943import org .apache .hadoop .fs .CommonConfigurationKeysPublic ;
4044import org .apache .hadoop .fs .FSDataOutputStream ;
4650import org .apache .hadoop .hdds .client .ReplicationConfig ;
4751import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
4852import org .apache .hadoop .hdds .scm .ScmConfig ;
53+ import org .apache .hadoop .hdds .scm .container .ContainerInfo ;
54+ import org .apache .hadoop .hdds .scm .events .SCMEvents ;
4955import org .apache .hadoop .hdds .scm .server .StorageContainerManager ;
5056import org .apache .hadoop .hdds .utils .IOUtils ;
5157import org .apache .hadoop .ozone .HddsDatanodeService ;
6672import org .apache .hadoop .ozone .om .helpers .OmKeyArgs ;
6773import org .apache .hadoop .ozone .om .helpers .OmKeyLocationInfo ;
6874import org .apache .hadoop .ozone .om .helpers .OmKeyLocationInfoGroup ;
75+ import org .apache .hadoop .ozone .recon .api .DataNodeMetricsService ;
76+ import org .apache .hadoop .ozone .recon .api .types .DataNodeMetricsServiceResponse ;
77+ import org .apache .hadoop .ozone .recon .api .types .ScmPendingDeletion ;
6978import org .apache .hadoop .ozone .recon .api .types .StorageCapacityDistributionResponse ;
7079import org .apache .hadoop .ozone .recon .spi .impl .OzoneManagerServiceProviderImpl ;
7180import org .apache .ozone .test .GenericTestUtils ;
@@ -100,6 +109,7 @@ public class TestStorageDistributionEndpoint {
100109 private static final ObjectMapper MAPPER = new ObjectMapper ();
101110
102111 private static final String STORAGE_DIST_ENDPOINT = "/api/v1/storageDistribution" ;
112+ private static final String PENDING_DELETION_ENDPOINT = "/api/v1/pendingDeletion" ;
103113
104114 static List <Arguments > replicationConfigs () {
105115 return Collections .singletonList (
@@ -110,17 +120,14 @@ static List<Arguments> replicationConfigs() {
110120 @ BeforeAll
111121 public static void setup () throws Exception {
112122 conf = new OzoneConfiguration ();
113- conf .setTimeDuration (OZONE_BLOCK_DELETING_SERVICE_INTERVAL , 100 ,
114- TimeUnit .MILLISECONDS );
115- conf .setTimeDuration (OZONE_BLOCK_DELETING_SERVICE_TIMEOUT , 100 ,
116- TimeUnit .MILLISECONDS );
117- conf .setTimeDuration (OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL ,
118- 100 , TimeUnit .MILLISECONDS );
123+ conf .setTimeDuration (OZONE_DIR_DELETING_SERVICE_INTERVAL , 100 , TimeUnit .MILLISECONDS );
124+ conf .setTimeDuration (OZONE_BLOCK_DELETING_SERVICE_INTERVAL , 100 , TimeUnit .MILLISECONDS );
125+ conf .setTimeDuration (OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL , 100 , TimeUnit .MILLISECONDS );
119126 conf .setLong (OZONE_SCM_HA_RATIS_SNAPSHOT_GAP , 1L );
120- conf .setTimeDuration (HDDS_HEARTBEAT_INTERVAL , 50 ,
121- TimeUnit .MILLISECONDS );
122- conf .setTimeDuration (HDDS_CONTAINER_REPORT_INTERVAL , 200 ,
123- TimeUnit . MILLISECONDS );
127+ conf .setTimeDuration (HDDS_HEARTBEAT_INTERVAL , 50 , TimeUnit . MILLISECONDS );
128+ conf . setTimeDuration ( HDDS_CONTAINER_REPORT_INTERVAL , 200 , TimeUnit .MILLISECONDS );
129+ conf .setTimeDuration (OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL , 500 , TimeUnit . MILLISECONDS );
130+ conf . set ( ReconServerConfigKeys . OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY , "5s" );
124131
125132 // Enhanced SCM configuration for faster block deletion processing
126133 ScmConfig scmConfig = conf .getObject (ScmConfig .class );
@@ -129,18 +136,9 @@ public static void setup() throws Exception {
129136 conf .set (HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT , "0s" );
130137
131138 // Enhanced DataNode configuration to move pending deletion from SCM to DN faster
132- DatanodeConfiguration dnConf =
133- conf .getObject (DatanodeConfiguration .class );
134- dnConf .setBlockDeletionInterval (Duration .ofMillis (100 ));
135- // Increase block delete queue limit to allow more queued commands on DN
136- dnConf .setBlockDeleteQueueLimit (50 );
137- // Reduce the interval for delete command worker processing
138- dnConf .setBlockDeleteCommandWorkerInterval (Duration .ofMillis (100 ));
139- // Increase blocks deleted per interval to speed up deletion
140- dnConf .setBlockDeletionLimit (5000 );
139+ DatanodeConfiguration dnConf = conf .getObject (DatanodeConfiguration .class );
140+ dnConf .setBlockDeletionInterval (Duration .ofMillis (30000 ));
141141 conf .setFromObject (dnConf );
142- // Increase DN delete threads for faster parallel processing
143- conf .setInt ("ozone.datanode.block.delete.threads.max" , 10 );
144142
145143 recon = new ReconService (conf );
146144 cluster = MiniOzoneCluster .newBuilder (conf )
@@ -190,19 +188,133 @@ public void testStorageDistributionEndpoint(ReplicationConfig replicationConfig)
190188 }
191189 }
192190 waitForKeysCreated (replicationConfig );
193- Thread .sleep (10000 );
194- StringBuilder urlBuilder = new StringBuilder ();
195- urlBuilder .append (getReconWebAddress (conf ))
196- .append (STORAGE_DIST_ENDPOINT );
197- String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
198- StorageCapacityDistributionResponse storageResponse =
199- MAPPER .readValue (response , StorageCapacityDistributionResponse .class );
200-
201- assertEquals (20 , storageResponse .getGlobalNamespace ().getTotalKeys ());
202- assertEquals (60 , storageResponse .getGlobalNamespace ().getTotalUsedSpace ());
203- assertEquals (0 , storageResponse .getUsedSpaceBreakDown ().getOpenKeyBytes ());
204- assertEquals (60 , storageResponse .getUsedSpaceBreakDown ().getCommittedKeyBytes ());
205- assertEquals (3 , storageResponse .getDataNodeUsage ().size ());
191+ GenericTestUtils .waitFor (this ::verifyStorageDistributionAfterKeyCreation , 1000 , 30000 );
192+ closeAllContainers ();
193+ fs .delete (dir1 , true );
194+ GenericTestUtils .waitFor (this ::verifyPendingDeletionAfterKeyDeletionOm , 1000 , 30000 );
195+ GenericTestUtils .waitFor (this ::verifyPendingDeletionAfterKeyDeletionScm , 2000 , 30000 );
196+ GenericTestUtils .waitFor (() ->
197+ Objects .requireNonNull (scm .getClientProtocolServer ().getDeletedBlockSummary ()).getTotalBlockCount () == 0 ,
198+ 1000 , 30000 );
199+ GenericTestUtils .waitFor (this ::verifyPendingDeletionAfterKeyDeletionDn , 2000 , 60000 );
200+ GenericTestUtils .waitFor (this ::verifyPendingDeletionClearsAtDn , 2000 , 60000 );
201+ cluster .getHddsDatanodes ().get (0 ).stop ();
202+ GenericTestUtils .waitFor (this ::verifyPendingDeletionAfterKeyDeletionOnDnFailure , 2000 , 60000 );
203+ }
204+
205+ private boolean verifyStorageDistributionAfterKeyCreation () {
206+ try {
207+ StringBuilder urlBuilder = new StringBuilder ();
208+ urlBuilder .append (getReconWebAddress (conf )).append (STORAGE_DIST_ENDPOINT );
209+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
210+ StorageCapacityDistributionResponse storageResponse =
211+ MAPPER .readValue (response , StorageCapacityDistributionResponse .class );
212+
213+ assertEquals (20 , storageResponse .getGlobalNamespace ().getTotalKeys ());
214+ assertEquals (60 , storageResponse .getGlobalNamespace ().getTotalUsedSpace ());
215+ assertEquals (0 , storageResponse .getUsedSpaceBreakDown ().getOpenKeyBytes ());
216+ assertEquals (60 , storageResponse .getUsedSpaceBreakDown ().getCommittedKeyBytes ());
217+ assertEquals (3 , storageResponse .getDataNodeUsage ().size ());
218+
219+ return true ;
220+ } catch (Exception e ) {
221+ LOG .debug ("Waiting for storage distribution assertions to pass" , e );
222+ return false ;
223+ }
224+ }
225+
226+ private boolean verifyPendingDeletionAfterKeyDeletionOm () {
227+ try {
228+ syncDataFromOM ();
229+ StringBuilder urlBuilder = new StringBuilder ();
230+ urlBuilder .append (getReconWebAddress (conf )).append (PENDING_DELETION_ENDPOINT ).append ("?component=om" );
231+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
232+ Map <String , Number > pendingDeletionMap = MAPPER .readValue (response , Map .class );
233+ assertEquals (30L , pendingDeletionMap .get ("totalSize" ).longValue ());
234+ assertEquals (30L , pendingDeletionMap .get ("pendingDirectorySize" ).longValue () +
235+ pendingDeletionMap .get ("pendingKeySize" ).longValue ());
236+ return true ;
237+ } catch (Exception e ) {
238+ LOG .debug ("Waiting for storage distribution assertions to pass" , e );
239+ return false ;
240+ }
241+ }
242+
243+ private boolean verifyPendingDeletionAfterKeyDeletionScm () {
244+ try {
245+ StringBuilder urlBuilder = new StringBuilder ();
246+ urlBuilder .append (getReconWebAddress (conf )).append (PENDING_DELETION_ENDPOINT ).append ("?component=scm" );
247+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
248+ ScmPendingDeletion pendingDeletion = MAPPER .readValue (response , ScmPendingDeletion .class );
249+ assertEquals (30 , pendingDeletion .getTotalReplicatedBlockSize ());
250+ assertEquals (10 , pendingDeletion .getTotalBlocksize ());
251+ assertEquals (10 , pendingDeletion .getTotalBlocksCount ());
252+ return true ;
253+ } catch (Throwable e ) {
254+ LOG .debug ("Waiting for storage distribution assertions to pass" , e );
255+ return false ;
256+ }
257+ }
258+
259+ private boolean verifyPendingDeletionAfterKeyDeletionDn () {
260+ try {
261+ scm .getScmHAManager ().asSCMHADBTransactionBuffer ().flush ();
262+ StringBuilder urlBuilder = new StringBuilder ();
263+ urlBuilder .append (getReconWebAddress (conf )).append (PENDING_DELETION_ENDPOINT ).append ("?component=dn" );
264+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
265+ DataNodeMetricsServiceResponse pendingDeletion = MAPPER .readValue (response , DataNodeMetricsServiceResponse .class );
266+ assertNotNull (pendingDeletion );
267+ assertEquals (30 , pendingDeletion .getTotalPendingDeletionSize ());
268+ assertEquals (DataNodeMetricsService .MetricCollectionStatus .FINISHED , pendingDeletion .getStatus ());
269+ assertEquals (pendingDeletion .getTotalNodesQueried (), pendingDeletion .getPendingDeletionPerDataNode ().size ());
270+ assertEquals (0 , pendingDeletion .getTotalNodeQueryFailures ());
271+ pendingDeletion .getPendingDeletionPerDataNode ().forEach (dn -> {
272+ assertEquals (10 , dn .getPendingBlockSize ());
273+ });
274+ return true ;
275+ } catch (Throwable e ) {
276+ LOG .debug ("Waiting for storage distribution assertions to pass" , e );
277+ return false ;
278+ }
279+ }
280+
281+ private boolean verifyPendingDeletionClearsAtDn () {
282+ try {
283+ scm .getScmHAManager ().asSCMHADBTransactionBuffer ().flush ();
284+ StringBuilder urlBuilder = new StringBuilder ();
285+ urlBuilder .append (getReconWebAddress (conf )).append (PENDING_DELETION_ENDPOINT ).append ("?component=dn" );
286+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
287+ DataNodeMetricsServiceResponse pendingDeletion = MAPPER .readValue (response , DataNodeMetricsServiceResponse .class );
288+ assertNotNull (pendingDeletion );
289+ assertEquals (0 , pendingDeletion .getTotalPendingDeletionSize ());
290+ assertEquals (DataNodeMetricsService .MetricCollectionStatus .FINISHED , pendingDeletion .getStatus ());
291+ assertEquals (pendingDeletion .getTotalNodesQueried (), pendingDeletion .getPendingDeletionPerDataNode ().size ());
292+ assertEquals (0 , pendingDeletion .getTotalNodeQueryFailures ());
293+ pendingDeletion .getPendingDeletionPerDataNode ().forEach (dn -> {
294+ assertEquals (0 , dn .getPendingBlockSize ());
295+ });
296+ return true ;
297+ } catch (Throwable e ) {
298+ LOG .debug ("Waiting for storage distribution assertions to pass" , e );
299+ return false ;
300+ }
301+ }
302+
303+ private boolean verifyPendingDeletionAfterKeyDeletionOnDnFailure () {
304+ try {
305+ StringBuilder urlBuilder = new StringBuilder ();
306+ urlBuilder .append (getReconWebAddress (conf )).append (PENDING_DELETION_ENDPOINT ).append ("?component=dn" );
307+ String response = TestReconEndpointUtil .makeHttpCall (conf , urlBuilder );
308+ DataNodeMetricsServiceResponse pendingDeletion = MAPPER .readValue (response , DataNodeMetricsServiceResponse .class );
309+ assertNotNull (pendingDeletion );
310+ assertEquals (1 , pendingDeletion .getTotalNodeQueryFailures ());
311+ assertTrue (pendingDeletion .getPendingDeletionPerDataNode ()
312+ .stream ()
313+ .anyMatch (dn -> dn .getPendingBlockSize () == -1 ));
314+ return true ;
315+ } catch (Throwable e ) {
316+ return false ;
317+ }
206318 }
207319
208320 private void verifyBlocksCreated (
@@ -286,4 +398,12 @@ public static void tear() {
286398 cluster .shutdown ();
287399 }
288400 }
401+
402+ private static void closeAllContainers () {
403+ for (ContainerInfo container :
404+ scm .getContainerManager ().getContainers ()) {
405+ scm .getEventQueue ().fireEvent (SCMEvents .CLOSE_CONTAINER ,
406+ container .containerID ());
407+ }
408+ }
289409}
0 commit comments