diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 698563ed7a1f2..4ead1200a34d4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4415,6 +4415,7 @@ public CompletableFuture getManagedLedgerInternalSta info.entries = li.getEntries(); info.size = li.getSize(); info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + info.bookkeeperDeleted = li.hasOffloadContext() && li.getOffloadContext().getBookkeeperDeleted(); if (includeLedgerMetadata) { // lookup metadata from the hashmap which contains completed async operations info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 936091edce557..8f8ea7720455f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2549,6 +2549,8 @@ public CompletableFuture getInternalStats(boolean info.entries = li.getEntries(); info.size = li.getSize(); info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete(); + info.bookkeeperDeleted = + li.hasOffloadContext() && li.getOffloadContext().getBookkeeperDeleted(); stats.ledgers.add(info); if (includeLedgerMetadata) { futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ManagedLedgerInternalStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ManagedLedgerInternalStats.java index 95a45d37d9556..50db0e19d50f9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ManagedLedgerInternalStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ManagedLedgerInternalStats.java @@ -77,6 +77,7 @@ public static class LedgerInfo { public boolean offloaded; public String metadata; public boolean underReplicated; + public boolean bookkeeperDeleted; } /** diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffload2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffload2.java new file mode 100644 index 0000000000000..49ce19fdb5edb --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffload2.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.backwardscompatibility; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.offload.TestBaseOffload; +import org.testng.annotations.Test; + +@Slf4j +public class TestFileSystemOffload2 extends TestBaseOffload { + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testPublishOffloadAndConsumeViaCLI(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get()); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testPublishOffloadAndConsumeViaThreshold(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get()); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) throws Exception { + super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get()); + } + + @Override + protected Map getEnv() { + Map result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", "filesystem"); + result.put("fileSystemURI", "file:///tmp"); + + return result; + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithClient3_0.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithClient3_0.java new file mode 100644 index 0000000000000..df539bbb696de --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithClient3_0.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.backwardscompatibility; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.offload.TestBaseOffload; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class TestFileSystemOffloadWithClient3_0 extends TestBaseOffload { + @Override + protected Map getEnv() { + Map result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", "filesystem"); + result.put("fileSystemURI", "file:///tmp"); + + return result; + } + + @Override + protected void beforeStartCluster() throws Exception { + super.beforeStartCluster(); + pulsarCluster.getProxy().setDockerImageName(PulsarContainer.PULSAR_3_0_IMAGE_NAME); + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + final String tenant = "offload-test-deletion-lag-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + + String output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.get()).build(); + + long offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger1 = offloadedLedger; + ManagedLedgerInternalStats.LedgerInfo offloadedLedgerInfo = + admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger1).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, + "--lag", "0m"); + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("0 minute(s)")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + // wait up to 10 seconds for ledger to be deleted + for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) { + writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + Thread.sleep(1000); + } + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger2 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + + // old version (3.0.0) server, new version client + // the new field `bookkeeperDeleted` should always be false + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace); + + Thread.sleep(5); // wait 5 seconds to allow broker to see update + + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger3 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger3).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + admin.close(); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithServer3_0.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithServer3_0.java new file mode 100644 index 0000000000000..1555b45693cc5 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/TestFileSystemOffloadWithServer3_0.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.backwardscompatibility; + +import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT; +import static org.apache.pulsar.tests.integration.containers.PulsarContainer.PULSAR_3_0_IMAGE_NAME; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.tests.integration.containers.CSContainer; +import org.apache.pulsar.tests.integration.containers.ToolsetContainer; +import org.apache.pulsar.tests.integration.containers.ZKContainer; +import org.apache.pulsar.tests.integration.offload.TestBaseOffload; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Test new version server and old version client. + */ +public class TestFileSystemOffloadWithServer3_0 extends TestBaseOffload { + + private ToolsetContainer toolsetContainer; + + @Override + protected Map getEnv() { + Map result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger())); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", "filesystem"); + result.put("fileSystemURI", "file:///tmp"); + + return result; + } + + @Override + protected void beforeStartCluster() throws Exception { + super.beforeStartCluster(); + + String clusterName = this.getPulsarCluster().getSpec().clusterName(); + + toolsetContainer = new ToolsetContainer(clusterName, PULSAR_3_0_IMAGE_NAME) + .withEnv("metadataStoreUrl", ZKContainer.NAME) + .withEnv("configurationMetadataStoreUrl", CSContainer.NAME + ":" + CS_PORT) + .withEnv("clusterName", clusterName); + toolsetContainer.start(); + } + + @Override + public void tearDownCluster() throws Exception { + super.tearDownCluster(); + if (toolsetContainer != null) { + toolsetContainer.stop(); + } + } + + @Test(dataProvider = "ServiceAndAdminUrls") + public void testPublishOffloadAndConsumeDeletionLag(Supplier serviceUrl, Supplier adminUrl) + throws Exception { + final String tenant = "offload-test-deletion-lag-" + randomName(4); + final String namespace = tenant + "/ns1"; + final String topic = "persistent://" + namespace + "/topic1"; + + pulsarCluster.runAdminCommandOnAnyBroker("tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", tenant); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), namespace); + + // set threshold to offload runs immediately after role + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", + "set-offload-threshold", "--size", "0", namespace); + + String output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.get()).build(); + + long offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger1 = offloadedLedger; + ManagedLedgerInternalStats.LedgerInfo offloadedLedgerInfo = + admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger1).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, + "--lag", "0m"); + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("0 minute(s)")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + // wait up to 10 seconds for ledger to be deleted + for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) { + writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + Thread.sleep(1000); + } + Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger2 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertTrue(offloadedLedgerInfo.bookkeeperDeleted); + + output = toolsetContainer.runAdminCommand("topics", "stats-internal", topic).getStdout(); + // old version client should not recognize `bookkeeperDeleted` + Assert.assertFalse(output.contains("bookkeeperDeleted")); + PersistentTopicInternalStats topicInternalStats = + jsonMapper().readValue(output, PersistentTopicInternalStats.class); + offloadedLedgerInfo = topicInternalStats.ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + // old version client should not recognize `bookkeeperDeleted`, so should be default value `False` + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace); + + Thread.sleep(5); // wait 5 seconds to allow broker to see update + + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-deletion-lag", namespace).getStdout(); + Assert.assertTrue(output.contains("Unset for namespace")); + + offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic); + + // give it up to 5 seconds to delete, it shouldn't + // so we wait this every time + Thread.sleep(5000); + Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger3 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger3).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + admin.close(); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java index 77cdc1bfd28a9..585da32f985d7 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java @@ -54,6 +54,7 @@ public abstract class PulsarContainer> exte public static final String DEFAULT_IMAGE_NAME = System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME", "apachepulsar/pulsar-test-latest-version:latest"); public static final String DEFAULT_HTTP_PATH = "/metrics"; + public static final String PULSAR_3_0_IMAGE_NAME = "apachepulsar/pulsar:3.0.0"; public static final String PULSAR_2_5_IMAGE_NAME = "apachepulsar/pulsar:2.5.0"; public static final String PULSAR_2_4_IMAGE_NAME = "apachepulsar/pulsar:2.4.0"; public static final String PULSAR_2_3_IMAGE_NAME = "apachepulsar/pulsar:2.3.0"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ToolsetContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ToolsetContainer.java new file mode 100644 index 0000000000000..2a58e794b1043 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ToolsetContainer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.containers; + +import static org.apache.pulsar.tests.integration.containers.PulsarContainer.DEFAULT_IMAGE_NAME; +import static org.apache.pulsar.tests.integration.topologies.PulsarCluster.ADMIN_SCRIPT; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; + +/** + * A pulsar container that runs nothing. + */ +public class ToolsetContainer extends ChaosContainer { + + public static final String NAME = "toolset"; + + public ToolsetContainer(String clusterName) { + super(clusterName, DEFAULT_IMAGE_NAME); + } + + public ToolsetContainer(String clusterName, String imageName) { + super(clusterName, imageName); + } + + @Override + public String getContainerName() { + return clusterName + "-" + NAME; + } + + @Override + protected void configure() { + super.configure(); + this.withNetworkAliases(NAME) + .withCreateContainerCmdModifier(createContainerCmd -> { + createContainerCmd.withHostName(NAME); + createContainerCmd.withName(getContainerName()); + createContainerCmd.withEntrypoint("sleep", "infinity"); + }); + } + + public ContainerExecResult runAdminCommand(String... commands) throws Exception { + String[] cmds = new String[commands.length + 1]; + cmds[0] = ADMIN_SCRIPT; + System.arraycopy(commands, 0, cmds, 1, commands.length); + return this.execCmd(cmds); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java index 7d38aa77bf027..65815c1a05dbc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestBaseOffload.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite; import org.awaitility.Awaitility; @@ -219,7 +220,7 @@ private boolean ledgerOffloaded(List le .map(l -> l.offloaded).findFirst().get(); } - private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) + protected long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception { return writeAndWaitForOffload(serviceUrl, adminUrl, topic, -1); } @@ -304,12 +305,21 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String "namespaces", "get-offload-deletion-lag", namespace).getStdout(); Assert.assertTrue(output.contains("Unset for namespace")); + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build(); + long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic); // give it up to 5 seconds to delete, it shouldn't // so we wait this every time Thread.sleep(5000); Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + long finalOffloadedLedger1 = offloadedLedger; + ManagedLedgerInternalStats.LedgerInfo offloadedLedgerInfo = + admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger1).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, "--lag", "0m"); output = pulsarCluster.runAdminCommandOnAnyBroker( @@ -324,6 +334,12 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String } Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger)); + long finalOffloadedLedger2 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertTrue(offloadedLedgerInfo.bookkeeperDeleted); + pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace); Thread.sleep(5); // wait 5 seconds to allow broker to see update @@ -338,6 +354,14 @@ protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String // so we wait this every time Thread.sleep(5000); Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger)); + + long finalOffloadedLedger3 = offloadedLedger; + offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream() + .filter((x) -> x.ledgerId == finalOffloadedLedger3).findFirst().get(); + Assert.assertTrue(offloadedLedgerInfo.offloaded); + Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted); + + admin.close(); } protected void testDeleteOffloadedTopic(String serviceUrl, String adminUrl, diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java index 6bf4061760b01..6668a01299340 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTieredStorageTestSuite.java @@ -53,7 +53,7 @@ public final void setupCluster() throws Exception { @AfterClass(alwaysRun = true) @Override - public final void tearDownCluster() throws Exception { + public void tearDownCluster() throws Exception { super.tearDownCluster(); } diff --git a/tests/integration/src/test/resources/pulsar-backwards-compatibility.xml b/tests/integration/src/test/resources/pulsar-backwards-compatibility.xml index b50af74ed17f0..87f8276b147ba 100644 --- a/tests/integration/src/test/resources/pulsar-backwards-compatibility.xml +++ b/tests/integration/src/test/resources/pulsar-backwards-compatibility.xml @@ -22,6 +22,8 @@ + +