Skip to content

Commit 8652efa

Browse files
[feat][broker] PIP-469: Legacy-aware topic policies backend routing and metadata-store topic policies (apache#25707)
1 parent dd94626 commit 8652efa

14 files changed

Lines changed: 852 additions & 46 deletions

pip/pip-469.md

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,15 @@ Broker startup validates both backends:
105105
- `SystemTopicBasedTopicPoliciesService` must be instantiable.
106106
- The configured `topicPoliciesServiceClassName` must be instantiable.
107107

108-
If either backend cannot be instantiated or started, broker startup fails. There is no per-request fallback from one
109-
backend to another.
108+
`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It intentionally does not call
109+
`SystemTopicBasedTopicPoliciesService#start`, because that start path registers a namespace-bundle ownership listener
110+
whose only purpose is to eagerly create a reader on `<namespace>/__change_events` when a namespace bundle is loaded.
111+
Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for
112+
namespaces that do not have topic policies in `__change_events`. For legacy namespaces, the system-topic reader and
113+
policy cache are initialized lazily by the routed system-topic backend operations.
114+
115+
If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is
116+
no per-request fallback from one backend to another.
110117

111118
### Namespace-scoped service routing
112119

@@ -118,6 +125,10 @@ backend to another.
118125
the system-topic backend when the system topic exists.
119126
- Routing the same operations to the configured backend when the system topic does not exist.
120127

128+
Listener registration is routed through `TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the
129+
namespace backend before registering the listener, and the listener is registered only on the selected backend instead
130+
of being registered on both backends.
131+
121132
The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic
122133
existence rather than by new namespace metadata.
123134

@@ -137,9 +148,13 @@ meaning the system-topic-backed topic-policies state is gone.
137148

138149
- Topic names are normalized to the partitioned topic name, so all partitions share the same topic-policies record.
139150
- Global policies are stored in the configuration metadata store path:
140-
`/admin/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
151+
`/admin/topic-policies/global/{tenant}/{namespace}/{domain}/{encodedTopic}`.
141152
- Local policies are stored in the local metadata store path:
142-
`/admin/local-policies/topic-policies/{tenant}/{namespace}/{domain}/{encodedTopic}`.
153+
`/admin/topic-policies/local/{tenant}/{namespace}/{domain}/{encodedTopic}`.
154+
155+
To avoid possible conflicts like the listener registered on the `/admin/local-policies` path from
156+
`BrokerService#handleMetadataChanges`, these two paths share the same root path `/admin/topic-policies`, which is not
157+
used by any other component.
143158

144159
Each node stores a serialized `TopicPolicies` document. The backend writes and reads the two scopes independently:
145160

@@ -159,6 +174,11 @@ managed-ledger metadata updates.
159174

160175
### Listener behavior
161176

177+
`TopicPoliciesService` adds `registerListenerAsync(TopicName, TopicPolicyListener)` for listener registration. The
178+
existing synchronous `registerListener(TopicName, TopicPolicyListener)` method is retained as a deprecated compatibility
179+
hook for existing custom implementations, and the default async method delegates to it. Implementations that need async
180+
routing or initialization, such as `LegacyAwareTopicPoliciesService`, override `registerListenerAsync` directly.
181+
162182
The backend registers watchers on both metadata stores:
163183

164184
- A change on the local path re-reads the local node and notifies listeners with the latest local `TopicPolicies` or
@@ -173,6 +193,11 @@ append-only replay log; it relies on metadata-store notifications and read-after
173193

174194
### Public API
175195

196+
The `TopicPoliciesService` extension point gains a default
197+
`CompletableFuture<Boolean> registerListenerAsync(TopicName, TopicPolicyListener)` method. Existing implementations
198+
remain compatible because `registerListener(TopicName, TopicPolicyListener)` is retained and used by the default async
199+
implementation.
200+
176201
No new namespace policy field is introduced.
177202

178203
No new namespace admin REST endpoint or Java admin client method is introduced.
@@ -221,6 +246,10 @@ This upgrade rule is intentionally conservative:
221246
This means some namespaces with an empty but already-created `__change_events` topic may continue using the
222247
system-topic backend. That is acceptable because it avoids missing legacy state.
223248

249+
Existing custom `TopicPoliciesService` implementations that only implement the synchronous `registerListener` method
250+
continue to work through the default `registerListenerAsync` bridge. Implementations can override
251+
`registerListenerAsync` when registration itself needs asynchronous backend resolution or initialization.
252+
224253
## Downgrade / Rollback
225254

226255
Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,8 +1750,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
17501750

17511751
@FieldContext(
17521752
category = CATEGORY_SERVER,
1753-
doc = "The class name of the topic policies service. The default config only takes affect when the "
1754-
+ "systemTopicEnable config is true"
1753+
doc = """
1754+
The class name of the topic policies service. There are 2 built-in implementations:
1755+
1. "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService" (default)
1756+
It stores a topic's policies in the `__change_events` topic. If `systemTopicEnabled` is false,
1757+
the topic policies will just be disabled
1758+
2. "org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService"
1759+
It stores a topic's policies in the metadata store. If `systemTopicEnabled` is true and the
1760+
topic's namespace has a `__change_events` topic, the policies will still be stored in the
1761+
`__change_events` topic for backward compatibility.
1762+
"""
17551763
)
17561764
private String topicPoliciesServiceClassName =
17571765
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import org.apache.pulsar.broker.rest.Topics;
108108
import org.apache.pulsar.broker.service.BrokerService;
109109
import org.apache.pulsar.broker.service.HealthChecker;
110+
import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService;
110111
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
111112
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
112113
import org.apache.pulsar.broker.service.Topic;
@@ -2293,8 +2294,16 @@ private TopicPoliciesService initTopicPoliciesService() throws Exception {
22932294
return TopicPoliciesService.DISABLED;
22942295
}
22952296
}
2296-
return (TopicPoliciesService) Reflections.createInstance(className,
2297+
final var configuredService = (TopicPoliciesService) Reflections.createInstance(className,
22972298
Thread.currentThread().getContextClassLoader());
2299+
if (!config.isSystemTopicEnabled()) {
2300+
log.info()
2301+
.attr("className", className)
2302+
.log("System topic is disabled, using configured topic policies service without legacy routing");
2303+
return configuredService;
2304+
}
2305+
return new LegacyAwareTopicPoliciesService(this, new SystemTopicBasedTopicPoliciesService(this),
2306+
configuredService);
22982307
}
22992308

