Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4415,6 +4415,7 @@ public CompletableFuture<ManagedLedgerInternalStats> getManagedLedgerInternalSta
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
info.bookkeeperDeleted = li.hasOffloadContext() && li.getOffloadContext().getBookkeeperDeleted();
if (includeLedgerMetadata) {
// lookup metadata from the hashmap which contains completed async operations
info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2549,6 +2549,8 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
info.bookkeeperDeleted =
li.hasOffloadContext() && li.getOffloadContext().getBookkeeperDeleted();
stats.ledgers.add(info);
if (includeLedgerMetadata) {
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static class LedgerInfo {
public boolean offloaded;
public String metadata;
public boolean underReplicated;
public boolean bookkeeperDeleted;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better add a compatibility test to cover the newly added field. Refer to: #21521 (comment)

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.backwardscompatibility;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.offload.TestBaseOffload;
import org.testng.annotations.Test;

@Slf4j
public class TestFileSystemOffload2 extends TestBaseOffload {

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaCLI(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
super.testPublishOffloadAndConsumeViaCLI(serviceUrl.get(), adminUrl.get());
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeViaThreshold(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
super.testPublishOffloadAndConsumeViaThreshold(serviceUrl.get(), adminUrl.get());
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> serviceUrl, Supplier<String> adminUrl) throws Exception {
super.testPublishOffloadAndConsumeDeletionLag(serviceUrl.get(), adminUrl.get());
}

@Override
protected Map<String, String> getEnv() {
Map<String, String> result = new HashMap<>();
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "filesystem");
result.put("fileSystemURI", "file:///tmp");

return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.backwardscompatibility;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.offload.TestBaseOffload;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TestFileSystemOffloadWithClient3_0 extends TestBaseOffload {
@Override
protected Map<String, String> getEnv() {
Map<String, String> result = new HashMap<>();
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "filesystem");
result.put("fileSystemURI", "file:///tmp");

return result;
}

@Override
protected void beforeStartCluster() throws Exception {
super.beforeStartCluster();
pulsarCluster.getProxy().setDockerImageName(PulsarContainer.PULSAR_3_0_IMAGE_NAME);
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> serviceUrl, Supplier<String> adminUrl)
throws Exception {
final String tenant = "offload-test-deletion-lag-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";

pulsarCluster.runAdminCommandOnAnyBroker("tenants",
"create", "--allowed-clusters", pulsarCluster.getClusterName(),
"--admin-roles", "offload-admin", tenant);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"create", "--clusters", pulsarCluster.getClusterName(), namespace);

// set threshold to offload runs immediately after role
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-offload-threshold", "--size", "0", namespace);

String output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("Unset for namespace"));

PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.get()).build();

long offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
// give it up to 5 seconds to delete, it shouldn't
// so we wait this every time
Thread.sleep(5000);
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger1 = offloadedLedger;
ManagedLedgerInternalStats.LedgerInfo offloadedLedgerInfo =
admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger1).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
"--lag", "0m");
output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("0 minute(s)"));

offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
// wait up to 10 seconds for ledger to be deleted
for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) {
writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
Thread.sleep(1000);
}
Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger2 = offloadedLedger;
offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);

// old version (3.0.0) server, new version client
// the new field `bookkeeperDeleted` should always be false
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);

Thread.sleep(5); // wait 5 seconds to allow broker to see update

output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("Unset for namespace"));

offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);

// give it up to 5 seconds to delete, it shouldn't
// so we wait this every time
Thread.sleep(5000);
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger3 = offloadedLedger;
offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger3).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

admin.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.backwardscompatibility;

import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.PULSAR_3_0_IMAGE_NAME;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.ToolsetContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.offload.TestBaseOffload;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
* Test new version server and old version client.
*/
public class TestFileSystemOffloadWithServer3_0 extends TestBaseOffload {

private ToolsetContainer toolsetContainer;

@Override
protected Map<String, String> getEnv() {
Map<String, String> result = new HashMap<>();
result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(getNumEntriesPerLedger()));
result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0");
result.put("managedLedgerOffloadDriver", "filesystem");
result.put("fileSystemURI", "file:///tmp");

return result;
}

