3232import org .apache .pulsar .client .api .Message ;
3333import org .apache .pulsar .client .api .Producer ;
3434import org .apache .pulsar .client .api .PulsarClient ;
35+ import org .apache .pulsar .common .policies .data .ManagedLedgerInternalStats ;
3536import org .apache .pulsar .common .policies .data .PersistentTopicInternalStats ;
3637import org .apache .pulsar .tests .integration .suites .PulsarTieredStorageTestSuite ;
3738import org .awaitility .Awaitility ;
@@ -304,12 +305,21 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String
304305 "namespaces" , "get-offload-deletion-lag" , namespace ).getStdout ();
305306 Assert .assertTrue (output .contains ("Unset for namespace" ));
306307
308+ PulsarAdmin admin = PulsarAdmin .builder ().serviceHttpUrl (adminUrl ).build ();
309+
307310 long offloadedLedger = writeAndWaitForOffload (serviceUrl , adminUrl , topic );
308311 // give it up to 5 seconds to delete, it shouldn't
309312 // so we wait this every time
310313 Thread .sleep (5000 );
311314 Assert .assertTrue (ledgerExistsInBookKeeper (offloadedLedger ));
312315
316+ long finalOffloadedLedger1 = offloadedLedger ;
317+ ManagedLedgerInternalStats .LedgerInfo offloadedLedgerInfo =
318+ admin .topics ().getInternalStats (topic ).ledgers .stream ()
319+ .filter ((x ) -> x .ledgerId == finalOffloadedLedger1 ).findFirst ().get ();
320+ Assert .assertTrue (offloadedLedgerInfo .offloaded );
321+ Assert .assertFalse (offloadedLedgerInfo .bookkeeperDeleted );
322+
313323 pulsarCluster .runAdminCommandOnAnyBroker ("namespaces" , "set-offload-deletion-lag" , namespace ,
314324 "--lag" , "0m" );
315325 output = pulsarCluster .runAdminCommandOnAnyBroker (
@@ -324,6 +334,12 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String
324334 }
325335 Assert .assertFalse (ledgerExistsInBookKeeper (offloadedLedger ));
326336
337+ long finalOffloadedLedger2 = offloadedLedger ;
338+ offloadedLedgerInfo = admin .topics ().getInternalStats (topic ).ledgers .stream ()
339+ .filter ((x ) -> x .ledgerId == finalOffloadedLedger2 ).findFirst ().get ();
340+ Assert .assertTrue (offloadedLedgerInfo .offloaded );
341+ Assert .assertTrue (offloadedLedgerInfo .bookkeeperDeleted );
342+
327343 pulsarCluster .runAdminCommandOnAnyBroker ("namespaces" , "clear-offload-deletion-lag" , namespace );
328344
329345 Thread .sleep (5 ); // wait 5 seconds to allow broker to see update
@@ -338,6 +354,14 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String
338354 // so we wait this every time
339355 Thread .sleep (5000 );
340356 Assert .assertTrue (ledgerExistsInBookKeeper (offloadedLedger ));
357+
358+ long finalOffloadedLedger3 = offloadedLedger ;
359+ offloadedLedgerInfo = admin .topics ().getInternalStats (topic ).ledgers .stream ()
360+ .filter ((x ) -> x .ledgerId == finalOffloadedLedger3 ).findFirst ().get ();
361+ Assert .assertTrue (offloadedLedgerInfo .offloaded );
362+ Assert .assertFalse (offloadedLedgerInfo .bookkeeperDeleted );
363+
364+ admin .close ();
341365 }
342366
343367 protected void testDeleteOffloadedTopic (String serviceUrl , String adminUrl ,
0 commit comments