23002309
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ protected boolean isProducersExceeded(boolean isRemote) {
559559

560560
protected void registerTopicPolicyListener() {
561561
brokerService.getPulsar().getTopicPoliciesService()
562-
.registerListener(TopicName.getPartitionedTopicName(topic), this);
562+
.registerListenerAsync(TopicName.getPartitionedTopicName(topic), this);
563563
}
564564

565565
protected void unregisterTopicPolicyListener() {
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
22+
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
23+
import com.github.benmanes.caffeine.cache.Caffeine;
24+
import com.google.common.annotations.VisibleForTesting;
25+
import java.time.Duration;
26+
import java.util.Optional;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.Executor;
29+
import java.util.function.Consumer;
30+
import lombok.CustomLog;
31+
import org.apache.pulsar.broker.PulsarService;
32+
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
33+
import org.apache.pulsar.common.events.EventType;
34+
import org.apache.pulsar.common.naming.NamespaceName;
35+
import org.apache.pulsar.common.naming.TopicName;
36+
import org.apache.pulsar.common.policies.data.TopicPolicies;
37+
import org.jspecify.annotations.NonNull;
38+
39+
/**
40+
* Routes topic policy operations to the legacy system-topic backend when a namespace already has
41+
* a topic-policy {@code __change_events} system topic, and otherwise to the configured backend.
42+
*/
43+
@CustomLog
44+
public class LegacyAwareTopicPoliciesService implements TopicPoliciesService {
45+
46+
private final AsyncLoadingCache<NamespaceName, Boolean> isLegacyNamespace;
47+
@VisibleForTesting
48+
final SystemTopicBasedTopicPoliciesService systemTopicService;
49+
private final TopicPoliciesService configuredService;
50+
51+
public LegacyAwareTopicPoliciesService(PulsarService pulsar,
52+
SystemTopicBasedTopicPoliciesService systemTopicService,
53+
TopicPoliciesService configuredService) {
54+
// Generally, we only need to check if the __change_events topic exists once because the __change_events topic
55+
// should only be created by broker before the upgrade, where `SystemTopicBasedTopicPoliciesService` is
56+
// configured as the topic policies service.
57+
this.isLegacyNamespace = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1))
58+
.buildAsync(new AsyncCacheLoader<>() {
59+
@NonNull
60+
@Override
61+
public CompletableFuture<? extends Boolean> asyncLoad(NamespaceName key,
62+
@NonNull Executor executor) {
63+
return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key, EventType.TOPIC_POLICY,
64+
pulsar);
65+
}
66+
});
67+
this.systemTopicService = systemTopicService;
68+
this.configuredService = configuredService;
69+
if (configuredService instanceof SystemTopicBasedTopicPoliciesService) {
70+
throw new IllegalArgumentException(
71+
"configuredService should not be an instance of SystemTopicBasedTopicPoliciesService");
72+
}
73+
}
74+
75+
@Override
76+
public void start(PulsarService pulsarService) {
77+
// We should not call `systemTopicService.start()`, which just registers a namespace bundle listener to create
78+
// a reader on `<namespace>/__change_events` when the namespace's bundle is loaded firstly. It's just an
79+
// optimization to create the reader before loading any topic. However, it could create a reader on a namespace
80+
// that does not even have the __change_events topic.
81+
configuredService.start(pulsarService);
82+
}
83+
84+
@Override
85+
public void close() throws Exception {
86+
try {
87+
configuredService.close();
88+
} finally {
89+
systemTopicService.close();
90+
}
91+
}
92+
93+
@Override
94+
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) {
95+
return resolveService(topicName.getNamespaceObject())
96+
.thenCompose(service -> service.getTopicPoliciesAsync(topicName, type));
97+
}
98+
99+
@Override
100+
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy,
101+
boolean skipUpdateWhenTopicPolicyDoesntExist,
102+
Consumer<TopicPolicies> policyUpdater) {
103+
return resolveService(topicName.getNamespaceObject())
104+
.thenCompose(service -> service.updateTopicPoliciesAsync(topicName, isGlobalPolicy,
105+
skipUpdateWhenTopicPolicyDoesntExist, policyUpdater));
106+
}
107+
108+
@Override
109+
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
110+
return resolveService(topicName.getNamespaceObject())
111+
.thenCompose(service -> service.deleteTopicPoliciesAsync(topicName));
112+
}
113+
114+
@Override
115+
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName,
116+
boolean keepGlobalPoliciesAfterDeleting) {
117+
return resolveService(topicName.getNamespaceObject())
118+
.thenCompose(service -> service.deleteTopicPoliciesAsync(topicName,
119+
keepGlobalPoliciesAfterDeleting));
120+
}
121+
122+
@Override
123+
public CompletableFuture<Boolean> registerListenerAsync(TopicName topicName, TopicPolicyListener listener) {
124+
return resolveService(topicName.getNamespaceObject())
125+
.thenCompose(service -> service.registerListenerAsync(topicName, listener));
126+
}
127+
128+
@Override
129+
public boolean registerListener(TopicName topicName, TopicPolicyListener listener) {
130+
throw new RuntimeException("should not be called");
131+
}
132+
133+
@Override
134+
public void unregisterListener(TopicName topicName, TopicPolicyListener listener) {
135+
configuredService.unregisterListener(topicName, listener);
136+
systemTopicService.unregisterListener(topicName, listener);
137+
}
138+
139+
@VisibleForTesting
140+
CompletableFuture<TopicPoliciesService> resolveService(NamespaceName namespace) {
141+
return isLegacyNamespace.get(namespace)
142+
.thenApply(isLegacy -> isLegacy ? systemTopicService : configuredService);
143+
}
144+
}

0 commit comments

Comments
 (0)