Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,478 changes: 6 additions & 1,472 deletions .github/workflows/pulsar-ci.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.qos;

import com.google.common.base.Splitter;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.util.Codec;

public class LegacyTopicName {

public static final String PUBLIC_TENANT = "public";
public static final String DEFAULT_NAMESPACE = "default";

public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";

private final String completeTopicName;

private final TopicDomain domain;
private final String tenant;
private final String cluster;
private final String namespacePortion;
private final String localName;

private final NamespaceName namespaceName;

private final int partitionIndex;

public LegacyTopicName(String completeTopicName) {
try {
// The topic name can be in two different forms, one is fully qualified topic name,
// the other one is short topic name
if (!completeTopicName.contains("://")) {
// The short topic name can be:
// - <topic>
// - <property>/<namespace>/<topic>
String[] parts = StringUtils.split(completeTopicName, '/');
if (parts.length == 3) {
completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName;
} else if (parts.length == 1) {
completeTopicName = TopicDomain.persistent.name() + "://"
+ PUBLIC_TENANT + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
} else {
throw new IllegalArgumentException(
"Invalid short topic name '" + completeTopicName + "', it should be in the format of "
+ "<tenant>/<namespace>/<topic> or <topic>");
}
}

// The fully qualified topic name can be in two different forms:
// new: persistent://tenant/namespace/topic
// legacy: persistent://tenant/cluster/namespace/topic

List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName);
this.domain = TopicDomain.getEnum(parts.get(0));

String rest = parts.get(1);

// The rest of the name can be in different forms:
// new: tenant/namespace/<localName>
// legacy: tenant/cluster/namespace/<localName>
// Examples of localName:
// 1. some, name, xyz
// 2. xyz-123, feeder-2


parts = Splitter.on("/").limit(4).splitToList(rest);
if (parts.size() == 3) {
// New topic name without cluster name
this.tenant = parts.get(0);
this.cluster = null;
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
this.partitionIndex = getPartitionIndex(completeTopicName);
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} else if (parts.size() == 4) {
// Legacy topic name that includes cluster name
this.tenant = parts.get(0);
this.cluster = parts.get(1);
this.namespacePortion = parts.get(2);
this.localName = parts.get(3);
this.partitionIndex = getPartitionIndex(completeTopicName);
this.namespaceName = NamespaceName.get(tenant, cluster, namespacePortion);
} else {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName);
}

if (StringUtils.isBlank(localName)) {
throw new IllegalArgumentException(String.format("Invalid topic name: %s. Topic local name must not"
+ " be blank.", completeTopicName));
}

} catch (NullPointerException e) {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
}
if (isV2()) {
this.completeTopicName = String.format("%s://%s/%s/%s",
domain, tenant, namespacePortion, localName);
} else {
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
domain, tenant, cluster,
namespacePortion, localName);
}
}

public static int getPartitionIndex(String topic) {
int partitionIndex = -1;
if (topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
try {
String idx = StringUtils.substringAfterLast(topic, PARTITIONED_TOPIC_SUFFIX);
partitionIndex = Integer.parseInt(idx);
if (partitionIndex < 0) {
// for the "topic-partition--1"
partitionIndex = -1;
} else if (StringUtils.length(idx) != String.valueOf(partitionIndex).length()) {
// for the "topic-partition-01"
partitionIndex = -1;
}
} catch (NumberFormatException nfe) {
// ignore exception
}
}

return partitionIndex;
}

/**
* get topic full name from managedLedgerName.
*
* @return the topic full name, format -> domain://tenant/namespace/topic
*/
public static String fromPersistenceNamingEncoding(String mlName) {
// The managedLedgerName convention is: tenant/namespace/domain/topic
// We want to transform to topic full name in the order: domain://tenant/namespace/topic
if (mlName == null || mlName.length() == 0) {
return mlName;
}
List<String> parts = Splitter.on("/").splitToList(mlName);
String tenant;
String cluster;
String namespacePortion;
String domain;
String localName;
if (parts.size() == 4) {
tenant = parts.get(0);
namespacePortion = parts.get(1);
domain = parts.get(2);
localName = Codec.decode(parts.get(3));
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
} else if (parts.size() == 5) {
tenant = parts.get(0);
cluster = parts.get(1);
namespacePortion = parts.get(2);
domain = parts.get(3);
localName = Codec.decode(parts.get(4));
return String.format("%s://%s/%s/%s/%s", domain, tenant, cluster, namespacePortion, localName);
} else {
throw new IllegalArgumentException("Invalid managedLedger name: " + mlName);
}
}


public boolean isV2() {
return cluster == null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.qos;

import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.naming.TopicName;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class TopicNameBenchmark {

private static final String[] topicBases = {"test",
"tenant/ns/test",
"persistent://another-tenant/another-ns/test"
};

@Threads(1)
@Benchmark
@Warmup(time = 5, iterations = 1)
@Measurement(time = 5, iterations = 1)
public void testReadFromCache(Blackhole blackhole) {
for (int i = 0; i < 10000; i++) {
for (final var topicBase : topicBases) {
blackhole.consume(TopicName.get(topicBase + i));
}
}
}

@Threads(1)
@Benchmark
@Warmup(time = 5, iterations = 1)
@Measurement(time = 5, iterations = 1)
public void testConstruct(Blackhole blackhole) {
for (int i = 0; i < 10000; i++) {
for (final var topicBase : topicBases) {
blackhole.consume(new TopicName(topicBase + i, false));
}
}
}

@Threads(1)
@Benchmark
@Warmup(time = 5, iterations = 1)
@Measurement(time = 5, iterations = 1)
public void testConstructWithNamespaceInitialization(Blackhole blackhole) {
for (int i = 0; i < 10000; i++) {
for (final var topicBase : topicBases) {
blackhole.consume(new TopicName(topicBase + i, true));
}
}
}

@Threads(1)
@Benchmark
@Warmup(time = 5, iterations = 1)
@Measurement(time = 5, iterations = 1)
public void testLegacyConstruct(Blackhole blackhole) {
for (int i = 0; i < 10000; i++) {
for (final var topicBase : topicBases) {
blackhole.consume(new LegacyTopicName(topicBase + i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,6 @@ public void start() throws Exception {
this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
this.startClearInvalidateTopicNameCacheTask();
}

protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache = pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
inactivityMonitor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Loading
Loading