Skip to content

Commit 3e27cc6

Browse files
authored
[fix][broker] PIP-468: Fix listener leak in DagWatchSession (#25650)
1 parent 0117c66 commit 3e27cc6

4 files changed

Lines changed: 305 additions & 47 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java

Lines changed: 97 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,26 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
7575
private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
7676

7777
/**
78-
* Per-namespace listeners for scalable topic create / modify / delete events.
79-
* Keyed by listener so each subscriber can deregister cleanly on close. The map
80-
* is consulted from the single store-level listener registered at construction
81-
* time, eliminating the listener leak that would otherwise occur every time a
82-
* watcher session ends (the metadata store API has no
83-
* {@code unregisterListener}). Mirrors the {@link TopicResources}
84-
* pattern for {@code TopicListener}.
78+
* Per-path listeners for scalable-topic metadata events. Each listener watches a
79+
* single exact path (typically a topic record); the resources-level fan-out
80+
* dispatches notifications whose path equals the listener's registered path.
81+
* Used by {@link DagWatchSession}-style subscribers that want events for one
82+
* specific topic.
83+
*
84+
* <p>Hosted here — rather than letting each subscriber call
85+
* {@code store.registerListener} directly — because {@code MetadataStore} has no
86+
* {@code unregisterListener}: per-subscriber direct registration would leak a
87+
* listener for the broker's lifetime every time a session ends, and every
88+
* metadata notification would fan out to all stale listeners. Mirrors
89+
* {@link TopicResources} for {@code TopicListener}.
90+
*/
91+
private final Map<MetadataPathListener, String> pathListeners = new ConcurrentHashMap<>();
92+
93+
/**
94+
* Per-namespace listeners for scalable-topic create / modify / delete events.
95+
* Used by namespace-wide watchers (e.g. multi-topic consumer wrappers); the
96+
* fan-out matches direct children of the listener's namespace base path. Same
97+
* leak-avoidance rationale as {@link #pathListeners}.
8598
*/
8699
private final Map<NamespaceListener, NamespaceName> namespaceListeners =
87100
new ConcurrentHashMap<>();
@@ -90,16 +103,47 @@ public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
90103
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
91104
this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
92105
this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
93-
// Single shared metadata-store listener fans out to every registered watcher.
94-
// Per-watcher registration happens via registerNamespaceListener; close() calls
95-
// deregisterNamespaceListener so closed watchers are not on the dispatch list.
106+
// Single shared metadata-store listener fans out to both per-path and
107+
// per-namespace subscribers. Per-subscriber lifecycle goes through the
108+
// register / deregister methods below.
96109
if (store instanceof MetadataStoreExtended ext) {
97110
ext.registerListener(this::handleNotification);
98111
} else {
99112
store.registerListener(this::handleNotification);
100113
}
101114
}
102115

116+
// --- Per-path metadata listeners ---
117+
118+
/**
119+
* Listener for metadata events on a specific scalable-topic-related path. The
120+
* fan-out in {@link ScalableTopicResources} compares each notification's path
121+
* against {@link #getMetadataPath()} and dispatches on exact match.
122+
*/
123+
public interface MetadataPathListener {
124+
/** Exact path this listener is interested in (no wildcard / prefix). */
125+
String getMetadataPath();
126+
127+
/** Called for every metadata event on the listener's path. */
128+
void onNotification(Notification notification);
129+
}
130+
131+
/**
132+
* Register a per-path metadata listener. Idempotent — re-registering the same
133+
* listener just refreshes its path mapping (e.g. if the listener moved its path).
134+
*/
135+
public void registerPathListener(MetadataPathListener listener) {
136+
pathListeners.put(listener, listener.getMetadataPath());
137+
}
138+
139+
/**
140+
* Deregister a previously-registered listener. Safe to call multiple times or for
141+
* listeners that were never registered.
142+
*/
143+
public void deregisterPathListener(MetadataPathListener listener) {
144+
pathListeners.remove(listener);
145+
}
146+
103147
// --- Namespace-level scalable-topics listeners ---
104148

