diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 3e01580847641..550212a4a1932 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.CustomLog;
@@ -35,6 +36,8 @@
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
  * Metadata store access for scalable topic metadata.
 */
@@ -71,10 +74,98 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
     private final MetadataCache<SubscriptionMetadata> subscriptionCache;
     private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
 
+    /**
+     * Per-namespace listeners for scalable topic create / modify / delete events.
+     * Keyed by listener so each subscriber can deregister cleanly on close. The map
+     * is consulted from the single store-level listener registered at construction
+     * time, eliminating the listener leak that would otherwise occur every time a
+     * watcher session ends (the metadata store API has no
+     * {@code unregisterListener}). Mirrors the {@link TopicResources}
+     * pattern for {@code TopicListener}.
+     */
+    private final Map<NamespaceListener, NamespaceName> namespaceListeners =
+            new ConcurrentHashMap<>();
+
     public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
         super(store, ScalableTopicMetadata.class, operationTimeoutSec);
         this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class);
         this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class);
+        // Single shared metadata-store listener fans out to every registered watcher.
+        // Per-watcher registration happens via registerNamespaceListener; close() calls
+        // deregisterNamespaceListener so closed watchers are not on the dispatch list.
+        if (store instanceof MetadataStoreExtended ext) {
+            ext.registerListener(this::handleNotification);
+        } else {
+            store.registerListener(this::handleNotification);
+        }
+    }
+
+    // --- Namespace-level scalable-topics listeners ---
+
+    /**
+     * Listener for scalable-topic create / modify / delete events under a single
+     * namespace. The fan-out in {@link ScalableTopicResources} filters notifications
+     * to the listener's namespace and to direct topic records (skipping subtree paths
+     * like {@code /subscriptions/...} or {@code /controller}).
+     */
+    public interface NamespaceListener {
+        /** Namespace this listener is scoped to. */
+        NamespaceName getNamespaceName();
+
+        /** Called for every metadata event affecting a topic record in the namespace. */
+        void onNotification(Notification notification);
+    }
+
+    /**
+     * Register a per-namespace listener. The listener will receive every
+     * Created / Modified / Deleted event whose path is a direct child of
+     * {@code /topics/<tenant>/<namespace>/}. Idempotent — re-registering the same
+     * listener just updates the namespace mapping.
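+     *
+     * <p>A minimal usage sketch (the listener body is hypothetical, for illustration
+     * only):
+     * <pre>{@code
+     * resources.registerNamespaceListener(new ScalableTopicResources.NamespaceListener() {
+     *     public NamespaceName getNamespaceName() {
+     *         return NamespaceName.get("tenant/ns");
+     *     }
+     *     public void onNotification(Notification n) {
+     *         // n.getPath() points at a direct topic record under the namespace
+     *     }
+     * });
+     * }</pre>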
+     */
+    public void registerNamespaceListener(NamespaceListener listener) {
+        namespaceListeners.put(listener, listener.getNamespaceName());
+    }
+
+    /**
+     * Deregister a previously-registered namespace listener. Safe to call multiple
+     * times or for listeners that were never registered.
+     */
+    public void deregisterNamespaceListener(NamespaceListener listener) {
+        namespaceListeners.remove(listener);
+    }
+
+    /**
+     * Single fan-out path: for each registered listener, emit the notification iff
+     * its path is a direct child of the listener's namespace base path. Filters out
+     * subtree events (subscriptions, controller lock) up front.
+     */
+    void handleNotification(Notification notification) {
+        if (namespaceListeners.isEmpty()) {
+            return;
+        }
+        String path = notification.getPath();
+        if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
+            return;
+        }
+        for (Map.Entry<NamespaceListener, NamespaceName> entry : namespaceListeners.entrySet()) {
+            String basePath = namespacePath(entry.getValue());
+            if (!path.startsWith(basePath + "/")) {
+                continue;
+            }
+            // Direct child only — strip the prefix and check there's no further '/'.
+            String rest = path.substring(basePath.length() + 1);
+            if (rest.indexOf('/') >= 0) {
+                continue;
+            }
+            try {
+                entry.getKey().onNotification(notification);
+            } catch (Exception e) {
+                log.warn().attr("listener", entry.getKey())
+                        .attr("path", path)
+                        .exceptionMessage(e)
+                        .log("Failed to dispatch scalable-topic notification");
+            }
+        }
+    }
 
     public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) {
@@ -279,6 +370,15 @@ public String topicPath(TopicName tn) {
         return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), tn.getEncodedLocalName());
     }
 
+    /**
+     * Path under which all scalable topic records for a namespace live as direct
+     * children. Used by namespace-wide watchers as the prefix to filter metadata
+     * notifications down to events that touch a topic record.
+     */
+    public String namespacePath(NamespaceName ns) {
+        return joinPath(SCALABLE_TOPIC_PATH, ns.toString());
+    }
+
     public String subscriptionPath(TopicName tn, String subscription) {
         return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2ed913561b2f7..109cf1b489bc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -486,6 +486,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         });
         dagWatchSessions.clear();
 
+        // Same for namespace-wide scalable-topic watchers.
+        scalableTopicsWatchers.values().forEach(session -> {
+            try {
+                session.close();
+            } catch (Exception e) {
+                log.warn().exceptionMessage(e)
+                        .log("Error closing scalable-topics watcher on connection close");
+            }
+        });
+        scalableTopicsWatchers.clear();
+
         // Notify the scalable-topic controller that this connection's scalable consumers
         // have dropped. The controller marks them disconnected and starts the grace-period
         // timer; if they reconnect in time, their assignment is preserved.
@@ -832,6 +843,113 @@ protected void handleCommandScalableTopicLookup(
         });
     }
 
+    // --- Scalable topics namespace watcher ---
+
+    private final java.util.concurrent.ConcurrentHashMap<Long,
+            org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
+            scalableTopicsWatchers = new java.util.concurrent.ConcurrentHashMap<>();
+
+    @Override
+    protected void handleCommandWatchScalableTopics(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd) {
+        checkArgument(state == State.Connected);
+
+        final long watchId = cmd.getWatchId();
+        final String namespaceStr = cmd.getNamespace();
+        log.debug().attr("namespace", namespaceStr).attr("watchId", watchId)
+                .log("Received WatchScalableTopics");
+
+        if (!scalableTopicsEnabled) {
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.NotAllowedError, "Scalable topics are disabled on this broker"));
+            return;
+        }
+
+        final NamespaceName namespaceName;
+        try {
+            namespaceName = NamespaceName.get(namespaceStr);
+        } catch (Exception e) {
+            log.warn().attr("namespace", namespaceStr).log("Invalid namespace in WatchScalableTopics");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.InvalidTopicName, "Invalid namespace: " + namespaceStr));
+            return;
+        }
+
+        final java.util.Map<String, String> propertyFilters = new java.util.HashMap<>();
+        for (int i = 0; i < cmd.getPropertyFiltersCount(); i++) {
+            var kv = cmd.getPropertyFilterAt(i);
+            propertyFilters.put(kv.getKey(), kv.getValue());
+        }
+        final String clientHash = cmd.hasCurrentHash() ? cmd.getCurrentHash() : null;
+
+        if (!this.service.getPulsar().isRunning()) {
+            log.warn("WatchScalableTopics rejected: broker not ready");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.ServiceNotReady, "Broker not ready"));
+            return;
+        }
+
+        org.apache.pulsar.broker.resources.ScalableTopicResources resources =
+                service.getPulsar().getPulsarResources().getScalableTopicResources();
+        if (resources == null) {
+            log.warn("WatchScalableTopics rejected: scalable topic resources not available");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.ServiceNotReady, "Scalable topic resources not available"));
+            return;
+        }
+
+        isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenAccept(isAuthorized -> {
+                    if (!isAuthorized) {
+                        final String msg = "Client is not authorized to WatchScalableTopics";
+                        log.warn().attr("principal", getPrincipal()).attr("namespace", namespaceName)
+                                .log(msg);
+                        ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                                ServerError.AuthorizationError, msg));
+                        return;
+                    }
+                    var session = new org.apache.pulsar.broker.service.scalable
+                            .ScalableTopicsWatcherSession(watchId, namespaceName, propertyFilters,
+                            clientHash, this, resources, service.getPulsar().getExecutor());
+                    scalableTopicsWatchers.put(watchId, session);
+
+                    session.start().exceptionally(ex -> {
+                        Throwable cause = ex.getCause() != null ?
ex.getCause() : ex; + log.warn().attr("namespace", namespaceName).exception(cause) + .log("WatchScalableTopics failed"); + scalableTopicsWatchers.remove(watchId); + session.close(); + ctx.executor().execute(() -> ctx.writeAndFlush( + Commands.newWatchScalableTopicsError(watchId, + ServerError.UnknownError, cause.getMessage()))); + return null; + }); + }) + .exceptionally(ex -> { + logNamespaceNameAuthException(remoteAddress, "watch-scalable-topics", + getPrincipal(), Optional.of(namespaceName), ex); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.AuthorizationError, + "Exception occurred while authorizing WatchScalableTopics")); + return null; + }); + } + + @Override + protected void handleCommandWatchScalableTopicsClose( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose cmd) { + // Same idempotent-close semantics as DAG watch / consumer close: per-cnx + // session, originating subscribe was authorized at create time, no per-call + // authz needed. Unknown watchId is a no-op. + checkArgument(state == State.Connected); + long watchId = cmd.getWatchId(); + log.debug().attr("watchId", watchId).log("Received WatchScalableTopicsClose"); + var session = scalableTopicsWatchers.remove(watchId); + if (session != null) { + session.close(); + } + } + @Override protected void handleCommandScalableTopicClose( CommandScalableTopicClose commandScalableTopicClose) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java new file mode 100644 index 0000000000000..8598170d96977 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Broker-side handler for a multi-topic consumer's namespace watch session.
+ *
+ * <p>Watches the metadata store for scalable-topic create / modify / delete events
+ * under a given namespace, evaluates them against a (possibly empty) set of property
+ * filters, and pushes the matching set to the client. The client receives:
+ * <ul>
+ *   <li>One {@code Snapshot} on subscribe (and on every reconnect-resync).</li>
+ *   <li>One {@code Diff} per coalescing window when membership changes.</li>
+ * </ul>
+ *
+ * <p>Tied to a connection — drop the session when the channel goes away. The client
+ * re-opens a fresh watch on reconnect; the new session emits a fresh snapshot and
+ * the client reconciles locally.
+ *
+ * <p>Any broker can serve this role: every broker observes the same metadata events
+ * via the registered listener, so no coordinator is needed at the namespace level.
+ */
+public class ScalableTopicsWatcherSession implements ScalableTopicResources.NamespaceListener {
+
+    private static final Logger LOG = Logger.get(ScalableTopicsWatcherSession.class);
+
+    /**
+     * Window over which back-to-back metadata events are batched into one Diff frame.
+     * Small enough to feel "live", large enough to amortise rapid bursts (e.g. test
+     * setups that create N topics in a tight loop).
+     */
+    private static final Duration COALESCE_WINDOW = Duration.ofMillis(50);
+
+    private final Logger log;
+
+    @Getter
+    private final long watchId;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    /**
+     * Hash of the topic set the client believes it has. Optional: present on
+     * reconnect, absent on first subscribe. When equal to the freshly-computed
+     * hash on the broker, {@link #start()} skips emitting the initial snapshot —
+     * the client's state is already correct.
+     */
+    private final String clientHash;
+    private final ServerCnx cnx;
+    private final ScalableTopicResources resources;
+    private final ScheduledExecutorService scheduler;
+
+    /** {@code /topics/<tenant>/<namespace>} — direct children are scalable topic records. */
+    private final String basePath;
+
+    /**
+     * Topics currently in the matching set. Maintained server-side so we can detect
+     * actual membership flips on Modified events (filter changes a topic in/out).
+     * Topic names are fully-qualified ({@code topic://tenant/ns/name}).
+     */
+    private final Set<String> currentSet = Collections.synchronizedSet(new HashSet<>());
+
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean snapshotEmitted = new AtomicBoolean();
+
+    // --- Coalescing state. All three fields guarded by `coalesceLock`. ---
+    private final Object coalesceLock = new Object();
+    private final LinkedHashSet<String> pendingAdded = new LinkedHashSet<>();
+    private final LinkedHashSet<String> pendingRemoved = new LinkedHashSet<>();
+    private boolean flushScheduled = false;
+
+    public ScalableTopicsWatcherSession(long watchId,
+                                        NamespaceName namespace,
+                                        Map<String, String> propertyFilters,
+                                        String clientHash,
+                                        ServerCnx cnx,
+                                        ScalableTopicResources resources,
+                                        ScheduledExecutorService scheduler) {
+        this.watchId = watchId;
+        this.namespace = namespace;
+        this.propertyFilters = propertyFilters == null ? Map.of() : propertyFilters;
+        this.clientHash = clientHash;
+        this.cnx = cnx;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.basePath = resources.namespacePath(namespace);
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("watchId", watchId)
+                .attr("filters", this.propertyFilters)
+                .build();
+    }
+
+    @Override
+    public NamespaceName getNamespaceName() {
+        return namespace;
+    }
+
+    /**
+     * Start the watch: register on the namespace listener registry first (so events
+     * during snapshot computation are queued, not lost), compute the initial filtered
+     * set, then emit the {@code Snapshot} frame. After that, deltas flow through the
+     * listener.
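+     *
+     * <p>Illustrative frame sequence for one session (topic names hypothetical):
+     * <pre>
+     * Snapshot[t1, t2]    // initial filtered set (skipped when the client hash matches)
+     * Diff[added: t3]     // one coalescing window after a create
+     * Diff[removed: t1]   // after a delete or a filter flip
+     * </pre>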
+     */
+    public CompletableFuture<Void> start() {
+        // Register BEFORE computing the initial set: any event that arrives mid-snapshot
+        // is captured by the listener and either (a) already in the initial set we're
+        // about to emit, in which case the redundant Add is a no-op on the client
+        // (set semantics), or (b) genuinely newer than the snapshot, in which case it
+        // correctly flows through as a Diff after the Snapshot frame.
+        resources.registerNamespaceListener(this);
+
+        return resources.findScalableTopicsByPropertiesAsync(namespace, propertyFilters)
+                .thenAccept(initialTopics -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    // Replace currentSet under sync to avoid races with onNotification.
+                    synchronized (currentSet) {
+                        currentSet.clear();
+                        currentSet.addAll(initialTopics);
+                    }
+                    snapshotEmitted.set(true);
+                    // Hash short-circuit: if the client tells us it already has this
+                    // exact set (reconnect within an unchanged window), don't waste
+                    // bytes on the wire. Future Diffs flow as usual.
+                    if (clientHash != null) {
+                        String serverHash = TopicList.calculateHash(initialTopics);
+                        if (clientHash.equals(serverHash)) {
+                            log.info().attr("topics", initialTopics.size())
+                                    .log("Reconnect hash matched; skipping snapshot");
+                            return;
+                        }
+                    }
+                    log.info().attr("topics", initialTopics.size()).log("Initial snapshot");
+                    cnx.ctx().writeAndFlush(
+                            Commands.newWatchScalableTopicsSnapshot(watchId, initialTopics));
+                });
+    }
+
+    /**
+     * Invoked by {@link ScalableTopicResources} for every metadata event whose path
+     * is a direct child of this watcher's namespace base path. The resources-level
+     * fan-out has already done the namespace + direct-child filtering, so we go
+     * straight to evaluating the filter and updating the matching set.
+     */
+    @Override
+    public void onNotification(Notification notification) {
+        if (closed.get()) {
+            return;
+        }
+        String path = notification.getPath();
+        // Resources-level fan-out guarantees direct-child paths under basePath, but
+        // re-derive the encoded local name defensively.
+        String rest = path.startsWith(basePath + "/")
+                ? path.substring(basePath.length() + 1) : path;
+
+        String topicName = TopicName.get("topic", namespace, Codec.decode(rest)).toString();
+
+        if (notification.getType() == NotificationType.Deleted) {
+            if (currentSet.remove(topicName)) {
+                enqueueRemoved(topicName);
+            }
+            return;
+        }
+
+        // Created or Modified — fetch the new value to evaluate the filter against.
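+        // (Deleted needs no metadata fetch above: removal is unconditional. For
+        // Created / Modified, the freshly-read value decides whether the topic
+        // enters or leaves the matching set.)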
+        TopicName tn = TopicName.get(topicName);
+        resources.getScalableTopicMetadataAsync(tn, true)
+                .whenComplete((optMd, ex) -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    if (ex != null) {
+                        log.warn().attr("topic", topicName).exceptionMessage(ex)
+                                .log("Failed to load scalable topic metadata for filter eval");
+                        return;
+                    }
+                    boolean wasInSet = currentSet.contains(topicName);
+                    boolean shouldBeInSet = optMd.isPresent() && matchesFilters(optMd.get());
+                    if (!wasInSet && shouldBeInSet) {
+                        currentSet.add(topicName);
+                        enqueueAdded(topicName);
+                    } else if (wasInSet && !shouldBeInSet) {
+                        currentSet.remove(topicName);
+                        enqueueRemoved(topicName);
+                    }
+                });
+    }
+
+    private boolean matchesFilters(ScalableTopicMetadata metadata) {
+        if (propertyFilters.isEmpty()) {
+            return true;
+        }
+        Map<String, String> p = metadata.getProperties();
+        if (p == null) {
+            return false;
+        }
+        for (var e : propertyFilters.entrySet()) {
+            if (!e.getValue().equals(p.get(e.getKey()))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void enqueueAdded(String topic) {
+        synchronized (coalesceLock) {
+            // Cancel out a pending remove (e.g. rapid remove-then-add of same topic).
+            pendingRemoved.remove(topic);
+            pendingAdded.add(topic);
+            scheduleFlush();
+        }
+    }
+
+    private void enqueueRemoved(String topic) {
+        synchronized (coalesceLock) {
+            pendingAdded.remove(topic);
+            pendingRemoved.add(topic);
+            scheduleFlush();
+        }
+    }
+
+    private void scheduleFlush() {
+        // Caller holds coalesceLock.
+        if (flushScheduled) {
+            return;
+        }
+        flushScheduled = true;
+        scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    private void flushPending() {
+        if (closed.get()) {
+            return;
+        }
+        Set<String> added;
+        Set<String> removed;
+        synchronized (coalesceLock) {
+            added = pendingAdded.isEmpty() ? Set.of() : new LinkedHashSet<>(pendingAdded);
+            removed = pendingRemoved.isEmpty() ? Set.of() : new LinkedHashSet<>(pendingRemoved);
+            pendingAdded.clear();
+            pendingRemoved.clear();
+            flushScheduled = false;
+        }
+        if (added.isEmpty() && removed.isEmpty()) {
+            return;
+        }
+        // Wait until the initial snapshot was emitted: any deltas that fire before the
+        // snapshot is sent have already been folded into currentSet via onNotification's
+        // wasInSet check (same set we built the snapshot from), so they're correctly
+        // represented. We just need to send them AFTER the snapshot frame.
+        if (!snapshotEmitted.get()) {
+            // Re-defer: the snapshot future is short-lived, retry shortly.
+            scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(), TimeUnit.MILLISECONDS);
+            // Re-enqueue what we drained, since the next flush will rebuild from pending.
+            synchronized (coalesceLock) {
+                pendingAdded.addAll(added);
+                pendingRemoved.addAll(removed);
+                flushScheduled = true;
+            }
+            return;
+        }
+        log.info().attr("added", added.size()).attr("removed", removed.size()).log("Pushing diff");
+        cnx.ctx().writeAndFlush(Commands.newWatchScalableTopicsDiff(watchId, added, removed));
+    }
+
+    /**
+     * Drop the session. Deregister from the resources' namespace listener registry so
+     * the per-event fan-out skips us — no listener leak, no per-notification dispatch
+     * tax for the broker's lifetime.
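+     * Idempotent: the {@code closed} compare-and-set below makes repeated calls no-ops.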
+ */ + public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } + resources.deregisterNamespaceListener(this); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java new file mode 100644 index 0000000000000..1e9d84897cdd2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.concurrent.ImmediateEventExecutor; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.resources.ScalableTopicMetadata; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Coverage for the reconnect-hash short-circuit in + * {@link ScalableTopicsWatcherSession}: when the client's reported hash matches + * the freshly-computed server hash, {@code start()} returns without writing a + * Snapshot frame; otherwise it emits Snapshot as usual. 
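+ *
+ * <p>Both sides derive the comparison hash with the same helper, e.g. (illustrative):
+ * <pre>{@code
+ * String hash = TopicList.calculateHash(List.of("topic://tenant/ns/t1"));
+ * }</pre>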
+ */ +public class ScalableTopicsWatcherSessionHashTest { + + private MetadataStoreExtended store; + private ScalableTopicResources resources; + private ServerCnx cnx; + private ChannelHandlerContext ctx; + private ScheduledExecutorService scheduler; + + @BeforeMethod + public void setUp() throws Exception { + store = new LocalMemoryMetadataStore("memory:local", + MetadataStoreConfig.builder().build()); + resources = new ScalableTopicResources(store, 30); + cnx = mock(ServerCnx.class); + ctx = mock(ChannelHandlerContext.class); + when(cnx.ctx()).thenReturn(ctx); + // writeAndFlush returns a ChannelFuture; provide a no-op promise to keep the + // call chain happy. The ScalableTopicsWatcherSession ignores the return. + when(ctx.writeAndFlush(org.mockito.ArgumentMatchers.any())) + .thenReturn(new DefaultChannelPromise(new EmbeddedChannel(), + ImmediateEventExecutor.INSTANCE)); + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws Exception { + if (scheduler != null) { + scheduler.shutdownNow(); + } + if (store != null) { + store.close(); + } + } + + private ScalableTopicMetadata meta() { + return ScalableTopicMetadata.builder().epoch(0).nextSegmentId(1) + .properties(Map.of()).build(); + } + + @Test + public void noHashOnFirstSubscribeEmitsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-fresh-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + var session = new ScalableTopicsWatcherSession(1L, ns, Map.of(), + /* clientHash= */ null, cnx, resources, scheduler); + session.start().get(); + + // First subscribe: no hash → broker must emit one Snapshot frame. + verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class)); + } + + @Test + public void matchingHashOnReconnectSkipsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-match-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + // Client believes the namespace contains exactly this one topic. Compute the + // matching hash with the same function the broker uses (TopicList.crc32c). + String matchingHash = TopicList.calculateHash(List.of(tn.toString())); + + var session = new ScalableTopicsWatcherSession(2L, ns, Map.of(), + /* clientHash= */ matchingHash, cnx, resources, scheduler); + session.start().get(); + + // Hash matched → no Snapshot frame written. Future Diffs would still flow + // through writeAndFlush, but start() itself must stay silent. + verify(ctx, never()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class)); + } + + @Test + public void differingHashOnReconnectStillEmitsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-diff-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + // Client thinks the set is something different. Broker must emit a fresh + // Snapshot so the client can reconcile. 
+        String staleHash = TopicList.calculateHash(List.of("topic://" + ns + "/something-else"));
+
+        var session = new ScalableTopicsWatcherSession(3L, ns, Map.of(),
+                /* clientHash= */ staleHash, cnx, resources, scheduler);
+        session.start().get();
+
+        verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+    }
+
+    private static String suffix() {
+        return UUID.randomUUID().toString().substring(0, 8);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
new file mode 100644
index 0000000000000..ffdf0b0d277aa
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the broker-side namespace scalable-topics watcher.
+ *
+ * <p>Uses reflection to construct a {@code ScalableTopicsWatcher} (package-private
+ * in the v5 module) directly against the test cluster's v4 client. Verifies:
+ * <ul>
+ *   <li>Initial snapshot reflects pre-existing topics.</li>
+ *   <li>Topic create / delete fires {@code Diff} events with the right names.</li>
+ *   <li>Property filters narrow the matching set, and updating a topic's properties
+ *   to fall outside the filter emits {@code Removed}.</li>
+ * </ul>
+ */
+public class V5ScalableTopicsWatcherTest extends V5ClientBaseTest {
+
+    /** Captures every Snapshot/Diff so tests can assert on the cumulative state. */
+    private static final class CapturingListener {
+        final Set<String> currentSet = ConcurrentHashMap.newKeySet();
+        final List<List<String>> snapshots = new CopyOnWriteArrayList<>();
+        // (added, removed) per diff
+        final List<Map.Entry<List<String>, List<String>>> diffs = new CopyOnWriteArrayList<>();
+
+        void onSnapshot(List<String> topics) {
+            snapshots.add(List.copyOf(topics));
+            currentSet.clear();
+            currentSet.addAll(topics);
+        }
+
+        void onDiff(List<String> added, List<String> removed) {
+            diffs.add(Map.entry(List.copyOf(added), List.copyOf(removed)));
+            currentSet.removeAll(removed);
+            currentSet.addAll(added);
+        }
+    }
+
+    @Test
+    public void watcherEmitsCreateAndDeleteEvents() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        WatcherHandle handle = openWatcher(ns, Map.of());
+        try {
+            // Initial snapshot should be empty: the shared cluster hands every test a
+            // fresh namespace, so there are no pre-existing topics here.
+            Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> !handle.listener.snapshots.isEmpty());
+            assertEquals(handle.listener.snapshots.get(0).size(), 0,
+                    "fresh namespace should yield empty initial snapshot");
+
+            String topicA = ns + "/a-" + UUID.randomUUID().toString().substring(0, 8);
+            String topicB = ns + "/b-" + UUID.randomUUID().toString().substring(0, 8);
+            admin.scalableTopics().createScalableTopic("topic://" + topicA, 1);
+            admin.scalableTopics().createScalableTopic("topic://" + topicB, 1);
+
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, Set.of(
+                            "topic://" + topicA, "topic://" + topicB)));
+
+            admin.scalableTopics().deleteScalableTopic("topic://" + topicA, true);
+
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, Set.of("topic://" + topicB)));
+        } finally {
+            handle.close();
+        }
+    }
+
+    @Test
+    public void watcherFiltersByProperty() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice"));
+        try {
+            Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> !handle.listener.snapshots.isEmpty());
+
+            String aliceTopic = ns + "/alice-" + UUID.randomUUID().toString().substring(0, 8);
+            String bobTopic = ns + "/bob-" + UUID.randomUUID().toString().substring(0, 8);
+            admin.scalableTopics().createScalableTopic("topic://" + aliceTopic, 1,
+                    Map.of("owner", "alice"));
+            admin.scalableTopics().createScalableTopic("topic://" + bobTopic, 1,
+                    Map.of("owner", "bob"));
+
+            // Only alice's topic should surface — bob is filtered out.
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, Set.of("topic://" + aliceTopic)));
+
+            // bob's topic exists but never reaches the watcher's set.
+            assertFalse(handle.listener.currentSet.contains("topic://" + bobTopic));
+        } finally {
+            handle.close();
+        }
+    }
+
+    @Test
+    public void watcherEmptyFilterSubscribesToEveryTopic() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        // Pre-create a topic before the watcher opens so the initial snapshot has work
+        // to do (proves the snapshot path, not just the live-event path).
+        String preTopic = ns + "/pre-" + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic("topic://" + preTopic, 1);
+
+        WatcherHandle handle = openWatcher(ns, Map.of());
+        try {
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, Set.of("topic://" + preTopic)));
+        } finally {
+            handle.close();
+        }
+    }
+
+    // --- Helpers ---
+
+    /** Bundle of watcher state needed to clean up at the end of a test. */
+    private static final class WatcherHandle implements AutoCloseable {
+        final Object watcher;
+        final CapturingListener listener;
+        final CountDownLatch ready;
+
+        WatcherHandle(Object watcher, CapturingListener listener, CountDownLatch ready) {
+            this.watcher = watcher;
+            this.listener = listener;
+            this.ready = ready;
+        }
+
+        @Override
+        public void close() throws Exception {
+            Method closeMethod = watcher.getClass().getDeclaredMethod("close");
+            closeMethod.setAccessible(true);
+            closeMethod.invoke(watcher);
+        }
+    }
+
+    /**
+     * Construct a {@code ScalableTopicsWatcher} via reflection (it's package-private
+     * inside the v5 module, but we drive it directly from the broker tests). Wire up
+     * a {@link CapturingListener}, call {@code start()}, and return a handle. The
+     * initial snapshot is delivered via the future + {@code onSnapshot} on the
+     * listener for uniform handling — see the watcher's javadoc.
+     */
+    private WatcherHandle openWatcher(NamespaceName ns, Map<String, String> filters)
+            throws Exception {
+        // Pull the underlying v4 client out of v5Client so we can construct the
+        // (package-private) watcher directly.
+        Field v4Field = v5Client.getClass().getDeclaredField("v4Client");
+        v4Field.setAccessible(true);
+        PulsarClientImpl v4 = (PulsarClientImpl) v4Field.get(v5Client);
+
+        Class<?> watcherCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher");
+        Constructor<?> ctor = watcherCls.getDeclaredConstructor(
+                PulsarClientImpl.class, NamespaceName.class, Map.class);
+        ctor.setAccessible(true);
+        Object watcher = ctor.newInstance(v4, ns, filters);
+
+        CapturingListener listener = new CapturingListener();
+
+        // The watcher's Listener is a package-private nested interface; use a Proxy.
+        Class<?> listenerCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher$Listener");
+        Object listenerProxy = java.lang.reflect.Proxy.newProxyInstance(
+                listenerCls.getClassLoader(), new Class<?>[]{listenerCls}, (proxy, method, args) -> {
+                    switch (method.getName()) {
+                        case "onSnapshot":
+                            listener.onSnapshot((List<String>) args[0]);
+                            break;
+                        case "onDiff":
+                            listener.onDiff((List<String>) args[0], (List<String>) args[1]);
+                            break;
+                        default:
+                            // Object methods (toString, hashCode, equals) — defaults are fine.
+                    }
+                    return null;
+                });
+
+        Method setListener = watcherCls.getDeclaredMethod("setListener", listenerCls);
+        setListener.setAccessible(true);
+        setListener.invoke(watcher, listenerProxy);
+
+        Method start = watcherCls.getDeclaredMethod("start");
+        start.setAccessible(true);
+        java.util.concurrent.CompletableFuture<List<String>> startFut =
+                (java.util.concurrent.CompletableFuture<List<String>>) start.invoke(watcher);
+
+        CountDownLatch ready = new CountDownLatch(1);
+        startFut.thenAccept(initialTopics -> {
+            // start()'s future delivers the initial snapshot — feed it into the listener
+            // for uniform handling with subsequent snapshots.
+            listener.onSnapshot(initialTopics);
+            ready.countDown();
+        });
+        if (!ready.await(10, TimeUnit.SECONDS)) {
+            throw new AssertionError("watcher did not deliver initial snapshot in 10s");
+        }
+        return new WatcherHandle(watcher, listener, ready);
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
new file mode 100644
index 0000000000000..0350d83df52eb
--- /dev/null
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ScalableTopicsWatcherSession;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Client-side manager for a namespace-wide scalable-topics watch session.
+ *
+ * <p>Opens a {@code CommandWatchScalableTopics} on attach, awaits the initial
+ * {@code Snapshot}, and forwards subsequent {@code Snapshot} / {@code Diff} events
+ * to a {@link Listener}. On connection drop, schedules a reconnect with backoff;
+ * when the new session lands, the broker pushes a fresh snapshot and the client
+ * applies it as a state replacement (no missed events).
+ *
+ * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different — watch
+ * subscribe is fire-and-forget (no request/response), with the initial snapshot
+ * arriving as the first {@code WatchScalableTopicsUpdate}.
+ */
+final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession, AutoCloseable {
+
+    private static final Logger LOG = Logger.get(ScalableTopicsWatcher.class);
+    private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0);
+
+    /**
+     * Listener for membership events. The watcher delivers events on the netty IO
+     * thread; implementations should not block.
+     */
+    interface Listener {
+        /** Full set; replace any local state. */
+        void onSnapshot(List<String> topics);
+
+        /** Apply removed before added (covers rapid remove-then-add of same name). */
+        void onDiff(List<String> added, List<String> removed);
+    }
+
+    private final Logger log;
+    private final PulsarClientImpl v4Client;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    private final long watchId;
+    private final Backoff reconnectBackoff;
+
+    private final CompletableFuture<List<String>> initialSnapshotFuture = new CompletableFuture<>();
+    /**
+     * Mirrors the broker's view of the matching set so we can hand a hash on
+     * reconnect — when the set hasn't changed, the broker skips emitting a
+     * fresh snapshot. Updated on every Snapshot replace and Diff apply.
+     * Synchronised because Snapshot / Diff arrive on the netty thread but the
+     * hash may be read on a reconnect callback running elsewhere.
+     */
+    private final Set<String> currentSet = Collections.synchronizedSet(new HashSet<>());
+    private volatile Listener listener;
+    private volatile ClientCnx cnx;
+    private volatile boolean closed = false;
+
+    /**
+     * @param v4Client        the underlying v4 client used to open broker connections
+     * @param namespace       namespace to watch
+     * @param propertyFilters AND filters; empty map = match all
+     */
+    ScalableTopicsWatcher(PulsarClientImpl v4Client, NamespaceName namespace,
+                          Map<String, String> propertyFilters) {
+        this.v4Client = v4Client;
+        this.namespace = namespace;
+        this.propertyFilters = propertyFilters == null ? Map.of() : propertyFilters;
+        this.watchId = WATCH_ID_GENERATOR.incrementAndGet();
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("watchId", watchId)
+                .attr("filters", this.propertyFilters)
+                .build();
+    }
+
+    /**
+     * Open the watch session on a connection to the configured service URL.
+     * Resolves with the initial snapshot's topic list. After this returns, every
+     * subsequent {@code Snapshot} / {@code Diff} flows through {@link #setListener}.
+     */
+    CompletableFuture<List<String>> start() {
+        v4Client.getConnectionToServiceUrl()
+                .thenAccept(this::attach)
+                .exceptionally(ex -> {
+                    initialSnapshotFuture.completeExceptionally(ex);
+                    return null;
+                });
+        return initialSnapshotFuture;
+    }
+
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        this.cnx = newCnx;
+        if (!newCnx.isSupportsScalableTopics()) {
+            initialSnapshotFuture.completeExceptionally(
+                    new PulsarClientException.FeatureNotSupportedException(
+                            "Broker does not support scalable topics",
+                            PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+            return;
+        }
+        newCnx.registerScalableTopicsWatcher(watchId, this);
+        // First subscribe: send no hash so the broker emits the initial snapshot
+        // unconditionally. That snapshot is what populates initialSnapshotFuture.
+        newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics(
+                watchId, namespace.toString(), propertyFilters,
+                /* currentHash= */ null))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        newCnx.removeScalableTopicsWatcher(watchId);
+                        if (!initialSnapshotFuture.isDone()) {
+                            initialSnapshotFuture.completeExceptionally(
+                                    new PulsarClientException(writeFuture.cause()));
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void onSnapshot(List<String> topics) {
+        if (closed) {
+            return;
+        }
+        log.info().attr("topics", topics.size()).log("Snapshot received");
+        // Reset backoff on every successful snapshot — that's the broker confirming
+        // the session is live and our local state is consistent.
+        reconnectBackoff.reset();
+        // Replace local set so the next reconnect computes the right hash.
+        synchronized (currentSet) {
+            currentSet.clear();
+            currentSet.addAll(topics);
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.complete(topics);
+            // The listener is set by the caller AFTER start() resolves, so the initial
+            // snapshot is delivered via the future, not via onSnapshot's fan-out.
+            return;
+        }
+        Listener l = listener;
+        if (l != null) {
+            try {
+                l.onSnapshot(topics);
+            } catch (Exception e) {
+                log.error().exception(e).log("Listener threw on snapshot");
+            }
+        }
+    }
+
+    @Override
+    public void onDiff(List<String> added, List<String> removed) {
+        if (closed) {
+            return;
+        }
+        log.info().attr("added", added.size()).attr("removed", removed.size())
+                .log("Diff received");
+        // Apply removed before added — covers rapid remove-then-add of the same name.
+        synchronized (currentSet) {
+            currentSet.removeAll(removed);
+            currentSet.addAll(added);
+        }
+        Listener l = listener;
+        if (l != null) {
+            try {
+                l.onDiff(added, removed);
+            } catch (Exception e) {
+                log.error().exception(e).log("Listener threw on diff");
+            }
+        }
+    }
+
+    /**
+     * Snapshot the current set under lock so the hash + the watch frame agree on
+     * the same view. CRC32C of sorted topic names — same function used by
+     * {@code CommandGetTopicsOfNamespace} so behaviour matches the existing
+     * topic-list watch on the wire.
+     */
+    private String currentSetHash() {
+        synchronized (currentSet) {
+            if (currentSet.isEmpty()) {
+                return TopicList.calculateHash(java.util.List.of());
+            }
+            return TopicList.calculateHash(new HashSet<>(currentSet));
+        }
+    }
+
+    @Override
+    public void onError(ServerError error, String message) {
+        log.warn().attr("error", error).attr("message", message)
+                .log("WatchScalableTopics rejected");
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.completeExceptionally(
+                    new PulsarClientException("WatchScalableTopics failed: " + error
+                            + (message != null ? " - " + message : "")));
+        }
+    }
+
+    @Override
+    public void connectionClosed() {
+        log.warn("Scalable-topics watcher connection closed");
+        cnx = null;
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // Initial subscribe never completed — surface the failure rather than
+            // retrying silently behind the caller.
+            initialSnapshotFuture.completeExceptionally(
+                    new PulsarClientException(
+                            "Connection closed before initial scalable-topics snapshot arrived"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
+        }
+        long delayMs = reconnectBackoff.next().toMillis();
+        log.info().attr("delayMs", delayMs).log("Scheduling watcher reconnect");
+        v4Client.timer().newTimeout(timeout -> reconnect(),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        v4Client.getConnectionToServiceUrl()
+                .thenAccept(newCnx -> {
+                    if (closed) {
+                        return;
+                    }
+                    if (!newCnx.isSupportsScalableTopics()) {
+                        log.warn().log("Watcher reconnect: broker doesn't support scalable topics");
+                        scheduleReconnect();
+                        return;
+                    }
+                    this.cnx = newCnx;
+                    newCnx.registerScalableTopicsWatcher(watchId, this);
+                    // Reconnect: send the hash of our current set. If the broker's
+                    // freshly-computed hash matches, it skips emitting a Snapshot —
+                    // the watch is live and our local state is correct. Future
+                    // Diffs flow as usual; if the hash differs the broker sends a
+                    // Snapshot which we apply as a full-state replace.
+                    String hash = currentSetHash();
+                    newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics(
+                            watchId, namespace.toString(),
+                            propertyFilters, hash))
+                            .addListener(writeFuture -> {
+                                if (!writeFuture.isSuccess()) {
+                                    newCnx.removeScalableTopicsWatcher(watchId);
+                                    log.warn().exceptionMessage(writeFuture.cause())
+                                            .log("Watcher reconnect write failed");
+                                    scheduleReconnect();
+                                    return;
+                                }
+                                // Write reached the broker — connection is healthy.
+                                // Reset the backoff so the next disconnect starts
+                                // fresh. Crucial for the hash-skip path: the broker
+                                // emits no Snapshot, so onSnapshot's reset never
+                                // fires; without this, a chain of short blips keeps
+                                // the backoff at its peak forever.
+                                reconnectBackoff.reset();
+                            });
+                })
+                .exceptionally(ex -> {
+                    log.warn().exceptionMessage(ex).log("Watcher reconnect failed");
+                    scheduleReconnect();
+                    return null;
+                });
+    }
+
+    /**
+     * Set the listener that receives {@code Snapshot} / {@code Diff} events. Should
+     * be called after {@link #start()} resolves — the initial snapshot is delivered
+     * via that future, not via the listener.
+     */
+    void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Visible for testing — snapshot of the current set. In production, the
+     * listener is the source of truth; this method exists so tests can poke the
+     * watcher's hash-tracking state directly.
+     */
+    Set<String> currentSetForTesting() {
+        synchronized (currentSet) {
+            return new HashSet<>(currentSet);
+        }
+    }
+
+    long watchId() {
+        return watchId;
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeScalableTopicsWatcher(watchId);
+            c.ctx().writeAndFlush(Commands.newWatchScalableTopicsClose(watchId));
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f45fc2c91f4fd..40c4e12aa0533 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -178,6 +178,18 @@ public class ClientCnx extends PulsarHandler {
             .concurrencyLevel(1)
             .build();
 
+    /**
+     * Per-watcher namespace scalable-topics watch sessions, keyed by the
+     * {@code watchId} chosen by the client.
+     * The broker tags every
+     * {@link org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate} with
+     * this id.
+     */
+    private final ConcurrentLongHashMap<ScalableTopicsWatcherSession> scalableTopicsWatchers =
+            ConcurrentLongHashMap.<ScalableTopicsWatcherSession>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(1)
+                    .build();
+
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
@@ -375,6 +387,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this));
         dagWatchSessions.forEach((__, session) -> session.connectionClosed());
         scalableConsumerSessions.forEach((__, session) -> session.connectionClosed());
+        scalableTopicsWatchers.forEach((__, session) -> session.connectionClosed());
 
         waitingLookupRequests.clear();
 
@@ -383,6 +396,7 @@
         topicListWatchers.clear();
         dagWatchSessions.clear();
         scalableConsumerSessions.clear();
+        scalableTopicsWatchers.clear();
 
         timeoutTask.cancel(true);
     }
@@ -1427,6 +1441,75 @@ public void removeScalableConsumerSession(long consumerId) {
         scalableConsumerSessions.remove(consumerId);
     }
 
+    @Override
+    protected void handleCommandWatchScalableTopicsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate cmd) {
+        checkArgument(state == State.Ready);
+
+        long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received WatchScalableTopicsUpdate");
+
+        if (cmd.hasError()) {
+            // Error response — terminal for this watch (subscribe was rejected or the
+            // server failed to compute the initial set). Drop the local registration so
+            // a retry from the caller starts fresh.
+            ScalableTopicsWatcherSession session = scalableTopicsWatchers.remove(watchId);
+            if (session != null) {
+                session.onError(cmd.getError(), cmd.hasMessage() ? cmd.getMessage() : null);
+            } else {
+                log.warn().attr("watchId", watchId)
+                        .log("Received scalable-topics watch error for unknown watcher");
+            }
+            return;
+        }
+
+        ScalableTopicsWatcherSession session = scalableTopicsWatchers.get(watchId);
+        if (session == null) {
+            log.warn().attr("watchId", watchId)
+                    .log("Received scalable-topics watch update for unknown watcher");
+            return;
+        }
+        // Snapshot and diff are mutually exclusive via the proto oneof; switch on the
+        // generated event-case enum and unpack accordingly.
+        switch (cmd.getEventCase()) {
+            case SNAPSHOT -> {
+                var snapshot = cmd.getSnapshot();
+                java.util.List<String> topics = new java.util.ArrayList<>(snapshot.getTopicsCount());
+                for (int i = 0; i < snapshot.getTopicsCount(); i++) {
+                    topics.add(snapshot.getTopicAt(i));
+                }
+                session.onSnapshot(topics);
+            }
+            case DIFF -> {
+                var diff = cmd.getDiff();
+                // LightProto pluralises count accessors with a trailing 's' on the field name,
+                // hence `getAddedsCount` / `getRemovedsCount`.
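+                // For instance, a proto field `repeated string added` yields addAdded(String)
+                // when building and getAddedAt(i) / getAddedsCount() when reading.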
+ java.util.List added = new java.util.ArrayList<>(diff.getAddedsCount()); + for (int i = 0; i < diff.getAddedsCount(); i++) { + added.add(diff.getAddedAt(i)); + } + java.util.List removed = new java.util.ArrayList<>(diff.getRemovedsCount()); + for (int i = 0; i < diff.getRemovedsCount(); i++) { + removed.add(diff.getRemovedAt(i)); + } + session.onDiff(added, removed); + } + case NOT_SET -> log.warn().attr("watchId", watchId) + .log("Received scalable-topics watch update with no event payload"); + default -> log.warn().attr("watchId", watchId) + .attr("case", cmd.getEventCase()) + .log("Received scalable-topics watch update with unknown event case"); + } + } + + public void registerScalableTopicsWatcher(long watchId, ScalableTopicsWatcherSession watcher) { + scalableTopicsWatchers.put(watchId, watcher); + } + + public void removeScalableTopicsWatcher(long watchId) { + scalableTopicsWatchers.remove(watchId); + } + /** * check serverError and take appropriate action. *

    diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java new file mode 100644 index 0000000000000..7b71dc208ec38 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.List; +import org.apache.pulsar.common.api.proto.ServerError; + +/** + * Client-side callback for a namespace-wide scalable-topics watch session. + * + *

    The broker pushes either a full {@code Snapshot} (initial subscribe and on every + * reconnect resync) or an incremental {@code Diff}. Implementations apply the + * snapshot as a full state replacement and the diff as a set delta — both are + * idempotent and self-healing across reconnects. + * + *
<p>Implemented by the V5 client's {@code ScalableTopicsWatcher}. */ public interface ScalableTopicsWatcherSession { + + /** + * Full set of topics currently matching the watch's filters. The implementation + * should replace any local state derived from prior events with this snapshot. + */ + void onSnapshot(List<String> topics); + + /** + * Incremental membership change. Apply {@code removed} before {@code added} + * (covers a rapid remove-then-add of the same topic name within a coalescing + * window). + */ + void onDiff(List<String> added, List<String> removed); + + /** + * The broker rejected the watch (e.g. an authorization failure, or the broker is + * shutting down). Implementations should fail any pending start future and stop + * emitting events. + */ + void onError(ServerError error, String message); + + /** + * The underlying connection dropped. Implementations should treat any local set + * as stale until the next snapshot arrives. + */ + void connectionClosed(); +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 02e03a491d071..2c0c74fb378b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1774,6 +1774,95 @@ public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId, return serializeWithSize(cmd); } + + /** + * Client -> Broker: open a scalable-topics watch session. + * + * @param watchId client-assigned watch identifier + * @param namespace tenant/namespace to scope the watch to + * @param propertyFilters AND filters; empty / null = match all topics in the namespace + * @param currentHash optional hash of the client's currently-known topic set. + * Pass on reconnect to let the broker skip the snapshot when + * state hasn't changed; pass {@code null} on first subscribe. + */ + public static ByteBuf newWatchScalableTopics(long watchId, String namespace, + java.util.Map<String, String> propertyFilters, + String currentHash) { + BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS); + org.apache.pulsar.common.api.proto.CommandWatchScalableTopics watch = + cmd.setWatchScalableTopics() + .setWatchId(watchId) + .setNamespace(namespace); + if (propertyFilters != null) { + for (var entry : propertyFilters.entrySet()) { + watch.addPropertyFilter() + .setKey(entry.getKey()) + .setValue(entry.getValue()); + } + } + if (currentHash != null) { + watch.setCurrentHash(currentHash); + } + return serializeWithSize(cmd); + } + + public static ByteBuf newWatchScalableTopicsClose(long watchId) { + BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS_CLOSE); + cmd.setWatchScalableTopicsClose().setWatchId(watchId); + return serializeWithSize(cmd); + } + + /** + * Broker -> Client: emit a full snapshot of the matching topic set. Sent on initial + * subscribe and on every reconnect-resync; the client replaces its local state.
+ */ + public static ByteBuf newWatchScalableTopicsSnapshot(long watchId, + java.util.Collection<String> topics) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId); + var snapshot = update.setSnapshot(); + for (String t : topics) { + snapshot.addTopic(t); + } + return serializeWithSize(cmd); + } + + /** + * Broker -> Client: emit an incremental membership change. Either {@code added} or + * {@code removed} (or both) may be empty. Apply removed before added when both + * appear together. + */ + public static ByteBuf newWatchScalableTopicsDiff(long watchId, + java.util.Collection<String> added, + java.util.Collection<String> removed) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId); + var diff = update.setDiff(); + if (added != null) { + for (String t : added) { + diff.addAdded(t); + } + } + if (removed != null) { + for (String t : removed) { + diff.addRemoved(t); + } + } + return serializeWithSize(cmd); + } + + public static ByteBuf newWatchScalableTopicsError(long watchId, ServerError error, String message) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + cmd.setWatchScalableTopicsUpdate() + .setWatchId(watchId) + .setError(error) + .setMessage(message); + return serializeWithSize(cmd); + } + public static ByteBuf serializeWithSize(BaseCommand cmd) { return serializeWithPrecalculatedSerializedSize(cmd, cmd.getSerializedSize()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 0eb2b1b9670dd..bca87683f279c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -514,6 +514,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate()); break; + case WATCH_SCALABLE_TOPICS: + checkArgument(cmd.hasWatchScalableTopics()); + handleCommandWatchScalableTopics(cmd.getWatchScalableTopics()); + break; + + case WATCH_SCALABLE_TOPICS_UPDATE: + checkArgument(cmd.hasWatchScalableTopicsUpdate()); + handleCommandWatchScalableTopicsUpdate(cmd.getWatchScalableTopicsUpdate()); + break; + + case WATCH_SCALABLE_TOPICS_CLOSE: + checkArgument(cmd.hasWatchScalableTopicsClose()); + handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose()); + break; + default: break; } @@ -807,6 +822,23 @@ protected void handleCommandScalableTopicAssignmentUpdate( throw new UnsupportedOperationException(); } + protected void handleCommandWatchScalableTopics( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopics commandWatchScalableTopics) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchScalableTopicsUpdate( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate + commandWatchScalableTopicsUpdate) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchScalableTopicsClose( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose + commandWatchScalableTopicsClose) { + throw new UnsupportedOperationException(); + } + private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) { NettyChannelUtil.writeAndFlushWithVoidPromise(ctx,
cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 20bd8ab818513..6d13529731777 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -940,6 +940,57 @@ message CommandScalableTopicAssignmentUpdate { required ScalableConsumerAssignment assignment = 2; } +// Multi-topic consumer watcher: subscribes to the union of scalable topics in a +// namespace that match a (possibly empty) set of property filters. The broker keeps +// pushing updates as topics enter or leave the matching set. See +// `multi-topic-consumer-design.md` for the full design. + +// Client -> Broker: open a watch session. +message CommandWatchScalableTopics { + required uint64 watch_id = 1; // Client-assigned watch ID + required string namespace = 2; // tenant/namespace + // Optional AND filters; empty list means "match all topics in the namespace". + repeated KeyValue property_filters = 3; + // Hash of the topics the client believes are currently in its set. Sent on + // reconnect; absent on first subscribe. If it matches the broker's freshly + // computed hash, the broker skips emitting the initial Snapshot — the client's + // local state is already correct and future Diffs will flow as usual. Same + // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics). + optional string current_hash = 4; +} + +// Snapshot of the full matching set. Sent on initial subscribe and on every +// reconnect-resync. The client replaces its local set with this list. +message ScalableTopicsSnapshot { + repeated string topics = 1; +} + +// Incremental membership change. Apply removed before added when both are present. +message ScalableTopicsDiff { + repeated string added = 1; + repeated string removed = 2; +} + +// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). When the +// initial subscribe fails, neither variant is set and `error`/`message` carry the +// failure reason. +message CommandWatchScalableTopicsUpdate { + required uint64 watch_id = 1; + + oneof event { + ScalableTopicsSnapshot snapshot = 2; + ScalableTopicsDiff diff = 3; + } + + optional ServerError error = 4; + optional string message = 5; +} + +// Client -> Broker: close the watch session. +message CommandWatchScalableTopicsClose { + required uint64 watch_id = 1; +} + message CommandGetSchema { required uint64 request_id = 1; required string topic = 2; @@ -1185,6 +1236,10 @@ message BaseCommand { SCALABLE_TOPIC_SUBSCRIBE = 73; SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74; SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75; + + WATCH_SCALABLE_TOPICS = 76; + WATCH_SCALABLE_TOPICS_UPDATE = 77; + WATCH_SCALABLE_TOPICS_CLOSE = 78; } @@ -1276,4 +1331,8 @@ message BaseCommand { optional CommandScalableTopicSubscribe scalableTopicSubscribe = 73; optional CommandScalableTopicSubscribeResponse scalableTopicSubscribeResponse = 74; optional CommandScalableTopicAssignmentUpdate scalableTopicAssignmentUpdate = 75; + + optional CommandWatchScalableTopics watchScalableTopics = 76; + optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate = 77; + optional CommandWatchScalableTopicsClose watchScalableTopicsClose = 78; }
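To make the snapshot/diff contract concrete, here is a minimal client-side sketch of ScalableTopicsWatcherSession: it keeps a sorted topic set, replaces it wholesale on snapshot, applies diffs removed-first, and derives a candidate current_hash for reconnect. The class name, the staleness flag, and the exact hash byte layout (CRC32C over the UTF-8 bytes of the sorted names) are illustrative assumptions; a real implementation would reuse whatever shared helper computes the CommandGetTopicsOfNamespace hash so client and broker agree byte-for-byte.

    package org.apache.pulsar.client.impl;

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeSet;
    import java.util.zip.CRC32C;
    import org.apache.pulsar.common.api.proto.ServerError;

    // Illustrative only; not the V5 client's ScalableTopicsWatcher.
    public class InMemoryScalableTopicsSession implements ScalableTopicsWatcherSession {

        private final TreeSet<String> topics = new TreeSet<>(); // sorted, for stable hashing
        private boolean stale = true; // no trustworthy state until the first snapshot

        @Override
        public synchronized void onSnapshot(List<String> snapshot) {
            // Full state replacement: idempotent across reconnect resyncs.
            topics.clear();
            topics.addAll(snapshot);
            stale = false;
        }

        @Override
        public synchronized void onDiff(List<String> added, List<String> removed) {
            // Removed before added, so a coalesced remove+add of the same name
            // leaves the topic in the set.
            removed.forEach(topics::remove);
            topics.addAll(added);
        }

        @Override
        public synchronized void onError(ServerError error, String message) {
            stale = true; // terminal: a fresh watch must be opened to recover
        }

        @Override
        public synchronized void connectionClosed() {
            stale = true; // keep the old set, but treat it as stale until resync
        }

        public synchronized List<String> currentTopics() {
            return new ArrayList<>(topics);
        }

        /** Candidate current_hash for reconnect; the byte layout is an assumption. */
        public synchronized String currentHash() {
            CRC32C crc = new CRC32C();
            for (String t : topics) { // TreeSet iterates in sorted order
                crc.update(t.getBytes(StandardCharsets.UTF_8));
            }
            return Long.toHexString(crc.getValue());
        }
    }

On reconnect the client would send currentHash() as CommandWatchScalableTopics.current_hash; if the broker's freshly computed hash matches, no snapshot is re-sent and the locally held set, stale flag notwithstanding, is already correct.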