Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.CustomLog;
Expand All @@ -35,6 +36,8 @@
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

/**
* Metadata store access for scalable topic metadata.
Expand Down Expand Up @@ -71,10 +74,98 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
private final MetadataCache<SubscriptionMetadata> subscriptionCache;
private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;

/**
* Per-namespace listeners for scalable topic create / modify / delete events.
* Keyed by listener so each subscriber can deregister cleanly on close. The map
* is consulted from the single store-level listener registered at construction
* time, eliminating the listener leak that would otherwise occur every time a
* watcher session ends (the metadata store API has no
* {@code unregisterListener}). Mirrors the {@link TopicResources}
* pattern for {@code TopicListener}.
*/
private final Map<NamespaceListener, NamespaceName> namespaceListeners =
new ConcurrentHashMap<>();

public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
// Single shared metadata-store listener fans out to every registered watcher.
// Per-watcher registration happens via registerNamespaceListener; close() calls
// deregisterNamespaceListener so closed watchers are not on the dispatch list.
if (store instanceof MetadataStoreExtended ext) {
ext.registerListener(this::handleNotification);
} else {
store.registerListener(this::handleNotification);
}
}

// --- Namespace-level scalable-topics listeners ---

/**
* Listener for scalable-topic create / modify / delete events under a single
* namespace. The fan-out in {@link ScalableTopicResources} filters notifications
* to the listener's namespace and to direct topic records (skipping subtree paths
* like {@code <topic>/subscriptions/...} or {@code <topic>/controller}).
*/
public interface NamespaceListener {
/** Namespace this listener is scoped to. */
NamespaceName getNamespaceName();

/** Called for every metadata event affecting a topic record in the namespace. */
void onNotification(Notification notification);
}

/**
* Register a per-namespace listener. The listener will receive every
* Created / Modified / Deleted event whose path is a direct child of
* {@code /topics/<tenant>/<ns>}. Idempotent — re-registering the same listener
* just updates the namespace mapping.
*/
public void registerNamespaceListener(NamespaceListener listener) {
namespaceListeners.put(listener, listener.getNamespaceName());
}

/**
* Deregister a previously-registered namespace listener. Safe to call multiple
* times or for listeners that were never registered.
*/
public void deregisterNamespaceListener(NamespaceListener listener) {
namespaceListeners.remove(listener);
}

/**
* Single fan-out path: for each registered listener, emit the notification iff
* its path is a direct child of the listener's namespace base path. Filters out
* subtree events (subscriptions, controller lock) up front.
*/
void handleNotification(Notification notification) {
if (namespaceListeners.isEmpty()) {
return;
}
String path = notification.getPath();
if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
return;
}
for (Map.Entry<NamespaceListener, NamespaceName> entry : namespaceListeners.entrySet()) {
String basePath = namespacePath(entry.getValue());
if (!path.startsWith(basePath + "/")) {
continue;
}
// Direct child only — strip the prefix and check there's no further '/'.
String rest = path.substring(basePath.length() + 1);
if (rest.indexOf('/') >= 0) {
continue;
}
try {
entry.getKey().onNotification(notification);
} catch (Exception e) {
log.warn().attr("listener", entry.getKey())
.attr("path", path)
.exceptionMessage(e)
.log("Failed to dispatch scalable-topic notification");
}
}
}

public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) {
Expand Down Expand Up @@ -279,6 +370,15 @@ public String topicPath(TopicName tn) {
return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), tn.getEncodedLocalName());
}

/**
* Path under which all scalable topic records for a namespace live as direct
* children. Used by namespace-wide watchers as the prefix to filter metadata
* notifications down to events that touch a topic record.
*/
public String namespacePath(NamespaceName ns) {
return joinPath(SCALABLE_TOPIC_PATH, ns.toString());
}

public String subscriptionPath(TopicName tn, String subscription) {
return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
});
dagWatchSessions.clear();

// Same for namespace-wide scalable-topic watchers.
scalableTopicsWatchers.values().forEach(session -> {
try {
session.close();
} catch (Exception e) {
log.warn().exceptionMessage(e)
.log("Error closing scalable-topics watcher on connection close");
}
});
scalableTopicsWatchers.clear();