105149
/**
@@ -135,35 +179,53 @@ public void deregisterNamespaceListener(NamespaceListener listener) {
135179
}
136180

137181
/**
138-
* Single fan-out path: for each registered listener, emit the notification iff
139-
* its path is a direct child of the listener's namespace base path. Filters out
140-
* subtree events (subscriptions, controller lock) up front.
182+
* Single fan-out path. For each registered subscriber:
183+
* <ul>
184+
* <li>Path listener: dispatch when the notification's path equals the listener's
185+
* registered path.</li>
186+
* <li>Namespace listener: dispatch when the notification's path is a direct
187+
* child of {@code /topics/<tenant>/<ns>} (skips subtree events like
188+
* subscriptions / controller lock).</li>
189+
* </ul>
141190
*/
142191
void handleNotification(Notification notification) {
143-
if (namespaceListeners.isEmpty()) {
144-
return;
145-
}
146192
String path = notification.getPath();
147-
if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
148-
return;
149-
}
150-
for (Map.Entry<NamespaceListener, NamespaceName> entry : namespaceListeners.entrySet()) {
151-
String basePath = namespacePath(entry.getValue());
152-
if (!path.startsWith(basePath + "/")) {
153-
continue;
154-
}
155-
// Direct child only — strip the prefix and check there's no further '/'.
156-
String rest = path.substring(basePath.length() + 1);
157-
if (rest.indexOf('/') >= 0) {
158-
continue;
193+
194+
// Path listeners — exact match.
195+
if (!pathListeners.isEmpty()) {
196+
for (Map.Entry<MetadataPathListener, String> entry : pathListeners.entrySet()) {
197+
if (entry.getValue().equals(path)) {
198+
try {
199+
entry.getKey().onNotification(notification);
200+
} catch (Exception e) {
201+
log.warn().attr("listener", entry.getKey())
202+
.attr("path", path)
203+
.exceptionMessage(e)
204+
.log("Failed to dispatch scalable-topic path notification");
205+
}
206+
}
159207
}
160-
try {
161-
entry.getKey().onNotification(notification);
162-
} catch (Exception e) {
163-
log.warn().attr("listener", entry.getKey())
164-
.attr("path", path)
165-
.exceptionMessage(e)
166-
.log("Failed to dispatch scalable-topic notification");
208+
}
209+
210+
// Namespace listeners — direct child of /topics/<ns>.
211+
if (!namespaceListeners.isEmpty() && path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
212+
for (Map.Entry<NamespaceListener, NamespaceName> entry : namespaceListeners.entrySet()) {
213+
String basePath = namespacePath(entry.getValue());
214+
if (!path.startsWith(basePath + "/")) {
215+
continue;
216+
}
217+
String rest = path.substring(basePath.length() + 1);
218+
if (rest.indexOf('/') >= 0) {
219+
continue;
220+
}
221+
try {
222+
entry.getKey().onNotification(notification);
223+
} catch (Exception e) {
224+
log.warn().attr("listener", entry.getKey())
225+
.attr("path", path)
226+
.exceptionMessage(e)
227+
.log("Failed to dispatch scalable-topic namespace notification");
228+
}
167229
}
168230
}
169231
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
* <p>The session is tied to a connection. When the connection breaks, the session dies.
4949
* The client must reinitiate a new session (possibly with another broker).
5050
*/
51-
public class DagWatchSession {
51+
public class DagWatchSession implements ScalableTopicResources.MetadataPathListener {
5252

5353
private static final Logger LOG = Logger.get(DagWatchSession.class);
5454
private final Logger log;
@@ -61,7 +61,6 @@ public class DagWatchSession {
6161
private final BrokerService brokerService;
6262

6363
private final String metadataPath;
64-
private final java.util.function.Consumer<Notification> notificationListener;
6564
private volatile boolean closed = false;
6665

6766
public DagWatchSession(long sessionId,
@@ -75,17 +74,22 @@ public DagWatchSession(long sessionId,
7574
this.resources = resources;
7675
this.brokerService = brokerService;
7776
this.metadataPath = resources.topicPath(topicName);
78-
this.notificationListener = this::onNotification;
7977
this.log = LOG.with().attr("topic", topicName).attr("sessionId", sessionId).build();
8078
}
8179

80+
@Override
81+
public String getMetadataPath() {
82+
return metadataPath;
83+
}
84+
8285
/**
8386
* Start the session: load current metadata, set up watch, and return
8487
* the initial layout response.
8588
*/
8689
public CompletableFuture<ScalableTopicLayoutResponse> start() {
87-
// Register metadata store listener for changes to this topic's metadata
88-
resources.getStore().registerListener(notificationListener);
90+
// Register through the resources-level fan-out so close() can deregister us
91+
// and we don't accumulate stale store-level listeners over time.
92+
resources.registerPathListener(this);
8993

9094
return resources.getScalableTopicMetadataAsync(topicName, true)
9195
.thenCompose(optMd -> {
@@ -98,8 +102,13 @@ public CompletableFuture<ScalableTopicLayoutResponse> start() {
98102
});
99103
}
100104

101-
// Visible for testing — invoked by the metadata-store listener registered in start().
102-
void onNotification(Notification notification) {
105+
/**
106+
* Invoked by the {@link ScalableTopicResources} fan-out for every metadata event
107+
* matching this session's topic path. The registry already path-filtered for us;
108+
* we re-check defensively so a registry-level bug can't cause a reload storm.
109+
*/
110+
@Override
111+
public void onNotification(Notification notification) {
103112
if (closed) {
104113
return;
105114
}
@@ -184,7 +193,10 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
184193

185194
public void close() {
186195
closed = true;
187-
// Listener is guarded by the closed flag; MetadataStore does not support unregister.
196+
// Drop ourselves from the resources' fan-out so the per-event dispatch skips
197+
// us — no listener leak, no per-notification dispatch tax across the broker's
198+
// lifetime.
199+
resources.deregisterPathListener(this);
188200
}
189201

190202
/**

0 commit comments

Comments
 (0)