Skip to content

Commit 8306462

Browse files
liangyepianzhoulhotari
authored andcommitted
[improve][common] Optimize TopicName.get() to reduce lock contention on cache lookup (#25367)
### Motivation `TopicName.get()` previously used `ConcurrentHashMap.computeIfAbsent()` to populate the topic-name cache. Although `computeIfAbsent` is atomic, it holds the internal bin-lock for the entire duration of the mapping function, which includes the non-trivial `TopicName` construction (string splitting, validation, etc.). Under high-concurrency workloads where many threads simultaneously encounter the same uncached topic name, this causes unnecessary lock contention and can degrade throughput. ### Modifications Replace `computeIfAbsent` with an explicit two-step pattern: 1. **Fast path**: call `cache.get(topic)` first — a single volatile read with no locking — and return immediately on a cache hit (steady-state case). 2. **Slow path** (cache miss): construct `TopicName` *outside* the lock, then use `cache.put()` to insert. (cherry picked from commit 8c4e83d)
1 parent e428972 commit 8306462

3 files changed

Lines changed: 117 additions & 1 deletion

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.naming;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
import org.apache.pulsar.common.naming.TopicName;
24+
import org.openjdk.jmh.annotations.Benchmark;
25+
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Fork;
27+
import org.openjdk.jmh.annotations.Level;
28+
import org.openjdk.jmh.annotations.Measurement;
29+
import org.openjdk.jmh.annotations.Mode;
30+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
31+
import org.openjdk.jmh.annotations.OutputTimeUnit;
32+
import org.openjdk.jmh.annotations.Scope;
33+
import org.openjdk.jmh.annotations.Setup;
34+
import org.openjdk.jmh.annotations.State;
35+
import org.openjdk.jmh.annotations.Threads;
36+
import org.openjdk.jmh.annotations.Warmup;
37+
import org.openjdk.jmh.infra.Blackhole;
38+
39+
/**
40+
* JMH benchmark for {@link TopicName#get(String)} cold-start (cache-miss) performance
41+
* under 50-thread contention.
42+
*
43+
* <p>Uses {@code Mode.SingleShotTime} with {@code @Fork(10)} to measure
44+
* the total time of a fixed batch of cold-start calls. No cache clearing is
45+
* needed — each fork is a fresh JVM with an empty cache, and the batch size
46+
* is bounded to avoid OOM.
47+
*
48+
* <p>Run with:
49+
* <pre>
50+
* ./gradlew :microbench:shadowJar
51+
* java -jar microbench/build/libs/microbench-*-benchmarks.jar TopicNameGetBenchmark
52+
* </pre>
53+
*/
54+
@Fork(10)
55+
@BenchmarkMode(Mode.SingleShotTime)
56+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
57+
@Warmup(iterations = 2)
58+
@Measurement(iterations = 5)
59+
@Threads(50)
60+
@State(Scope.Thread)
61+
public class TopicNameGetBenchmark {
62+
63+
/**
64+
* Each thread processes 10,000 unique topics per invocation.
65+
* 50 threads × 10,000 = 500,000 total entries per invocation — well within memory.
66+
*/
67+
private static final int BATCH_SIZE = 10_000;
68+
private static final AtomicInteger COUNTER = new AtomicInteger();
69+
70+
private String[] topics;
71+
72+
@Setup(Level.Invocation)
73+
public void prepare() {
74+
int base = COUNTER.getAndAdd(BATCH_SIZE);
75+
topics = new String[BATCH_SIZE];
76+
for (int i = 0; i < BATCH_SIZE; i++) {
77+
topics[i] = "persistent://public/default/topic-" + (base + i);
78+
}
79+
}
80+
81+
@Benchmark
82+
@OperationsPerInvocation(BATCH_SIZE)
83+
public void coldStartGet(Blackhole bh) {
84+
for (int i = 0; i < BATCH_SIZE; i++) {
85+
bh.consume(TopicName.get(topics[i]));
86+
}
87+
}
88+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
/**
21+
* Benchmarks for {@link org.apache.pulsar.common.naming.TopicName#get(String)} cold-start (cache-miss) performance
22+
*/
23+
package org.apache.pulsar.broker.naming;

pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,16 @@ public static TopicName get(String domain, String tenant, String cluster, String
7979
}
8080

8181
public static TopicName get(String topic) {
82+
// Fast path: already cached — single volatile read, no lock.
8283
TopicName tp = cache.get(topic);
8384
if (tp != null) {
8485
return tp;
8586
}
86-
return cache.computeIfAbsent(topic, k -> new TopicName(k));
87+
// Use get()+put() instead of computeIfAbsent() to keep construction outside the bin-lock.
88+
// Duplicate instances from racing threads are safe to discard since TopicName is immutable.
89+
TopicName newTp = new TopicName(topic);
90+
TopicName existing = cache.put(topic, newTp);
91+
return existing != null ? existing : newTp;
8792
}
8893

8994
public static TopicName getPartitionedTopicName(String topic) {

0 commit comments

Comments
 (0)