// Notify the scalable-topic controller that this connection's scalable consumers
// have dropped. The controller marks them disconnected and starts the grace-period
// timer; if they reconnect in time, their assignment is preserved.
Expand Down Expand Up @@ -832,6 +843,113 @@ protected void handleCommandScalableTopicLookup(
});
}

// --- Scalable topics namespace watcher ---

private final java.util.concurrent.ConcurrentHashMap<Long,
org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
scalableTopicsWatchers = new java.util.concurrent.ConcurrentHashMap<>();

@Override
protected void handleCommandWatchScalableTopics(
org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd) {
checkArgument(state == State.Connected);

final long watchId = cmd.getWatchId();
final String namespaceStr = cmd.getNamespace();
log.debug().attr("namespace", namespaceStr).attr("watchId", watchId)
.log("Received WatchScalableTopics");

if (!scalableTopicsEnabled) {
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.NotAllowedError, "Scalable topics are disabled on this broker"));
return;
}

final NamespaceName namespaceName;
try {
namespaceName = NamespaceName.get(namespaceStr);
} catch (Exception e) {
log.warn().attr("namespace", namespaceStr).log("Invalid namespace in WatchScalableTopics");
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.InvalidTopicName, "Invalid namespace: " + namespaceStr));
return;
}

final java.util.Map<String, String> propertyFilters = new java.util.HashMap<>();
for (int i = 0; i < cmd.getPropertyFiltersCount(); i++) {
var kv = cmd.getPropertyFilterAt(i);
propertyFilters.put(kv.getKey(), kv.getValue());
}
final String clientHash = cmd.hasCurrentHash() ? cmd.getCurrentHash() : null;

if (!this.service.getPulsar().isRunning()) {
log.warn("WatchScalableTopics rejected: broker not ready");
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.ServiceNotReady, "Broker not ready"));
return;
}

org.apache.pulsar.broker.resources.ScalableTopicResources resources =
service.getPulsar().getPulsarResources().getScalableTopicResources();
if (resources == null) {
log.warn("WatchScalableTopics rejected: scalable topic resources not available");
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.ServiceNotReady, "Scalable topic resources not available"));
return;
}

isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS)
.thenAccept(isAuthorized -> {
if (!isAuthorized) {
final String msg = "Client is not authorized to WatchScalableTopics";
log.warn().attr("principal", getPrincipal()).attr("namespace", namespaceName)
.log(msg);
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.AuthorizationError, msg));
return;
}
var session = new org.apache.pulsar.broker.service.scalable
.ScalableTopicsWatcherSession(watchId, namespaceName, propertyFilters,
clientHash, this, resources, service.getPulsar().getExecutor());
scalableTopicsWatchers.put(watchId, session);

session.start().exceptionally(ex -> {
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
log.warn().attr("namespace", namespaceName).exception(cause)
.log("WatchScalableTopics failed");
scalableTopicsWatchers.remove(watchId);
session.close();
ctx.executor().execute(() -> ctx.writeAndFlush(
Commands.newWatchScalableTopicsError(watchId,
ServerError.UnknownError, cause.getMessage())));
return null;
});
})
.exceptionally(ex -> {
logNamespaceNameAuthException(remoteAddress, "watch-scalable-topics",
getPrincipal(), Optional.of(namespaceName), ex);
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
ServerError.AuthorizationError,
"Exception occurred while authorizing WatchScalableTopics"));
return null;
});
}

@Override
protected void handleCommandWatchScalableTopicsClose(
org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose cmd) {
// Same idempotent-close semantics as DAG watch / consumer close: per-cnx
// session, originating subscribe was authorized at create time, no per-call
// authz needed. Unknown watchId is a no-op.
checkArgument(state == State.Connected);
long watchId = cmd.getWatchId();
log.debug().attr("watchId", watchId).log("Received WatchScalableTopicsClose");
var session = scalableTopicsWatchers.remove(watchId);
if (session != null) {
session.close();
}
}

@Override
protected void handleCommandScalableTopicClose(
CommandScalableTopicClose commandScalableTopicClose) {
Expand Down
Loading
Loading