@Override
protected void beforeStartCluster() throws Exception {
super.beforeStartCluster();

String clusterName = this.getPulsarCluster().getSpec().clusterName();

toolsetContainer = new ToolsetContainer(clusterName, PULSAR_3_0_IMAGE_NAME)
.withEnv("metadataStoreUrl", ZKContainer.NAME)
.withEnv("configurationMetadataStoreUrl", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
toolsetContainer.start();
}

@Override
public void tearDownCluster() throws Exception {
super.tearDownCluster();
if (toolsetContainer != null) {
toolsetContainer.stop();
}
}

@Test(dataProvider = "ServiceAndAdminUrls")
public void testPublishOffloadAndConsumeDeletionLag(Supplier<String> serviceUrl, Supplier<String> adminUrl)
throws Exception {
final String tenant = "offload-test-deletion-lag-" + randomName(4);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";

pulsarCluster.runAdminCommandOnAnyBroker("tenants",
"create", "--allowed-clusters", pulsarCluster.getClusterName(),
"--admin-roles", "offload-admin", tenant);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"create", "--clusters", pulsarCluster.getClusterName(), namespace);

// set threshold to offload runs immediately after role
pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
"set-offload-threshold", "--size", "0", namespace);

String output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("Unset for namespace"));

PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl.get()).build();

long offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
// give it up to 5 seconds to delete, it shouldn't
// so we wait this every time
Thread.sleep(5000);
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger1 = offloadedLedger;
ManagedLedgerInternalStats.LedgerInfo offloadedLedgerInfo =
admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger1).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace,
"--lag", "0m");
output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("0 minute(s)"));

offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
// wait up to 10 seconds for ledger to be deleted
for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) {
writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);
Thread.sleep(1000);
}
Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger2 = offloadedLedger;
offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
Assert.assertTrue(offloadedLedgerInfo.bookkeeperDeleted);

output = toolsetContainer.runAdminCommand("topics", "stats-internal", topic).getStdout();
// old version client should not recognize `bookkeeperDeleted`
Assert.assertFalse(output.contains("bookkeeperDeleted"));
PersistentTopicInternalStats topicInternalStats =
jsonMapper().readValue(output, PersistentTopicInternalStats.class);
offloadedLedgerInfo = topicInternalStats.ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger2).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
// old version client should not recognize `bookkeeperDeleted`, so should be default value `False`
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);

Thread.sleep(5); // wait 5 seconds to allow broker to see update

output = pulsarCluster.runAdminCommandOnAnyBroker(
"namespaces", "get-offload-deletion-lag", namespace).getStdout();
Assert.assertTrue(output.contains("Unset for namespace"));

offloadedLedger = writeAndWaitForOffload(serviceUrl.get(), adminUrl.get(), topic);

// give it up to 5 seconds to delete, it shouldn't
// so we wait this every time
Thread.sleep(5000);
Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

long finalOffloadedLedger3 = offloadedLedger;
offloadedLedgerInfo = admin.topics().getInternalStats(topic).ledgers.stream()
.filter((x) -> x.ledgerId == finalOffloadedLedger3).findFirst().get();
Assert.assertTrue(offloadedLedgerInfo.offloaded);
Assert.assertFalse(offloadedLedgerInfo.bookkeeperDeleted);

admin.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
public static final String DEFAULT_IMAGE_NAME = System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME",
"apachepulsar/pulsar-test-latest-version:latest");
public static final String DEFAULT_HTTP_PATH = "/metrics";
public static final String PULSAR_3_0_IMAGE_NAME = "apachepulsar/pulsar:3.0.0";
public static final String PULSAR_2_5_IMAGE_NAME = "apachepulsar/pulsar:2.5.0";
public static final String PULSAR_2_4_IMAGE_NAME = "apachepulsar/pulsar:2.4.0";
public static final String PULSAR_2_3_IMAGE_NAME = "apachepulsar/pulsar:2.3.0";
Expand Down
Loading