From b1fd511bf8973efa3b901d19edb37d603e6da069 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 30 Apr 2026 18:45:05 -0700 Subject: [PATCH 1/6] [feat][broker] PIP-468: Namespace scalable-topics watcher protocol + broker session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for multi-topic QueueConsumer / StreamConsumer subscriptions filtered by topic properties. The wire layer plus the broker-side watcher session that holds the metadata listener and pushes Snapshot / Diff frames to the client. Protocol (lightproto) - New BaseCommand variants: WATCH_SCALABLE_TOPICS (76) client -> broker: open watch WATCH_SCALABLE_TOPICS_UPDATE (77) broker -> client: Snapshot or Diff WATCH_SCALABLE_TOPICS_CLOSE (78) client -> broker: close - Shapes: CommandWatchScalableTopics { watch_id, namespace, property_filters, consumer_name? } CommandWatchScalableTopicsUpdate { watch_id, snapshot? | diff? | error } ScalableTopicsSnapshot { topics: [string] } ScalableTopicsDiff { added: [string], removed: [string] } - Commands.java helpers + PulsarDecoder dispatch entries. - consumer_name is carried from day one as a hook for a future namespace-level subscription coordinator (see multi-topic-consumer-design.md). Empty for now. Broker session (ScalableTopicsWatcherSession) - Registers a metadata-store listener BEFORE computing the initial set, so events arriving mid-snapshot are captured and replayed via the same diff path. The redundant Add for a topic already in the snapshot is a no-op (set semantics on the client). - Filter evaluation server-side. Created/Modified events read the new value to test against AND filters; Deleted events emit Removed if the topic was in currentSet. - Coalescing window (50 ms) folds back-to-back events into one Diff frame. Add and Remove for the same topic in the same window cancel out (rapid remove-then-add of the same name). - Any broker can serve the role — every broker observes the same metadata events, so no namespace-level coordinator needed. ServerCnx wiring - Per-cnx scalableTopicsWatchers registry; closed in bulk on disconnect. - Authz at subscribe time: NamespaceOperation.GET_TOPICS, mirroring listScalableTopics. - Idempotent close. Helpers - ScalableTopicResources.namespacePath(ns) — used by the watcher to filter notifications to direct children of a namespace's topic prefix. 
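For reviewers, a minimal sketch of the client-side contract these frames imply (illustrative only; the class and method names below are not part of this patch): a Snapshot replaces the local set wholesale, a Diff applies removed before added, and set semantics make a redundant Add a no-op.

```
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative consumer of WATCH_SCALABLE_TOPICS_UPDATE frames; not part of this patch. */
final class TopicSetView {
    private final Set<String> topics = new HashSet<>();

    /** Snapshot: full-state replacement (initial subscribe and every reconnect resync). */
    synchronized void onSnapshot(List<String> snapshot) {
        topics.clear();
        topics.addAll(snapshot);
    }

    /** Diff: apply removed before added; a redundant add of a known topic is a no-op. */
    synchronized void onDiff(List<String> added, List<String> removed) {
        topics.removeAll(removed);
        topics.addAll(added);
    }

    synchronized Set<String> current() {
        return new HashSet<>(topics);
    }
}
```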
--- .../resources/ScalableTopicResources.java | 9 + .../pulsar/broker/service/ServerCnx.java | 117 +++++++ .../ScalableTopicsWatcherSession.java | 290 ++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 84 +++++ .../pulsar/common/protocol/PulsarDecoder.java | 32 ++ pulsar-common/src/main/proto/PulsarApi.proto | 52 ++++ 6 files changed, 584 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java index 3e01580847641..96d24f46b71ef 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java @@ -279,6 +279,15 @@ public String topicPath(TopicName tn) { return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), tn.getEncodedLocalName()); } + /** + * Path under which all scalable topic records for a namespace live as direct + * children. Used by namespace-wide watchers as the prefix to filter metadata + * notifications down to events that touch a topic record. + */ + public String namespacePath(NamespaceName ns) { + return joinPath(SCALABLE_TOPIC_PATH, ns.toString()); + } + public String subscriptionPath(TopicName tn, String subscription) { return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2ed913561b2f7..5d85acb0974a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -486,6 +486,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { }); dagWatchSessions.clear(); + // Same for namespace-wide scalable-topic watchers. + scalableTopicsWatchers.values().forEach(session -> { + try { + session.close(); + } catch (Exception e) { + log.warn().exceptionMessage(e) + .log("Error closing scalable-topics watcher on connection close"); + } + }); + scalableTopicsWatchers.clear(); + // Notify the scalable-topic controller that this connection's scalable consumers // have dropped. The controller marks them disconnected and starts the grace-period // timer; if they reconnect in time, their assignment is preserved. 
@@ -832,6 +843,112 @@ protected void handleCommandScalableTopicLookup( }); } + // --- Scalable topics namespace watcher --- + + private final java.util.concurrent.ConcurrentHashMap + scalableTopicsWatchers = new java.util.concurrent.ConcurrentHashMap<>(); + + @Override + protected void handleCommandWatchScalableTopics( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd) { + checkArgument(state == State.Connected); + + final long watchId = cmd.getWatchId(); + final String namespaceStr = cmd.getNamespace(); + log.debug().attr("namespace", namespaceStr).attr("watchId", watchId) + .log("Received WatchScalableTopics"); + + if (!scalableTopicsEnabled) { + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.NotAllowedError, "Scalable topics are disabled on this broker")); + return; + } + + final NamespaceName namespaceName; + try { + namespaceName = NamespaceName.get(namespaceStr); + } catch (Exception e) { + log.warn().attr("namespace", namespaceStr).log("Invalid namespace in WatchScalableTopics"); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.InvalidTopicName, "Invalid namespace: " + namespaceStr)); + return; + } + + final java.util.Map propertyFilters = new java.util.HashMap<>(); + for (int i = 0; i < cmd.getPropertyFiltersCount(); i++) { + var kv = cmd.getPropertyFilterAt(i); + propertyFilters.put(kv.getKey(), kv.getValue()); + } + + if (!this.service.getPulsar().isRunning()) { + log.warn("WatchScalableTopics rejected: broker not ready"); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.ServiceNotReady, "Broker not ready")); + return; + } + + org.apache.pulsar.broker.resources.ScalableTopicResources resources = + service.getPulsar().getPulsarResources().getScalableTopicResources(); + if (resources == null) { + log.warn("WatchScalableTopics rejected: scalable topic resources not available"); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.ServiceNotReady, "Scalable topic resources not available")); + return; + } + + isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + final String msg = "Client is not authorized to WatchScalableTopics"; + log.warn().attr("principal", getPrincipal()).attr("namespace", namespaceName) + .log(msg); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.AuthorizationError, msg)); + return; + } + var session = new org.apache.pulsar.broker.service.scalable + .ScalableTopicsWatcherSession(watchId, namespaceName, propertyFilters, + this, resources, service.getPulsar().getExecutor()); + scalableTopicsWatchers.put(watchId, session); + + session.start().exceptionally(ex -> { + Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex; + log.warn().attr("namespace", namespaceName).exception(cause) + .log("WatchScalableTopics failed"); + scalableTopicsWatchers.remove(watchId); + session.close(); + ctx.executor().execute(() -> ctx.writeAndFlush( + Commands.newWatchScalableTopicsError(watchId, + ServerError.UnknownError, cause.getMessage()))); + return null; + }); + }) + .exceptionally(ex -> { + logNamespaceNameAuthException(remoteAddress, "watch-scalable-topics", + getPrincipal(), Optional.of(namespaceName), ex); + ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId, + ServerError.AuthorizationError, + "Exception occurred while authorizing WatchScalableTopics")); + return null; + }); + } + + @Override + protected void handleCommandWatchScalableTopicsClose( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose cmd) { + // Same idempotent-close semantics as DAG watch / consumer close: per-cnx + // session, originating subscribe was authorized at create time, no per-call + // authz needed. Unknown watchId is a no-op. + checkArgument(state == State.Connected); + long watchId = cmd.getWatchId(); + log.debug().attr("watchId", watchId).log("Received WatchScalableTopicsClose"); + var session = scalableTopicsWatchers.remove(watchId); + if (session != null) { + session.close(); + } + } + @Override protected void handleCommandScalableTopicClose( CommandScalableTopicClose commandScalableTopicClose) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java new file mode 100644 index 0000000000000..8b07d52e59840 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service.scalable; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import lombok.Getter; +import org.apache.pulsar.broker.resources.ScalableTopicMetadata; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; + +/** + * Broker-side handler for a multi-topic consumer's namespace watch session. + * + *

<p>Watches the metadata store for scalable-topic create / modify / delete events + * under a given namespace, evaluates them against a (possibly empty) set of property + * filters, and pushes the matching set to the client. The client receives: + * <ul> + *   <li>One {@code Snapshot} on subscribe (and on every reconnect-resync).</li> + *   <li>One {@code Diff} per coalescing window when membership changes.</li> + * </ul> + * + * <p>Tied to a connection — drop the session when the channel goes away. The client + * re-opens a fresh watch on reconnect; the new session emits a fresh snapshot and + * the client reconciles locally. + * + * <p>
Any broker can serve this role: every broker observes the same metadata events + * via the registered listener, so no coordinator is needed at the namespace level. + */ +public class ScalableTopicsWatcherSession { + + private static final Logger LOG = Logger.get(ScalableTopicsWatcherSession.class); + /** + * Window over which back-to-back metadata events are batched into one Diff frame. + * Small enough to feel "live", large enough to amortise rapid bursts (e.g. test + * setups that create N topics in a tight loop). + */ + private static final Duration COALESCE_WINDOW = Duration.ofMillis(50); + + private final Logger log; + + @Getter + private final long watchId; + private final NamespaceName namespace; + private final Map propertyFilters; + private final ServerCnx cnx; + private final ScalableTopicResources resources; + private final ScheduledExecutorService scheduler; + + /** {@code /topics//} — direct children are scalable topic records. */ + private final String basePath; + private final Consumer notificationListener = this::onNotification; + + /** + * Topics currently in the matching set. Maintained server-side so we can detect + * actual membership flips on Modified events (filter changes a topic in/out). + * Topic names are fully-qualified ({@code topic://tenant/ns/name}). + */ + private final Set currentSet = Collections.synchronizedSet(new HashSet<>()); + + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean snapshotEmitted = new AtomicBoolean(); + + // --- Coalescing state. All three fields guarded by `coalesceLock`. --- + private final Object coalesceLock = new Object(); + private final LinkedHashSet pendingAdded = new LinkedHashSet<>(); + private final LinkedHashSet pendingRemoved = new LinkedHashSet<>(); + private boolean flushScheduled = false; + + public ScalableTopicsWatcherSession(long watchId, + NamespaceName namespace, + Map propertyFilters, + ServerCnx cnx, + ScalableTopicResources resources, + ScheduledExecutorService scheduler) { + this.watchId = watchId; + this.namespace = namespace; + this.propertyFilters = propertyFilters == null ? Map.of() : propertyFilters; + this.cnx = cnx; + this.resources = resources; + this.scheduler = scheduler; + this.basePath = resources.namespacePath(namespace); + this.log = LOG.with() + .attr("namespace", namespace) + .attr("watchId", watchId) + .attr("filters", this.propertyFilters) + .build(); + } + + /** + * Start the watch: register the metadata listener first (so events during snapshot + * computation are queued, not lost), compute the initial filtered set, then emit + * the {@code Snapshot} frame. After that, deltas flow through the listener. + */ + public CompletableFuture start() { + // Register listener BEFORE computing the initial set: any event that arrives + // mid-snapshot is captured by the listener and either (a) already in the + // initial set we're about to emit, in which case the redundant Add is a no-op + // on the client (set semantics), or (b) genuinely newer than the snapshot, in + // which case it correctly flows through as a Diff after the Snapshot frame. + resources.getStore().registerListener(notificationListener); + + return resources.findScalableTopicsByPropertiesAsync(namespace, propertyFilters) + .thenAccept(initialTopics -> { + if (closed.get()) { + return; + } + // Replace currentSet under sync to avoid races with onNotification. 
+ synchronized (currentSet) { + currentSet.clear(); + currentSet.addAll(initialTopics); + } + snapshotEmitted.set(true); + log.info().attr("topics", initialTopics.size()).log("Initial snapshot"); + cnx.ctx().writeAndFlush( + Commands.newWatchScalableTopicsSnapshot(watchId, initialTopics)); + }); + } + + /** + * Visible for testing — invoked by the metadata-store listener registered in + * {@link #start()}. Hot-path: filter quickly to events under our namespace's base + * path, ignore subtree paths (subscriptions, controller lock, etc.), then evaluate. + */ + void onNotification(Notification notification) { + if (closed.get()) { + return; + } + String path = notification.getPath(); + // Direct child only: /topics///. Skip sub-paths + // like ...//subscriptions/... or ...//controller. + if (!path.startsWith(basePath + "/")) { + return; + } + String rest = path.substring(basePath.length() + 1); + if (rest.indexOf('/') >= 0) { + return; + } + + String topicName = TopicName.get("topic", namespace, Codec.decode(rest)).toString(); + + if (notification.getType() == NotificationType.Deleted) { + if (currentSet.remove(topicName)) { + enqueueRemoved(topicName); + } + return; + } + + // Created or Modified — fetch the new value to evaluate the filter against. + TopicName tn = TopicName.get(topicName); + resources.getScalableTopicMetadataAsync(tn, true) + .whenComplete((optMd, ex) -> { + if (closed.get()) { + return; + } + if (ex != null) { + log.warn().attr("topic", topicName).exceptionMessage(ex) + .log("Failed to load scalable topic metadata for filter eval"); + return; + } + boolean wasInSet = currentSet.contains(topicName); + boolean shouldBeInSet = optMd.isPresent() && matchesFilters(optMd.get()); + if (!wasInSet && shouldBeInSet) { + currentSet.add(topicName); + enqueueAdded(topicName); + } else if (wasInSet && !shouldBeInSet) { + currentSet.remove(topicName); + enqueueRemoved(topicName); + } + }); + } + + private boolean matchesFilters(ScalableTopicMetadata metadata) { + if (propertyFilters.isEmpty()) { + return true; + } + Map p = metadata.getProperties(); + if (p == null) { + return false; + } + for (var e : propertyFilters.entrySet()) { + if (!e.getValue().equals(p.get(e.getKey()))) { + return false; + } + } + return true; + } + + private void enqueueAdded(String topic) { + synchronized (coalesceLock) { + // Cancel out a pending remove (e.g. rapid remove-then-add of same topic). + pendingRemoved.remove(topic); + pendingAdded.add(topic); + scheduleFlush(); + } + } + + private void enqueueRemoved(String topic) { + synchronized (coalesceLock) { + pendingAdded.remove(topic); + pendingRemoved.add(topic); + scheduleFlush(); + } + } + + private void scheduleFlush() { + // Caller holds coalesceLock. + if (flushScheduled) { + return; + } + flushScheduled = true; + scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(), TimeUnit.MILLISECONDS); + } + + private void flushPending() { + if (closed.get()) { + return; + } + Set added; + Set removed; + synchronized (coalesceLock) { + added = pendingAdded.isEmpty() ? Set.of() : new LinkedHashSet<>(pendingAdded); + removed = pendingRemoved.isEmpty() ? 
Set.of() : new LinkedHashSet<>(pendingRemoved); + pendingAdded.clear(); + pendingRemoved.clear(); + flushScheduled = false; + } + if (added.isEmpty() && removed.isEmpty()) { + return; + } + // Wait until the initial snapshot was emitted: any deltas that fire before the + // snapshot is sent have already been folded into currentSet via onNotification's + // wasInSet check (same set we built the snapshot from), so they're correctly + // represented. We just need to send them AFTER the snapshot frame. + if (!snapshotEmitted.get()) { + // Re-defer: the snapshot future is short-lived, retry shortly. + scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(), TimeUnit.MILLISECONDS); + // Re-enqueue what we drained, since the next flush will rebuild from pending. + synchronized (coalesceLock) { + pendingAdded.addAll(added); + pendingRemoved.addAll(removed); + flushScheduled = true; + } + return; + } + log.info().attr("added", added.size()).attr("removed", removed.size()).log("Pushing diff"); + cnx.ctx().writeAndFlush(Commands.newWatchScalableTopicsDiff(watchId, added, removed)); + } + + /** + * Drop the session. The metadata-store listener stays registered (the store API + * does not expose unregister), but {@link #onNotification} short-circuits on the + * closed flag, so further events are inert. + */ + public void close() { + closed.set(true); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 02e03a491d071..bbbe8276193f6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1774,6 +1774,90 @@ public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId, return serializeWithSize(cmd); } + /** + * Client -> Broker: open a scalable-topics watch session. + * + * @param watchId client-assigned watch identifier + * @param namespace tenant/namespace to scope the watch to + * @param consumerName optional caller identity (carried for a future namespace + * coordinator); pass {@code null} if not yet assigned + * @param propertyFilters AND filters; empty / null = match all topics in the namespace + */ + public static ByteBuf newWatchScalableTopics(long watchId, String namespace, + String consumerName, + java.util.Map propertyFilters) { + BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS); + org.apache.pulsar.common.api.proto.CommandWatchScalableTopics watch = + cmd.setWatchScalableTopics() + .setWatchId(watchId) + .setNamespace(namespace); + if (consumerName != null) { + watch.setConsumerName(consumerName); + } + if (propertyFilters != null) { + for (var entry : propertyFilters.entrySet()) { + watch.addPropertyFilter() + .setKey(entry.getKey()) + .setValue(entry.getValue()); + } + } + return serializeWithSize(cmd); + } + + public static ByteBuf newWatchScalableTopicsClose(long watchId) { + BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS_CLOSE); + cmd.setWatchScalableTopicsClose().setWatchId(watchId); + return serializeWithSize(cmd); + } + + /** + * Broker -> Client: emit a full snapshot of the matching topic set. Sent on initial + * subscribe and on every reconnect-resync; the client replaces its local state. 
+ */ + public static ByteBuf newWatchScalableTopicsSnapshot(long watchId, + java.util.Collection topics) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId); + var snapshot = update.setSnapshot(); + for (String t : topics) { + snapshot.addTopic(t); + } + return serializeWithSize(cmd); + } + + /** + * Broker -> Client: emit an incremental membership change. Either {@code added} or + * {@code removed} (or both) may be empty. Apply removed before added when both + * appear together. + */ + public static ByteBuf newWatchScalableTopicsDiff(long watchId, + java.util.Collection added, + java.util.Collection removed) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId); + var diff = update.setDiff(); + if (added != null) { + for (String t : added) { + diff.addAdded(t); + } + } + if (removed != null) { + for (String t : removed) { + diff.addRemoved(t); + } + } + return serializeWithSize(cmd); + } + + public static ByteBuf newWatchScalableTopicsError(long watchId, ServerError error, String message) { + BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE); + cmd.setWatchScalableTopicsUpdate() + .setWatchId(watchId) + .setError(error) + .setMessage(message); + return serializeWithSize(cmd); + } + public static ByteBuf serializeWithSize(BaseCommand cmd) { return serializeWithPrecalculatedSerializedSize(cmd, cmd.getSerializedSize()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 0eb2b1b9670dd..bca87683f279c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -514,6 +514,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate()); break; + case WATCH_SCALABLE_TOPICS: + checkArgument(cmd.hasWatchScalableTopics()); + handleCommandWatchScalableTopics(cmd.getWatchScalableTopics()); + break; + + case WATCH_SCALABLE_TOPICS_UPDATE: + checkArgument(cmd.hasWatchScalableTopicsUpdate()); + handleCommandWatchScalableTopicsUpdate(cmd.getWatchScalableTopicsUpdate()); + break; + + case WATCH_SCALABLE_TOPICS_CLOSE: + checkArgument(cmd.hasWatchScalableTopicsClose()); + handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose()); + break; + default: break; } @@ -807,6 +822,23 @@ protected void handleCommandScalableTopicAssignmentUpdate( throw new UnsupportedOperationException(); } + protected void handleCommandWatchScalableTopics( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopics commandWatchScalableTopics) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchScalableTopicsUpdate( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate + commandWatchScalableTopicsUpdate) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchScalableTopicsClose( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose + commandWatchScalableTopicsClose) { + throw new UnsupportedOperationException(); + } + private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) { NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, 
cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 20bd8ab818513..cba80ebe5d4b6 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -940,6 +940,50 @@ message CommandScalableTopicAssignmentUpdate { required ScalableConsumerAssignment assignment = 2; } +// Multi-topic consumer watcher: subscribes to the union of scalable topics in a +// namespace that match a (possibly empty) set of property filters. The broker keeps +// pushing updates as topics enter or leave the matching set. See +// `multi-topic-consumer-design.md` for the full design. + +// Client -> Broker: open a watch session. +message CommandWatchScalableTopics { + required uint64 watch_id = 1; // Client-assigned watch ID + required string namespace = 2; // tenant/namespace + // Optional AND filters; empty list means "match all topics in the namespace". + repeated KeyValue property_filters = 3; + // Caller identity. Carried so a future namespace-level subscription coordinator + // can key per-consumer assignments without a protocol bump. + optional string consumer_name = 4; +} + +// Snapshot of the full matching set. Sent on initial subscribe and on every +// reconnect-resync. The client replaces its local set with this list. +message ScalableTopicsSnapshot { + repeated string topics = 1; +} + +// Incremental membership change. Apply removed before added when both are present. +message ScalableTopicsDiff { + repeated string added = 1; + repeated string removed = 2; +} + +// Broker -> Client: either Snapshot or Diff; error fields are populated only if the +// initial subscribe failed (snapshot/diff absent in that case). +message CommandWatchScalableTopicsUpdate { + required uint64 watch_id = 1; + optional ScalableTopicsSnapshot snapshot = 2; + optional ScalableTopicsDiff diff = 3; + + optional ServerError error = 4; + optional string message = 5; +} + +// Client -> Broker: close the watch session. +message CommandWatchScalableTopicsClose { + required uint64 watch_id = 1; +} + message CommandGetSchema { required uint64 request_id = 1; required string topic = 2; @@ -1185,6 +1229,10 @@ message BaseCommand { SCALABLE_TOPIC_SUBSCRIBE = 73; SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74; SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75; + + WATCH_SCALABLE_TOPICS = 76; + WATCH_SCALABLE_TOPICS_UPDATE = 77; + WATCH_SCALABLE_TOPICS_CLOSE = 78; } @@ -1276,4 +1324,8 @@ message BaseCommand { optional CommandScalableTopicSubscribe scalableTopicSubscribe = 73; optional CommandScalableTopicSubscribeResponse scalableTopicSubscribeResponse = 74; optional CommandScalableTopicAssignmentUpdate scalableTopicAssignmentUpdate = 75; + + optional CommandWatchScalableTopics watchScalableTopics = 76; + optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate = 77; + optional CommandWatchScalableTopicsClose watchScalableTopicsClose = 78; } From 2703e35c6955953c10ac8cfc10b61e10a4d7f66a Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 30 Apr 2026 20:24:58 -0700 Subject: [PATCH 2/6] [feat][client] PIP-468: V5 client-side namespace scalable-topics watcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the long-lived client session that opens a CommandWatchScalableTopics on a broker connection, hands the initial Snapshot to start()'s future, and forwards subsequent Snapshot / Diff frames to a Listener. 
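A sketch of the call pattern this enables (illustrative; openTopicConsumer, closeTopicConsumer and reconcileTo are placeholders for the multi-topic consumer wrappers that land in a follow-up PR, and the caller is assumed to live next to the package-private watcher):

```
ScalableTopicsWatcher watcher = new ScalableTopicsWatcher(
        v4Client, NamespaceName.get("tenant/ns"), Map.of("owner", "alice"), null);

watcher.start().thenAccept(initialTopics -> {
    // Seed per-topic consumers from the initial snapshot delivered by start()'s future.
    initialTopics.forEach(t -> openTopicConsumer(t));

    // Only then attach the listener; later Snapshots (reconnect resync) and Diffs land here.
    watcher.setListener(new ScalableTopicsWatcher.Listener() {
        @Override
        public void onSnapshot(List<String> topics) {
            reconcileTo(topics); // full-state replacement
        }

        @Override
        public void onDiff(List<String> added, List<String> removed) {
            removed.forEach(t -> closeTopicConsumer(t)); // apply removed before added
            added.forEach(t -> openTopicConsumer(t));
        }
    });
});
```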
Client wiring - New ScalableTopicsWatcherSession callback interface in pulsar-client (broker-side adds it to ClientCnx via cnx.scalableTopicsWatchers, parallel to dagWatchSessions / scalableConsumerSessions). Closed in bulk on channel-inactive; cleared in the same teardown path as the existing watch sessions. - ClientCnx.handleCommandWatchScalableTopicsUpdate dispatches by Snapshot / Diff / Error (terminal). Exposes register/remove for the watcher. - LightProto generates getAddedsCount / getRemovedsCount for the diff fields (always-append-s rule); commented at the call site. V5 ScalableTopicsWatcher - Mirrors DagWatchClient in shape: opens connection to service URL, registers itself on the cnx, sends WatchScalableTopics, awaits the first WatchScalableTopicsUpdate (Snapshot). - start() resolves with the initial topic set so callers can populate their per-topic-consumer map before the listener attaches; subsequent Snapshots flow through onSnapshot() on the listener. - Reconnect path: connectionClosed() schedules a retry with Backoff (100 ms initial, 30 s cap). On reconnect the broker pushes a fresh Snapshot which the listener applies as a full-state replacement — self-healing across any disconnect duration. - Carries optional consumerName from day one as a hook for the future namespace-level subscription coordinator (see design doc). Tests - V5ScalableTopicsWatcherTest covers create/delete event delivery, AND property filtering (and that updating a topic out of the filter doesn't surface), and that the initial Snapshot reflects pre-existing topics. Drives the package-private ScalableTopicsWatcher via reflection (the multi-topic consumer wrappers come in a follow-up PR; once those land the test will go through the public builder API). --- .../api/v5/V5ScalableTopicsWatcherTest.java | 234 ++++++++++++++ .../client/impl/v5/ScalableTopicsWatcher.java | 300 ++++++++++++++++++ .../apache/pulsar/client/impl/ClientCnx.java | 76 +++++ .../impl/ScalableTopicsWatcherSession.java | 60 ++++ 4 files changed, 670 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java create mode 100644 pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java new file mode 100644 index 0000000000000..20df7d68616aa --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceName; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * End-to-end coverage for the broker-side namespace scalable-topics watcher. + * + *

<p>Uses reflection to construct a {@code ScalableTopicsWatcher} (package-private + * in the v5 module) directly against the test cluster's v4 client. Verifies: + * <ul> + *   <li>Initial snapshot reflects pre-existing topics.</li> + *   <li>Topic create / delete fires {@code Diff} events with the right names.</li> + *   <li>Property filters narrow the matching set, and updating a topic's properties + * to fall outside the filter emits {@code Removed}.</li> + * </ul>
+ */ +public class V5ScalableTopicsWatcherTest extends V5ClientBaseTest { + + /** Captures every Snapshot/Diff so tests can assert on the cumulative state. */ + private static final class CapturingListener { + final Set currentSet = ConcurrentHashMap.newKeySet(); + final List> snapshots = new CopyOnWriteArrayList<>(); + // (added, removed) per diff + final List, List>> diffs = new CopyOnWriteArrayList<>(); + + void onSnapshot(List topics) { + snapshots.add(List.copyOf(topics)); + currentSet.clear(); + currentSet.addAll(topics); + } + + void onDiff(List added, List removed) { + diffs.add(Map.entry(List.copyOf(added), List.copyOf(removed))); + currentSet.removeAll(removed); + currentSet.addAll(added); + } + } + + @Test + public void watcherEmitsCreateAndDeleteEvents() throws Exception { + NamespaceName ns = NamespaceName.get(getNamespace()); + + WatcherHandle handle = openWatcher(ns, Map.of(), null); + try { + // Initial snapshot should be empty (or whatever pre-existing topics; the + // shared cluster gives us a fresh per-test namespace so it should be empty). + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(() -> !handle.listener.snapshots.isEmpty()); + assertEquals(handle.listener.snapshots.get(0).size(), 0, + "fresh namespace should yield empty initial snapshot"); + + String topicA = ns + "/a-" + UUID.randomUUID().toString().substring(0, 8); + String topicB = ns + "/b-" + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic("topic://" + topicA, 1); + admin.scalableTopics().createScalableTopic("topic://" + topicB, 1); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(handle.listener.currentSet, Set.of( + "topic://" + topicA, "topic://" + topicB))); + + admin.scalableTopics().deleteScalableTopic("topic://" + topicA, true); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(handle.listener.currentSet, Set.of("topic://" + topicB))); + } finally { + handle.close(); + } + } + + @Test + public void watcherFiltersByProperty() throws Exception { + NamespaceName ns = NamespaceName.get(getNamespace()); + + WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice"), null); + try { + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .until(() -> !handle.listener.snapshots.isEmpty()); + + String aliceTopic = ns + "/alice-" + UUID.randomUUID().toString().substring(0, 8); + String bobTopic = ns + "/bob-" + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic("topic://" + aliceTopic, 1, + Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic("topic://" + bobTopic, 1, + Map.of("owner", "bob")); + + // Only alice's topic should surface — bob is filtered out. + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(handle.listener.currentSet, Set.of("topic://" + aliceTopic))); + + // bob's topic exists but never reaches the watcher's set. + assertTrue(!handle.listener.currentSet.contains("topic://" + bobTopic)); + } finally { + handle.close(); + } + } + + @Test + public void watcherEmptyFilterSubscribesToEveryTopic() throws Exception { + NamespaceName ns = NamespaceName.get(getNamespace()); + + // Pre-create a topic before the watcher opens so the initial snapshot has work + // to do (proves the snapshot path, not just the live-event path). 
+ String preTopic = ns + "/pre-" + UUID.randomUUID().toString().substring(0, 8); + admin.scalableTopics().createScalableTopic("topic://" + preTopic, 1); + + WatcherHandle handle = openWatcher(ns, Map.of(), null); + try { + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(handle.listener.currentSet, Set.of("topic://" + preTopic))); + } finally { + handle.close(); + } + } + + // --- Helpers --- + + /** Bundle of watcher state needed to clean up at the end of a test. */ + private static final class WatcherHandle implements AutoCloseable { + final Object watcher; + final CapturingListener listener; + final CountDownLatch ready; + + WatcherHandle(Object watcher, CapturingListener listener, CountDownLatch ready) { + this.watcher = watcher; + this.listener = listener; + this.ready = ready; + } + + @Override + public void close() throws Exception { + Method closeMethod = watcher.getClass().getDeclaredMethod("close"); + closeMethod.setAccessible(true); + closeMethod.invoke(watcher); + } + } + + /** + * Construct a {@code ScalableTopicsWatcher} via reflection (it's package-private + * inside the v5 module, but we drive it directly from the broker tests). Wire up + * a {@link CapturingListener}, call {@code start()}, and return a handle. The + * initial snapshot is delivered via the future + {@code onSnapshot} on the + * listener for uniform handling — see the watcher's javadoc. + */ + private WatcherHandle openWatcher(NamespaceName ns, Map filters, + String consumerName) throws Exception { + // Pull the underlying v4 client out of v5Client so we can construct the + // (package-private) watcher directly. + Field v4Field = v5Client.getClass().getDeclaredField("v4Client"); + v4Field.setAccessible(true); + PulsarClientImpl v4 = (PulsarClientImpl) v4Field.get(v5Client); + + Class watcherCls = Class.forName( + "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher"); + Constructor ctor = watcherCls.getDeclaredConstructor( + PulsarClientImpl.class, NamespaceName.class, Map.class, String.class); + ctor.setAccessible(true); + Object watcher = ctor.newInstance(v4, ns, filters, consumerName); + + CapturingListener listener = new CapturingListener(); + + // The watcher's Listener is a package-private nested interface; use a Proxy. + Class listenerCls = Class.forName( + "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher$Listener"); + Object listenerProxy = java.lang.reflect.Proxy.newProxyInstance( + listenerCls.getClassLoader(), new Class[]{listenerCls}, (proxy, method, args) -> { + switch (method.getName()) { + case "onSnapshot": + listener.onSnapshot((List) args[0]); + break; + case "onDiff": + listener.onDiff((List) args[0], (List) args[1]); + break; + default: + // Object methods (toString, hashCode, equals) — defaults are fine. + } + return null; + }); + + Method setListener = watcherCls.getDeclaredMethod("setListener", listenerCls); + setListener.setAccessible(true); + setListener.invoke(watcher, listenerProxy); + + Method start = watcherCls.getDeclaredMethod("start"); + start.setAccessible(true); + java.util.concurrent.CompletableFuture> startFut = + (java.util.concurrent.CompletableFuture>) start.invoke(watcher); + + CountDownLatch ready = new CountDownLatch(1); + startFut.thenAccept(initialTopics -> { + // start()'s future delivers the initial snapshot — feed it into the listener + // for uniform handling with subsequent snapshots. 
+ listener.onSnapshot(initialTopics); + ready.countDown(); + }); + if (!ready.await(10, TimeUnit.SECONDS)) { + throw new AssertionError("watcher did not deliver initial snapshot in 10s"); + } + return new WatcherHandle(watcher, listener, ready); + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java new file mode 100644 index 0000000000000..905a502907435 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.ScalableTopicsWatcherSession; +import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Backoff; + +/** + * Client-side manager for a namespace-wide scalable-topics watch session. + * + *

<p>Opens a {@code CommandWatchScalableTopics} on attach, awaits the initial + * {@code Snapshot}, and forwards subsequent {@code Snapshot} / {@code Diff} events + * to a {@link Listener}. On connection drop, schedules a reconnect with backoff; + * when the new session lands, the broker pushes a fresh snapshot and the client + * applies it as a state replacement (no missed events). + * + * <p>
Mirrors {@link DagWatchClient} in shape; the wire path is different — watch + * subscribe is fire-and-forget (no request/response), with the initial snapshot + * arriving as the first {@code WatchScalableTopicsUpdate}. + */ +final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession, AutoCloseable { + + private static final Logger LOG = Logger.get(ScalableTopicsWatcher.class); + private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0); + + /** + * Listener for membership events. The watcher delivers events on the netty IO + * thread; implementations should not block. + */ + interface Listener { + /** Full set; replace any local state. */ + void onSnapshot(List topics); + + /** Apply removed before added (covers rapid remove-then-add of same name). */ + void onDiff(List added, List removed); + } + + private final Logger log; + private final PulsarClientImpl v4Client; + private final NamespaceName namespace; + private final Map propertyFilters; + private final String consumerName; + private final long watchId; + private final Backoff reconnectBackoff; + + private final CompletableFuture> initialSnapshotFuture = new CompletableFuture<>(); + private volatile Listener listener; + private volatile ClientCnx cnx; + private volatile boolean closed = false; + + /** + * @param v4Client the underlying v4 client used to open broker connections + * @param namespace namespace to watch + * @param propertyFilters AND filters; empty map = match all + * @param consumerName optional caller identity (carried for a future namespace + * coordinator); pass {@code null} if not yet assigned + */ + ScalableTopicsWatcher(PulsarClientImpl v4Client, NamespaceName namespace, + Map propertyFilters, String consumerName) { + this.v4Client = v4Client; + this.namespace = namespace; + this.propertyFilters = propertyFilters == null ? Map.of() : propertyFilters; + this.consumerName = consumerName; + this.watchId = WATCH_ID_GENERATOR.incrementAndGet(); + this.reconnectBackoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofSeconds(30)) + .build(); + this.log = LOG.with() + .attr("namespace", namespace) + .attr("watchId", watchId) + .attr("filters", this.propertyFilters) + .build(); + } + + /** + * Open the watch session on a connection to the configured service URL. + * Resolves with the initial snapshot's topic list. After this returns, every + * subsequent {@code Snapshot} / {@code Diff} flows through {@link #setListener}. 
+ */ + CompletableFuture> start() { + v4Client.getConnectionToServiceUrl() + .thenAccept(this::attach) + .exceptionally(ex -> { + initialSnapshotFuture.completeExceptionally(ex); + return null; + }); + return initialSnapshotFuture; + } + + private void attach(ClientCnx newCnx) { + if (closed) { + return; + } + this.cnx = newCnx; + if (!newCnx.isSupportsScalableTopics()) { + initialSnapshotFuture.completeExceptionally( + new PulsarClientException.FeatureNotSupportedException( + "Broker does not support scalable topics", + PulsarClientException.FailedFeatureCheck.SupportsScalableTopics)); + return; + } + newCnx.registerScalableTopicsWatcher(watchId, this); + newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( + watchId, namespace.toString(), consumerName, propertyFilters)) + .addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + newCnx.removeScalableTopicsWatcher(watchId); + if (!initialSnapshotFuture.isDone()) { + initialSnapshotFuture.completeExceptionally( + new PulsarClientException(writeFuture.cause())); + } + } + }); + } + + @Override + public void onSnapshot(List topics) { + if (closed) { + return; + } + log.info().attr("topics", topics.size()).log("Snapshot received"); + // Reset backoff on every successful snapshot — that's the broker confirming + // the session is live and our local state is consistent. + reconnectBackoff.reset(); + if (!initialSnapshotFuture.isDone()) { + initialSnapshotFuture.complete(topics); + // The listener is set by the caller AFTER start() resolves, so the initial + // snapshot is delivered via the future, not via onSnapshot's fan-out. + return; + } + Listener l = listener; + if (l != null) { + try { + l.onSnapshot(topics); + } catch (Exception e) { + log.error().exception(e).log("Listener threw on snapshot"); + } + } + } + + @Override + public void onDiff(List added, List removed) { + if (closed) { + return; + } + log.info().attr("added", added.size()).attr("removed", removed.size()) + .log("Diff received"); + Listener l = listener; + if (l != null) { + try { + l.onDiff(added, removed); + } catch (Exception e) { + log.error().exception(e).log("Listener threw on diff"); + } + } + } + + @Override + public void onError(ServerError error, String message) { + log.warn().attr("error", error).attr("message", message) + .log("WatchScalableTopics rejected"); + if (!initialSnapshotFuture.isDone()) { + initialSnapshotFuture.completeExceptionally( + new PulsarClientException("WatchScalableTopics failed: " + error + + (message != null ? " - " + message : ""))); + } + } + + @Override + public void connectionClosed() { + log.warn("Scalable-topics watcher connection closed"); + cnx = null; + if (closed) { + return; + } + if (!initialSnapshotFuture.isDone()) { + // Initial subscribe never completed — surface the failure rather than + // retrying silently behind the caller. 
+ initialSnapshotFuture.completeExceptionally( + new PulsarClientException( + "Connection closed before initial scalable-topics snapshot arrived")); + return; + } + scheduleReconnect(); + } + + private void scheduleReconnect() { + if (closed) { + return; + } + long delayMs = reconnectBackoff.next().toMillis(); + log.info().attr("delayMs", delayMs).log("Scheduling watcher reconnect"); + v4Client.timer().newTimeout(timeout -> reconnect(), + delayMs, TimeUnit.MILLISECONDS); + } + + private void reconnect() { + if (closed) { + return; + } + v4Client.getConnectionToServiceUrl() + .thenAccept(newCnx -> { + if (closed) { + return; + } + if (!newCnx.isSupportsScalableTopics()) { + log.warn().log("Watcher reconnect: broker doesn't support scalable topics"); + scheduleReconnect(); + return; + } + this.cnx = newCnx; + newCnx.registerScalableTopicsWatcher(watchId, this); + newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( + watchId, namespace.toString(), consumerName, propertyFilters)) + .addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + newCnx.removeScalableTopicsWatcher(watchId); + log.warn().exceptionMessage(writeFuture.cause()) + .log("Watcher reconnect write failed"); + scheduleReconnect(); + } + // On success the broker will push a fresh Snapshot; the + // client converts it to a full-state replace via the + // listener (or the listener absorbs it as part of the + // multi-topic-consumer reconciliation). + }); + }) + .exceptionally(ex -> { + log.warn().exceptionMessage(ex).log("Watcher reconnect failed"); + scheduleReconnect(); + return null; + }); + } + + /** + * Set the listener that receives {@code Snapshot} / {@code Diff} events. Should + * be called after {@link #start()} resolves — the initial snapshot is delivered + * via that future, not via the listener. + */ + void setListener(Listener listener) { + this.listener = listener; + } + + /** + * Visible for testing — exposes the current set the broker has emitted so far. + * In production, the listener is the source of truth. + */ + Set currentSetForTesting() { + return new HashSet<>(); + } + + long watchId() { + return watchId; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + ClientCnx c = cnx; + if (c != null) { + c.removeScalableTopicsWatcher(watchId); + c.ctx().writeAndFlush(Commands.newWatchScalableTopicsClose(watchId)); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index f45fc2c91f4fd..e9762c0d22fee 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -178,6 +178,18 @@ public class ClientCnx extends PulsarHandler { .concurrencyLevel(1) .build(); + /** + * Per-watcher namespace scalable-topics watch sessions, keyed by the + * {@code watchId} chosen by the client. The broker tags every + * {@link org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate} with + * this id. 
+ */ + private final ConcurrentLongHashMap scalableTopicsWatchers = + ConcurrentLongHashMap.newBuilder() + .expectedItems(4) + .concurrencyLevel(1) + .build(); + private final CompletableFuture connectionFuture = new CompletableFuture(); private final ConcurrentLinkedQueue requestTimeoutQueue = new ConcurrentLinkedQueue<>(); @@ -375,6 +387,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this)); dagWatchSessions.forEach((__, session) -> session.connectionClosed()); scalableConsumerSessions.forEach((__, session) -> session.connectionClosed()); + scalableTopicsWatchers.forEach((__, session) -> session.connectionClosed()); waitingLookupRequests.clear(); @@ -383,6 +396,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { topicListWatchers.clear(); dagWatchSessions.clear(); scalableConsumerSessions.clear(); + scalableTopicsWatchers.clear(); timeoutTask.cancel(true); } @@ -1427,6 +1441,68 @@ public void removeScalableConsumerSession(long consumerId) { scalableConsumerSessions.remove(consumerId); } + @Override + protected void handleCommandWatchScalableTopicsUpdate( + org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate cmd) { + checkArgument(state == State.Ready); + + long watchId = cmd.getWatchId(); + log.debug().attr("watchId", watchId).log("Received WatchScalableTopicsUpdate"); + + if (cmd.hasError()) { + // Error response — terminal for this watch (subscribe was rejected or the + // server failed to compute the initial set). Drop the local registration so + // a retry from the caller starts fresh. + ScalableTopicsWatcherSession session = scalableTopicsWatchers.remove(watchId); + if (session != null) { + session.onError(cmd.getError(), cmd.hasMessage() ? cmd.getMessage() : null); + } else { + log.warn().attr("watchId", watchId) + .log("Received scalable-topics watch error for unknown watcher"); + } + return; + } + + ScalableTopicsWatcherSession session = scalableTopicsWatchers.get(watchId); + if (session == null) { + log.warn().attr("watchId", watchId) + .log("Received scalable-topics watch update for unknown watcher"); + return; + } + if (cmd.hasSnapshot()) { + var snapshot = cmd.getSnapshot(); + java.util.List topics = new java.util.ArrayList<>(snapshot.getTopicsCount()); + for (int i = 0; i < snapshot.getTopicsCount(); i++) { + topics.add(snapshot.getTopicAt(i)); + } + session.onSnapshot(topics); + } else if (cmd.hasDiff()) { + var diff = cmd.getDiff(); + // LightProto pluralises count accessors with a trailing 's' on the field name, + // hence `getAddedsCount` / `getRemovedsCount`. + java.util.List added = new java.util.ArrayList<>(diff.getAddedsCount()); + for (int i = 0; i < diff.getAddedsCount(); i++) { + added.add(diff.getAddedAt(i)); + } + java.util.List removed = new java.util.ArrayList<>(diff.getRemovedsCount()); + for (int i = 0; i < diff.getRemovedsCount(); i++) { + removed.add(diff.getRemovedAt(i)); + } + session.onDiff(added, removed); + } else { + log.warn().attr("watchId", watchId) + .log("Received scalable-topics watch update with neither snapshot nor diff"); + } + } + + public void registerScalableTopicsWatcher(long watchId, ScalableTopicsWatcherSession watcher) { + scalableTopicsWatchers.put(watchId, watcher); + } + + public void removeScalableTopicsWatcher(long watchId) { + scalableTopicsWatchers.remove(watchId); + } + /** * check serverError and take appropriate action. *

    diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java new file mode 100644 index 0000000000000..7b71dc208ec38 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.List; +import org.apache.pulsar.common.api.proto.ServerError; + +/** + * Client-side callback for a namespace-wide scalable-topics watch session. + * + *

    The broker pushes either a full {@code Snapshot} (initial subscribe and on every + * reconnect resync) or an incremental {@code Diff}. Implementations apply the + * snapshot as a full state replacement and the diff as a set delta — both are + * idempotent and self-healing across reconnects. + * + *
    Implemented by the V5 client's {@code ScalableTopicsWatcher}. + */ +public interface ScalableTopicsWatcherSession { + + /** + * Full set of topics currently matching the watch's filters. The implementation + * should replace any local state derived from prior events with this snapshot. + */ + void onSnapshot(List topics); + + /** + * Incremental membership change. Apply {@code removed} before {@code added} + * (covers a rapid remove-then-add of the same topic name within a coalescing + * window). + */ + void onDiff(List added, List removed); + + /** + * The broker rejected the watch (e.g. authz, broker shutting down). + * Implementations should fail any pending start future and stop emitting events. + */ + void onError(ServerError error, String message); + + /** + * The underlying connection dropped. Implementations should treat any local set + * as stale until the next snapshot arrives. + */ + void connectionClosed(); +} From 8d718d364936b7ad0beac49c52b0edea98641702 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 30 Apr 2026 21:42:19 -0700 Subject: [PATCH 3/6] [feat] PIP-468: Hash-based snapshot skip on watcher reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optimisation suggested in the design review: instead of always re-sending the full topic list when a multi-topic watcher reconnects, the client tracks a hash of its current set and passes it back. The broker computes the same hash over its freshly-evaluated set and stays silent when they match — the watch is live, the client's local state is correct, and any future Diffs flow as usual. For the common short-blip reconnect where membership didn't change, the wire cost collapses to a single inbound WatchScalableTopics frame and zero outbound. Protocol - CommandWatchScalableTopics gains optional current_hash. First subscribe sends it absent (broker emits initial Snapshot unconditionally); reconnect sends the cached hash. - Hash function reuses TopicList.calculateHash (CRC32C of sorted topic names) for parity with the existing CommandGetTopicsOfNamespace watch. Broker - ScalableTopicsWatcherSession.start() takes the optional clientHash. If it equals the hash of the freshly computed initial set, start() skips emitting the Snapshot and just marks the snapshot phase done so deltas can flow. Client - ScalableTopicsWatcher mirrors the broker's view in a synchronised set, updated on every Snapshot replace and Diff apply. On reconnect it hands the hash of that set to the broker. - attach() (first subscribe) keeps sending null hash so the broker emits the initial Snapshot unconditionally — start()'s future depends on it. Tests - ScalableTopicsWatcherSessionHashTest covers all three branches: no hash → snapshot emitted; matching hash → no snapshot; differing hash → snapshot emitted. Uses mocked ServerCnx + LocalMemoryMetadataStore. - Existing V5ScalableTopicsWatcherTest is unaffected — first-subscribe still passes null and gets a Snapshot, which is what the start() future depends on. Design doc updated to describe the hash short-circuit. 
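The hash the two ends compare is deliberately cheap: CRC32C over the sorted topic names, the same function the existing topic-list watch uses. A standalone sketch of an equivalent computation follows; it is illustrative only, since the patch calls the shared TopicList.calculateHash helper so both sides agree on the exact encoding, and the delimiter and output format below are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.CRC32C;

// Illustrative stand-in for TopicList.calculateHash: sorting makes the hash
// independent of delivery order, so client and broker agree whenever the sets match.
final class TopicSetHash {
    static String of(List<String> topics) {
        String joined = topics.stream().sorted().collect(Collectors.joining(","));
        CRC32C crc = new CRC32C();
        crc.update(joined.getBytes(StandardCharsets.UTF_8));
        return Long.toHexString(crc.getValue());
    }
}
```

First subscribe still sends no hash at all, so the broker answers with the initial Snapshot unconditionally; only reconnects pass the cached value.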
--- multi-topic-consumer-design.md | 227 ++++++++++++++++++ .../pulsar/broker/service/ServerCnx.java | 3 +- .../ScalableTopicsWatcherSession.java | 21 ++ .../ScalableTopicsWatcherSessionHashTest.java | 147 ++++++++++++ .../client/impl/v5/ScalableTopicsWatcher.java | 62 ++++- .../pulsar/common/protocol/Commands.java | 11 +- pulsar-common/src/main/proto/PulsarApi.proto | 6 + 7 files changed, 466 insertions(+), 11 deletions(-) create mode 100644 multi-topic-consumer-design.md create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java diff --git a/multi-topic-consumer-design.md b/multi-topic-consumer-design.md new file mode 100644 index 0000000000000..24968ef0075e3 --- /dev/null +++ b/multi-topic-consumer-design.md @@ -0,0 +1,227 @@ +# PIP-468: Multi-topic consumers via namespace + property filters + +## Goal + +Let `QueueConsumer` and `StreamConsumer` subscribe to the union of scalable topics +in a namespace that match a (possibly empty) set of property filters. The +matching set must follow live: when topics enter or leave the filter, the +consumer attaches/detaches automatically. Reliable across reconnects and broker +bounces. + +Out of scope: `CheckpointConsumer` (stays single-topic), partitioned and +non-partitioned legacy topics. + +## Protocol + +New broker command family, parallel to the existing `DagWatchSession` / +`ScalableConsumerSession`. Lives in `ClientCnx.scalableTopicsWatchers` keyed by +watch id. + +``` +CommandWatchScalableTopics { + request_id + watch_id + consumer_name // future namespace-coordinator hook + namespace // tenant/ns + property_filters: Map // empty == match all +} +CommandWatchScalableTopicsClose { watch_id } + +ScalableTopicsUpdate { + watch_id + event: oneof { + Snapshot { topics: [string] } // initial + every reconnect resync + Diff { added: [string], removed: [string] } + } +} +``` + +Wire size: snapshot bounded by the 5 MB frame limit. Defer pagination — we'll +revisit if a namespace ever exceeds. + +## Reliability + +Resync-on-reconnect, no durable event log on the broker. + +1. **Subscribe.** Broker registers a metadata listener on `/topics//` + *first*, computes the initial filtered set, emits `Snapshot{topics}`, + populates server-side `currentSet`. Events that arrived during snapshot + computation are replayed after via the same dedup logic the deltas use. +2. **Steady state.** Each metadata event is filter-evaluated. If membership + changes relative to `currentSet`, broker emits `Diff{added, removed}` and + updates `currentSet`. Server-side coalescing window (~50–100 ms) batches + nearby events into one `Diff`. +3. **Disconnect.** Broker drops the session. +4. **Reconnect with hash short-circuit.** Client maintains a hash over its + current set (CRC32C of the sorted topic names — same function used by + `CommandGetTopicsOfNamespace`). On reconnect it re-opens the watch and + passes that hash via `CommandWatchScalableTopics.current_hash`. The broker + computes the same hash over its freshly evaluated set: + - **Hash matches** → broker emits **nothing**. The watch is live, the + client's local state is correct, future deltas flow as usual. + - **Hash differs** → broker emits a fresh `Snapshot`. Client reconciles + locally — anything in the new snapshot it didn't know about → open + per-topic consumer; anything it knew but missing → close + flush. 
+ + First subscribe sends an empty / absent hash, which the broker treats as + "no prior state" and unconditionally emits the initial `Snapshot`. + +Properties: idempotent (snapshot is full-set replace; diffs are set ops); +self-healing across any disconnect duration; no broker affinity (any broker can +serve, every broker has the same metadata events). For the common short-blip +reconnect where membership didn't change, the wire cost collapses to a single +inbound `WatchScalableTopics` frame and zero outbound. + +Apply order on `Diff`: `removed` before `added` — covers rapid remove-then-add +of the same topic name. + +## Filter evaluation: broker-side + +Initial set is computed via the existing `findScalableTopicsByPropertiesAsync`. +Each metadata event: +- `Created` / `Modified`: read the new value, evaluate filter, emit `Diff` if + membership changed vs `currentSet`. +- `Deleted`: no new value; emit `Removed` if topic was in `currentSet`. + +Cost: one filter evaluation per metadata event per watcher. Filters are tiny; +fine. + +## Per-topic consumer failure handling + +When `Diff{added: [t]}` fires, the multi-topic consumer opens a +`PerTopicConsumer` for `t`. If the per-topic subscribe fails (broker transient, +topic auth, etc.), retry forever with exponential backoff capped at 15–30 min. +Surface a single warn log per topic per backoff cycle, no user-visible error. +Matches v4 consumer reconnect semantics. + +## Consumer-side wiring + +``` +MultiTopicConsumer + ├─ ScalableTopicsWatcher (long-lived broker session) + ├─ Map // open/close on Diff + ├─ shared LinkedTransferQueue // multiplex from per-topic queues + └─ event handler: + Snapshot(topics) → diff against current keyset; close stale, open new + Diff(added, removed) → close removed (flush acks first), then open added + per-topic add failure → background retry with exp backoff (15–30 min cap) +``` + +`PerTopicConsumer`: +- For QueueConsumer: existing `ScalableQueueConsumer`. +- For StreamConsumer: existing `ScalableStreamConsumer`. + +Same subscription name on every topic. The existing per-topic +`SubscriptionCoordinator` handles per-topic segment assignment. + +## Ack routing + +### QueueConsumer (no cumulative ack) + +Each enqueued `MessageV5` already carries `(topic, segmentId, msgId)`. On +`acknowledge(msg)`: + +```java +PerTopicQueueConsumer ptc = perTopicConsumers.get(msg.topic()); +if (ptc != null) { + ptc.acknowledge(msg); // delegates to existing per-segment routing +} +``` + +No position vector, no snapshots. + +### StreamConsumer (cumulative ack) + +State on the multi-topic consumer: + +```java +Map> latestDeliveredPerTopicSegment; +``` + +On per-topic message arrival into the multiplexed queue: +1. Atomically update `latestDeliveredPerTopicSegment[topic][seg] = msgId`. +2. Snapshot the whole map (deep copy). +3. Attach snapshot to `MessageV5` before enqueue. + +On `acknowledgeCumulative(msg)`: + +```java +for ((topic, vector) : msg.snapshot) { + PerTopicConsumer ptc = perTopicConsumers.get(topic); + if (ptc != null) { + ptc.ackUpToVector(vector); // best-effort, async — fire and forget + } + // ptc null = topic was Removed since msg was enqueued; flushed at Remove +} +``` + +The position vector means: +- For msg's own topic: vector covers up to `msg` itself. +- For every other topic: vector covers whatever was last delivered from that + topic at the moment `msg` was enqueued. Topics with nothing delivered before + `msg`: empty / absent inner map → no-op. 
+ +`ackUpToVector(Map)` is the existing single-topic +cumulative-ack walk extracted as a public entry point on +`ScalableStreamConsumer`. + +### Topic Removed mid-stream + +Cleanup order: flush pending acks up to `latestDelivered[topic]` → close +per-topic consumer → remove from the map. Future messages won't reference the +topic in their snapshots. If the topic re-appears later (re-Added), a fresh +per-topic consumer subscribes and resumes from broker-side cursor (Stream) or +new shared subscription state (Queue). + +### Cost note (StreamConsumer) + +Snapshot size = O(topics × segments-per-topic-currently-assigned). For 100 +topics × 1 segment each ≈ 100 entries ≈ a few KB per message. Fine for the +regime we're targeting; revisit if hot. + +## API + +Add to `QueueConsumerBuilder` and `StreamConsumerBuilder`: + +```java +Builder namespace(String namespace); +Builder namespace(String namespace, Map propertyFilters); +``` + +Mutually exclusive with `.topic(name)`; calling both fails at `subscribe()`. +Filters can't be set outside these methods, so users can't accidentally pass +filters without a namespace. + +While there: +- Drop `topicPattern(...)` from the v5 builders. +- Tighten `topic(String...)` → `topic(String)`. + +## Cross-topic load balancing — deferred + +Today: every multi-topic consumer in `(namespace, filter, subscriptionName)` +subscribes to *all* matching topics. Each topic's per-topic +`SubscriptionCoordinator` independently picks one consumer per segment — for +StreamConsumer (Exclusive) the assignment is randomized across consumers, in +expectation roughly balanced but not deterministic. + +Future: namespace-level `MultiTopicSubscriptionCoordinator` per +`(namespace, subscriptionName)` that: +- Persists `Map` for the group. +- On consumer attach: assigns a slice of the matching topic set. +- On topic Added/Removed: rebalances. +- Server-side filters the watcher's emitted set to the consumer's slice. + +The hook in this design: `consumer_name` is part of `CommandWatchScalableTopics` +*now*, so the future coordinator has the identity it needs. Client code doesn't +change — the watcher just emits a constrained `Snapshot` / `Diff`. + +## Authz + +`CommandWatchScalableTopics`: requires namespace-level READ +(`NamespaceOperation.GET_TOPICS`, same as `listScalableTopics`). Per-topic +subscribe that follows: normal topic-level authz applies. + +## Open question kept for later + +Property change events that don't move a topic in/out of the filter are silent. +If a future use case needs them, add a `PropertiesChanged` variant. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5d85acb0974a0..109cf1b489bc5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -880,6 +880,7 @@ protected void handleCommandWatchScalableTopics( var kv = cmd.getPropertyFilterAt(i); propertyFilters.put(kv.getKey(), kv.getValue()); } + final String clientHash = cmd.hasCurrentHash() ? 
cmd.getCurrentHash() : null; if (!this.service.getPulsar().isRunning()) { log.warn("WatchScalableTopics rejected: broker not ready"); @@ -909,7 +910,7 @@ protected void handleCommandWatchScalableTopics( } var session = new org.apache.pulsar.broker.service.scalable .ScalableTopicsWatcherSession(watchId, namespaceName, propertyFilters, - this, resources, service.getPulsar().getExecutor()); + clientHash, this, resources, service.getPulsar().getExecutor()); scalableTopicsWatchers.put(watchId, session); session.start().exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java index 8b07d52e59840..91a8bddc67010 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; @@ -75,6 +76,13 @@ public class ScalableTopicsWatcherSession { private final long watchId; private final NamespaceName namespace; private final Map propertyFilters; + /** + * Hash of the topic set the client believes it has. Optional: present on + * reconnect, absent on first subscribe. When equal to the freshly-computed + * hash on the broker, {@link #start()} skips emitting the initial snapshot — + * the client's state is already correct. + */ + private final String clientHash; private final ServerCnx cnx; private final ScalableTopicResources resources; private final ScheduledExecutorService scheduler; @@ -102,12 +110,14 @@ public class ScalableTopicsWatcherSession { public ScalableTopicsWatcherSession(long watchId, NamespaceName namespace, Map propertyFilters, + String clientHash, ServerCnx cnx, ScalableTopicResources resources, ScheduledExecutorService scheduler) { this.watchId = watchId; this.namespace = namespace; this.propertyFilters = propertyFilters == null ? Map.of() : propertyFilters; + this.clientHash = clientHash; this.cnx = cnx; this.resources = resources; this.scheduler = scheduler; @@ -143,6 +153,17 @@ public CompletableFuture start() { currentSet.addAll(initialTopics); } snapshotEmitted.set(true); + // Hash short-circuit: if the client tells us it already has this + // exact set (reconnect within an unchanged window), don't waste + // bytes on the wire. Future Diffs flow as usual. 
+ if (clientHash != null) { + String serverHash = TopicList.calculateHash(initialTopics); + if (clientHash.equals(serverHash)) { + log.info().attr("topics", initialTopics.size()) + .log("Reconnect hash matched; skipping snapshot"); + return; + } + } log.info().attr("topics", initialTopics.size()).log("Initial snapshot"); cnx.ctx().writeAndFlush( Commands.newWatchScalableTopicsSnapshot(watchId, initialTopics)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java new file mode 100644 index 0000000000000..1e9d84897cdd2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.concurrent.ImmediateEventExecutor; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.pulsar.broker.resources.ScalableTopicMetadata; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Coverage for the reconnect-hash short-circuit in + * {@link ScalableTopicsWatcherSession}: when the client's reported hash matches + * the freshly-computed server hash, {@code start()} returns without writing a + * Snapshot frame; otherwise it emits Snapshot as usual. 
+ */ +public class ScalableTopicsWatcherSessionHashTest { + + private MetadataStoreExtended store; + private ScalableTopicResources resources; + private ServerCnx cnx; + private ChannelHandlerContext ctx; + private ScheduledExecutorService scheduler; + + @BeforeMethod + public void setUp() throws Exception { + store = new LocalMemoryMetadataStore("memory:local", + MetadataStoreConfig.builder().build()); + resources = new ScalableTopicResources(store, 30); + cnx = mock(ServerCnx.class); + ctx = mock(ChannelHandlerContext.class); + when(cnx.ctx()).thenReturn(ctx); + // writeAndFlush returns a ChannelFuture; provide a no-op promise to keep the + // call chain happy. The ScalableTopicsWatcherSession ignores the return. + when(ctx.writeAndFlush(org.mockito.ArgumentMatchers.any())) + .thenReturn(new DefaultChannelPromise(new EmbeddedChannel(), + ImmediateEventExecutor.INSTANCE)); + scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws Exception { + if (scheduler != null) { + scheduler.shutdownNow(); + } + if (store != null) { + store.close(); + } + } + + private ScalableTopicMetadata meta() { + return ScalableTopicMetadata.builder().epoch(0).nextSegmentId(1) + .properties(Map.of()).build(); + } + + @Test + public void noHashOnFirstSubscribeEmitsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-fresh-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + var session = new ScalableTopicsWatcherSession(1L, ns, Map.of(), + /* clientHash= */ null, cnx, resources, scheduler); + session.start().get(); + + // First subscribe: no hash → broker must emit one Snapshot frame. + verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class)); + } + + @Test + public void matchingHashOnReconnectSkipsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-match-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + // Client believes the namespace contains exactly this one topic. Compute the + // matching hash with the same function the broker uses (TopicList.crc32c). + String matchingHash = TopicList.calculateHash(List.of(tn.toString())); + + var session = new ScalableTopicsWatcherSession(2L, ns, Map.of(), + /* clientHash= */ matchingHash, cnx, resources, scheduler); + session.start().get(); + + // Hash matched → no Snapshot frame written. Future Diffs would still flow + // through writeAndFlush, but start() itself must stay silent. + verify(ctx, never()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class)); + } + + @Test + public void differingHashOnReconnectStillEmitsSnapshot() throws Exception { + NamespaceName ns = NamespaceName.get("tenant/ns-diff-" + suffix()); + TopicName tn = TopicName.get("topic://" + ns + "/t1"); + resources.createScalableTopicAsync(tn, meta()).get(); + + // Client thinks the set is something different. Broker must emit a fresh + // Snapshot so the client can reconcile. 
+ String staleHash = TopicList.calculateHash(List.of("topic://" + ns + "/something-else")); + + var session = new ScalableTopicsWatcherSession(3L, ns, Map.of(), + /* clientHash= */ staleHash, cnx, resources, scheduler); + session.start().get(); + + verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class)); + } + + private static String suffix() { + return UUID.randomUUID().toString().substring(0, 8); + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java index 905a502907435..8f46074126ebe 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java @@ -20,6 +20,7 @@ import io.github.merlimat.slog.Logger; import java.time.Duration; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.topics.TopicList; import org.apache.pulsar.common.util.Backoff; /** @@ -75,6 +77,14 @@ interface Listener { private final Backoff reconnectBackoff; private final CompletableFuture> initialSnapshotFuture = new CompletableFuture<>(); + /** + * Mirrors the broker's view of the matching set so we can hand a hash on + * reconnect — when the set hasn't changed, the broker skips emitting a + * fresh snapshot. Updated on every Snapshot replace and Diff apply. + * Synchronised because Snapshot / Diff arrive on the netty thread but the + * hash may be read on a reconnect callback running elsewhere. + */ + private final Set currentSet = Collections.synchronizedSet(new HashSet<>()); private volatile Listener listener; private volatile ClientCnx cnx; private volatile boolean closed = false; @@ -132,8 +142,11 @@ private void attach(ClientCnx newCnx) { return; } newCnx.registerScalableTopicsWatcher(watchId, this); + // First subscribe: send no hash so the broker emits the initial snapshot + // unconditionally. snapshot is what populates initialSnapshotFuture. newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( - watchId, namespace.toString(), consumerName, propertyFilters)) + watchId, namespace.toString(), consumerName, propertyFilters, + /* currentHash= */ null)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { newCnx.removeScalableTopicsWatcher(watchId); @@ -154,6 +167,11 @@ public void onSnapshot(List topics) { // Reset backoff on every successful snapshot — that's the broker confirming // the session is live and our local state is consistent. reconnectBackoff.reset(); + // Replace local set so the next reconnect computes the right hash. + synchronized (currentSet) { + currentSet.clear(); + currentSet.addAll(topics); + } if (!initialSnapshotFuture.isDone()) { initialSnapshotFuture.complete(topics); // The listener is set by the caller AFTER start() resolves, so the initial @@ -177,6 +195,11 @@ public void onDiff(List added, List removed) { } log.info().attr("added", added.size()).attr("removed", removed.size()) .log("Diff received"); + // Apply removed before added — covers rapid remove-then-add of the same name. 
+ synchronized (currentSet) { + currentSet.removeAll(removed); + currentSet.addAll(added); + } Listener l = listener; if (l != null) { try { @@ -187,6 +210,21 @@ public void onDiff(List added, List removed) { } } + /** + * Snapshot the current set under lock so the hash + the watch frame agree on + * the same view. CRC32C of sorted topic names — same function used by + * {@code CommandGetTopicsOfNamespace} so behaviour matches the existing + * topic-list watch on the wire. + */ + private String currentSetHash() { + synchronized (currentSet) { + if (currentSet.isEmpty()) { + return TopicList.calculateHash(java.util.List.of()); + } + return TopicList.calculateHash(new HashSet<>(currentSet)); + } + } + @Override public void onError(ServerError error, String message) { log.warn().attr("error", error).attr("message", message) @@ -242,8 +280,15 @@ private void reconnect() { } this.cnx = newCnx; newCnx.registerScalableTopicsWatcher(watchId, this); + // Reconnect: send the hash of our current set. If the broker's + // freshly-computed hash matches, it skips emitting a Snapshot — + // the watch is live and our local state is correct. Future + // Diffs flow as usual; if the hash differs the broker sends a + // Snapshot which we apply as a full-state replace. + String hash = currentSetHash(); newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( - watchId, namespace.toString(), consumerName, propertyFilters)) + watchId, namespace.toString(), consumerName, + propertyFilters, hash)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { newCnx.removeScalableTopicsWatcher(watchId); @@ -251,10 +296,6 @@ private void reconnect() { .log("Watcher reconnect write failed"); scheduleReconnect(); } - // On success the broker will push a fresh Snapshot; the - // client converts it to a full-state replace via the - // listener (or the listener absorbs it as part of the - // multi-topic-consumer reconciliation). }); }) .exceptionally(ex -> { @@ -274,11 +315,14 @@ void setListener(Listener listener) { } /** - * Visible for testing — exposes the current set the broker has emitted so far. - * In production, the listener is the source of truth. + * Visible for testing — snapshot of the current set. In production, the + * listener is the source of truth; this method exists so tests can poke the + * watcher's hash-tracking state directly. */ Set currentSetForTesting() { - return new HashSet<>(); + synchronized (currentSet) { + return new HashSet<>(currentSet); + } } long watchId() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index bbbe8276193f6..0f0ef5c5065af 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1783,9 +1783,15 @@ public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId, * coordinator); pass {@code null} if not yet assigned * @param propertyFilters AND filters; empty / null = match all topics in the namespace */ + /** + * @param currentHash optional hash of the client's currently-known topic set. + * Pass on reconnect to let the broker skip the snapshot when + * state hasn't changed; pass {@code null} on first subscribe. 
+ */ public static ByteBuf newWatchScalableTopics(long watchId, String namespace, String consumerName, - java.util.Map propertyFilters) { + java.util.Map propertyFilters, + String currentHash) { BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS); org.apache.pulsar.common.api.proto.CommandWatchScalableTopics watch = cmd.setWatchScalableTopics() @@ -1801,6 +1807,9 @@ public static ByteBuf newWatchScalableTopics(long watchId, String namespace, .setValue(entry.getValue()); } } + if (currentHash != null) { + watch.setCurrentHash(currentHash); + } return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index cba80ebe5d4b6..327e360a550a2 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -954,6 +954,12 @@ message CommandWatchScalableTopics { // Caller identity. Carried so a future namespace-level subscription coordinator // can key per-consumer assignments without a protocol bump. optional string consumer_name = 4; + // Hash of the topics the client believes are currently in its set. Sent on + // reconnect; absent on first subscribe. If it matches the broker's freshly + // computed hash, the broker skips emitting the initial Snapshot — the client's + // local state is already correct and future Diffs will flow as usual. Same + // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics). + optional string current_hash = 5; } // Snapshot of the full matching set. Sent on initial subscribe and on every From 0352e99fe9f82ea5eb4cf83001dc5a8d48f4681f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 09:13:38 -0700 Subject: [PATCH 4/6] =?UTF-8?q?Drop=20design=20doc=20from=20PR=20=E2=80=94?= =?UTF-8?q?=20content=20folded=20into=20PR=20description?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- multi-topic-consumer-design.md | 227 --------------------------------- 1 file changed, 227 deletions(-) delete mode 100644 multi-topic-consumer-design.md diff --git a/multi-topic-consumer-design.md b/multi-topic-consumer-design.md deleted file mode 100644 index 24968ef0075e3..0000000000000 --- a/multi-topic-consumer-design.md +++ /dev/null @@ -1,227 +0,0 @@ -# PIP-468: Multi-topic consumers via namespace + property filters - -## Goal - -Let `QueueConsumer` and `StreamConsumer` subscribe to the union of scalable topics -in a namespace that match a (possibly empty) set of property filters. The -matching set must follow live: when topics enter or leave the filter, the -consumer attaches/detaches automatically. Reliable across reconnects and broker -bounces. - -Out of scope: `CheckpointConsumer` (stays single-topic), partitioned and -non-partitioned legacy topics. - -## Protocol - -New broker command family, parallel to the existing `DagWatchSession` / -`ScalableConsumerSession`. Lives in `ClientCnx.scalableTopicsWatchers` keyed by -watch id. - -``` -CommandWatchScalableTopics { - request_id - watch_id - consumer_name // future namespace-coordinator hook - namespace // tenant/ns - property_filters: Map // empty == match all -} -CommandWatchScalableTopicsClose { watch_id } - -ScalableTopicsUpdate { - watch_id - event: oneof { - Snapshot { topics: [string] } // initial + every reconnect resync - Diff { added: [string], removed: [string] } - } -} -``` - -Wire size: snapshot bounded by the 5 MB frame limit. Defer pagination — we'll -revisit if a namespace ever exceeds. 
- -## Reliability - -Resync-on-reconnect, no durable event log on the broker. - -1. **Subscribe.** Broker registers a metadata listener on `/topics//` - *first*, computes the initial filtered set, emits `Snapshot{topics}`, - populates server-side `currentSet`. Events that arrived during snapshot - computation are replayed after via the same dedup logic the deltas use. -2. **Steady state.** Each metadata event is filter-evaluated. If membership - changes relative to `currentSet`, broker emits `Diff{added, removed}` and - updates `currentSet`. Server-side coalescing window (~50–100 ms) batches - nearby events into one `Diff`. -3. **Disconnect.** Broker drops the session. -4. **Reconnect with hash short-circuit.** Client maintains a hash over its - current set (CRC32C of the sorted topic names — same function used by - `CommandGetTopicsOfNamespace`). On reconnect it re-opens the watch and - passes that hash via `CommandWatchScalableTopics.current_hash`. The broker - computes the same hash over its freshly evaluated set: - - **Hash matches** → broker emits **nothing**. The watch is live, the - client's local state is correct, future deltas flow as usual. - - **Hash differs** → broker emits a fresh `Snapshot`. Client reconciles - locally — anything in the new snapshot it didn't know about → open - per-topic consumer; anything it knew but missing → close + flush. - - First subscribe sends an empty / absent hash, which the broker treats as - "no prior state" and unconditionally emits the initial `Snapshot`. - -Properties: idempotent (snapshot is full-set replace; diffs are set ops); -self-healing across any disconnect duration; no broker affinity (any broker can -serve, every broker has the same metadata events). For the common short-blip -reconnect where membership didn't change, the wire cost collapses to a single -inbound `WatchScalableTopics` frame and zero outbound. - -Apply order on `Diff`: `removed` before `added` — covers rapid remove-then-add -of the same topic name. - -## Filter evaluation: broker-side - -Initial set is computed via the existing `findScalableTopicsByPropertiesAsync`. -Each metadata event: -- `Created` / `Modified`: read the new value, evaluate filter, emit `Diff` if - membership changed vs `currentSet`. -- `Deleted`: no new value; emit `Removed` if topic was in `currentSet`. - -Cost: one filter evaluation per metadata event per watcher. Filters are tiny; -fine. - -## Per-topic consumer failure handling - -When `Diff{added: [t]}` fires, the multi-topic consumer opens a -`PerTopicConsumer` for `t`. If the per-topic subscribe fails (broker transient, -topic auth, etc.), retry forever with exponential backoff capped at 15–30 min. -Surface a single warn log per topic per backoff cycle, no user-visible error. -Matches v4 consumer reconnect semantics. - -## Consumer-side wiring - -``` -MultiTopicConsumer - ├─ ScalableTopicsWatcher (long-lived broker session) - ├─ Map // open/close on Diff - ├─ shared LinkedTransferQueue // multiplex from per-topic queues - └─ event handler: - Snapshot(topics) → diff against current keyset; close stale, open new - Diff(added, removed) → close removed (flush acks first), then open added - per-topic add failure → background retry with exp backoff (15–30 min cap) -``` - -`PerTopicConsumer`: -- For QueueConsumer: existing `ScalableQueueConsumer`. -- For StreamConsumer: existing `ScalableStreamConsumer`. - -Same subscription name on every topic. The existing per-topic -`SubscriptionCoordinator` handles per-topic segment assignment. 
- -## Ack routing - -### QueueConsumer (no cumulative ack) - -Each enqueued `MessageV5` already carries `(topic, segmentId, msgId)`. On -`acknowledge(msg)`: - -```java -PerTopicQueueConsumer ptc = perTopicConsumers.get(msg.topic()); -if (ptc != null) { - ptc.acknowledge(msg); // delegates to existing per-segment routing -} -``` - -No position vector, no snapshots. - -### StreamConsumer (cumulative ack) - -State on the multi-topic consumer: - -```java -Map> latestDeliveredPerTopicSegment; -``` - -On per-topic message arrival into the multiplexed queue: -1. Atomically update `latestDeliveredPerTopicSegment[topic][seg] = msgId`. -2. Snapshot the whole map (deep copy). -3. Attach snapshot to `MessageV5` before enqueue. - -On `acknowledgeCumulative(msg)`: - -```java -for ((topic, vector) : msg.snapshot) { - PerTopicConsumer ptc = perTopicConsumers.get(topic); - if (ptc != null) { - ptc.ackUpToVector(vector); // best-effort, async — fire and forget - } - // ptc null = topic was Removed since msg was enqueued; flushed at Remove -} -``` - -The position vector means: -- For msg's own topic: vector covers up to `msg` itself. -- For every other topic: vector covers whatever was last delivered from that - topic at the moment `msg` was enqueued. Topics with nothing delivered before - `msg`: empty / absent inner map → no-op. - -`ackUpToVector(Map)` is the existing single-topic -cumulative-ack walk extracted as a public entry point on -`ScalableStreamConsumer`. - -### Topic Removed mid-stream - -Cleanup order: flush pending acks up to `latestDelivered[topic]` → close -per-topic consumer → remove from the map. Future messages won't reference the -topic in their snapshots. If the topic re-appears later (re-Added), a fresh -per-topic consumer subscribes and resumes from broker-side cursor (Stream) or -new shared subscription state (Queue). - -### Cost note (StreamConsumer) - -Snapshot size = O(topics × segments-per-topic-currently-assigned). For 100 -topics × 1 segment each ≈ 100 entries ≈ a few KB per message. Fine for the -regime we're targeting; revisit if hot. - -## API - -Add to `QueueConsumerBuilder` and `StreamConsumerBuilder`: - -```java -Builder namespace(String namespace); -Builder namespace(String namespace, Map propertyFilters); -``` - -Mutually exclusive with `.topic(name)`; calling both fails at `subscribe()`. -Filters can't be set outside these methods, so users can't accidentally pass -filters without a namespace. - -While there: -- Drop `topicPattern(...)` from the v5 builders. -- Tighten `topic(String...)` → `topic(String)`. - -## Cross-topic load balancing — deferred - -Today: every multi-topic consumer in `(namespace, filter, subscriptionName)` -subscribes to *all* matching topics. Each topic's per-topic -`SubscriptionCoordinator` independently picks one consumer per segment — for -StreamConsumer (Exclusive) the assignment is randomized across consumers, in -expectation roughly balanced but not deterministic. - -Future: namespace-level `MultiTopicSubscriptionCoordinator` per -`(namespace, subscriptionName)` that: -- Persists `Map` for the group. -- On consumer attach: assigns a slice of the matching topic set. -- On topic Added/Removed: rebalances. -- Server-side filters the watcher's emitted set to the consumer's slice. - -The hook in this design: `consumer_name` is part of `CommandWatchScalableTopics` -*now*, so the future coordinator has the identity it needs. Client code doesn't -change — the watcher just emits a constrained `Snapshot` / `Diff`. 
- -## Authz - -`CommandWatchScalableTopics`: requires namespace-level READ -(`NamespaceOperation.GET_TOPICS`, same as `listScalableTopics`). Per-topic -subscribe that follows: normal topic-level authz applies. - -## Open question kept for later - -Property change events that don't move a topic in/out of the filter are silent. -If a future use case needs them, add a `PropertiesChanged` variant. From e18f9757b44a8f9107d4a532c76332f8fb33e0c3 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 09:57:50 -0700 Subject: [PATCH 5/6] [fix] PIP-468: Drop consumer_name; switch update event to proto oneof MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review on #25648: - Remove the optional consumer_name field on CommandWatchScalableTopics. Originally carried as a future-coordinator hook, but YAGNI — dropping it now keeps the wire surface minimal; the eventual namespace-level coordinator can add the field back when it actually needs it. - Switch ScalableTopicsSnapshot / ScalableTopicsDiff on CommandWatchScalableTopicsUpdate from two parallel optionals to a proto oneof, so the protocol enforces mutual exclusivity. Client-side dispatch becomes a switch on getEventCase() with explicit handling for SNAPSHOT, DIFF, and NOT_SET. Knock-ons - Commands.newWatchScalableTopics drops the consumerName parameter. - ScalableTopicsWatcher's constructor matches. - V5ScalableTopicsWatcherTest's openWatcher helper drops the consumerName plumbing. --- .../api/v5/V5ScalableTopicsWatcherTest.java | 14 +++--- .../client/impl/v5/ScalableTopicsWatcher.java | 10 ++-- .../apache/pulsar/client/impl/ClientCnx.java | 49 +++++++++++-------- .../pulsar/common/protocol/Commands.java | 4 -- pulsar-common/src/main/proto/PulsarApi.proto | 17 ++++--- 5 files changed, 47 insertions(+), 47 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java index 20df7d68616aa..ffdf0b0d277aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java @@ -74,7 +74,7 @@ void onDiff(List added, List removed) { public void watcherEmitsCreateAndDeleteEvents() throws Exception { NamespaceName ns = NamespaceName.get(getNamespace()); - WatcherHandle handle = openWatcher(ns, Map.of(), null); + WatcherHandle handle = openWatcher(ns, Map.of()); try { // Initial snapshot should be empty (or whatever pre-existing topics; the // shared cluster gives us a fresh per-test namespace so it should be empty). 
@@ -105,7 +105,7 @@ public void watcherEmitsCreateAndDeleteEvents() throws Exception { public void watcherFiltersByProperty() throws Exception { NamespaceName ns = NamespaceName.get(getNamespace()); - WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice"), null); + WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice")); try { Awaitility.await().atMost(10, TimeUnit.SECONDS) .until(() -> !handle.listener.snapshots.isEmpty()); @@ -137,7 +137,7 @@ public void watcherEmptyFilterSubscribesToEveryTopic() throws Exception { String preTopic = ns + "/pre-" + UUID.randomUUID().toString().substring(0, 8); admin.scalableTopics().createScalableTopic("topic://" + preTopic, 1); - WatcherHandle handle = openWatcher(ns, Map.of(), null); + WatcherHandle handle = openWatcher(ns, Map.of()); try { Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(handle.listener.currentSet, Set.of("topic://" + preTopic))); @@ -175,8 +175,8 @@ public void close() throws Exception { * initial snapshot is delivered via the future + {@code onSnapshot} on the * listener for uniform handling — see the watcher's javadoc. */ - private WatcherHandle openWatcher(NamespaceName ns, Map filters, - String consumerName) throws Exception { + private WatcherHandle openWatcher(NamespaceName ns, Map filters) + throws Exception { // Pull the underlying v4 client out of v5Client so we can construct the // (package-private) watcher directly. Field v4Field = v5Client.getClass().getDeclaredField("v4Client"); @@ -186,9 +186,9 @@ private WatcherHandle openWatcher(NamespaceName ns, Map filters, Class watcherCls = Class.forName( "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher"); Constructor ctor = watcherCls.getDeclaredConstructor( - PulsarClientImpl.class, NamespaceName.class, Map.class, String.class); + PulsarClientImpl.class, NamespaceName.class, Map.class); ctor.setAccessible(true); - Object watcher = ctor.newInstance(v4, ns, filters, consumerName); + Object watcher = ctor.newInstance(v4, ns, filters); CapturingListener listener = new CapturingListener(); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java index 8f46074126ebe..e7b1143adb306 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java @@ -72,7 +72,6 @@ interface Listener { private final PulsarClientImpl v4Client; private final NamespaceName namespace; private final Map propertyFilters; - private final String consumerName; private final long watchId; private final Backoff reconnectBackoff; @@ -93,15 +92,12 @@ interface Listener { * @param v4Client the underlying v4 client used to open broker connections * @param namespace namespace to watch * @param propertyFilters AND filters; empty map = match all - * @param consumerName optional caller identity (carried for a future namespace - * coordinator); pass {@code null} if not yet assigned */ ScalableTopicsWatcher(PulsarClientImpl v4Client, NamespaceName namespace, - Map propertyFilters, String consumerName) { + Map propertyFilters) { this.v4Client = v4Client; this.namespace = namespace; this.propertyFilters = propertyFilters == null ? 
Map.of() : propertyFilters; - this.consumerName = consumerName; this.watchId = WATCH_ID_GENERATOR.incrementAndGet(); this.reconnectBackoff = Backoff.builder() .initialDelay(Duration.ofMillis(100)) @@ -145,7 +141,7 @@ private void attach(ClientCnx newCnx) { // First subscribe: send no hash so the broker emits the initial snapshot // unconditionally. snapshot is what populates initialSnapshotFuture. newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( - watchId, namespace.toString(), consumerName, propertyFilters, + watchId, namespace.toString(), propertyFilters, /* currentHash= */ null)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { @@ -287,7 +283,7 @@ private void reconnect() { // Snapshot which we apply as a full-state replace. String hash = currentSetHash(); newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics( - watchId, namespace.toString(), consumerName, + watchId, namespace.toString(), propertyFilters, hash)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e9762c0d22fee..40c4e12aa0533 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1469,29 +1469,36 @@ protected void handleCommandWatchScalableTopicsUpdate( .log("Received scalable-topics watch update for unknown watcher"); return; } - if (cmd.hasSnapshot()) { - var snapshot = cmd.getSnapshot(); - java.util.List topics = new java.util.ArrayList<>(snapshot.getTopicsCount()); - for (int i = 0; i < snapshot.getTopicsCount(); i++) { - topics.add(snapshot.getTopicAt(i)); - } - session.onSnapshot(topics); - } else if (cmd.hasDiff()) { - var diff = cmd.getDiff(); - // LightProto pluralises count accessors with a trailing 's' on the field name, - // hence `getAddedsCount` / `getRemovedsCount`. - java.util.List added = new java.util.ArrayList<>(diff.getAddedsCount()); - for (int i = 0; i < diff.getAddedsCount(); i++) { - added.add(diff.getAddedAt(i)); + // Snapshot and diff are mutually exclusive via the proto oneof; switch on the + // generated event-case enum and unpack accordingly. + switch (cmd.getEventCase()) { + case SNAPSHOT -> { + var snapshot = cmd.getSnapshot(); + java.util.List topics = new java.util.ArrayList<>(snapshot.getTopicsCount()); + for (int i = 0; i < snapshot.getTopicsCount(); i++) { + topics.add(snapshot.getTopicAt(i)); + } + session.onSnapshot(topics); } - java.util.List removed = new java.util.ArrayList<>(diff.getRemovedsCount()); - for (int i = 0; i < diff.getRemovedsCount(); i++) { - removed.add(diff.getRemovedAt(i)); + case DIFF -> { + var diff = cmd.getDiff(); + // LightProto pluralises count accessors with a trailing 's' on the field name, + // hence `getAddedsCount` / `getRemovedsCount`. 
+ java.util.List added = new java.util.ArrayList<>(diff.getAddedsCount()); + for (int i = 0; i < diff.getAddedsCount(); i++) { + added.add(diff.getAddedAt(i)); + } + java.util.List removed = new java.util.ArrayList<>(diff.getRemovedsCount()); + for (int i = 0; i < diff.getRemovedsCount(); i++) { + removed.add(diff.getRemovedAt(i)); + } + session.onDiff(added, removed); } - session.onDiff(added, removed); - } else { - log.warn().attr("watchId", watchId) - .log("Received scalable-topics watch update with neither snapshot nor diff"); + case NOT_SET -> log.warn().attr("watchId", watchId) + .log("Received scalable-topics watch update with no event payload"); + default -> log.warn().attr("watchId", watchId) + .attr("case", cmd.getEventCase()) + .log("Received scalable-topics watch update with unknown event case"); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0f0ef5c5065af..2c0c74fb378b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1789,7 +1789,6 @@ public static ByteBuf newScalableTopicAssignmentUpdate(long consumerId, * state hasn't changed; pass {@code null} on first subscribe. */ public static ByteBuf newWatchScalableTopics(long watchId, String namespace, - String consumerName, java.util.Map propertyFilters, String currentHash) { BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS); @@ -1797,9 +1796,6 @@ public static ByteBuf newWatchScalableTopics(long watchId, String namespace, cmd.setWatchScalableTopics() .setWatchId(watchId) .setNamespace(namespace); - if (consumerName != null) { - watch.setConsumerName(consumerName); - } if (propertyFilters != null) { for (var entry : propertyFilters.entrySet()) { watch.addPropertyFilter() diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 327e360a550a2..6d13529731777 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -951,15 +951,12 @@ message CommandWatchScalableTopics { required string namespace = 2; // tenant/namespace // Optional AND filters; empty list means "match all topics in the namespace". repeated KeyValue property_filters = 3; - // Caller identity. Carried so a future namespace-level subscription coordinator - // can key per-consumer assignments without a protocol bump. - optional string consumer_name = 4; // Hash of the topics the client believes are currently in its set. Sent on // reconnect; absent on first subscribe. If it matches the broker's freshly // computed hash, the broker skips emitting the initial Snapshot — the client's // local state is already correct and future Diffs will flow as usual. Same // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics). - optional string current_hash = 5; + optional string current_hash = 4; } // Snapshot of the full matching set. Sent on initial subscribe and on every @@ -974,12 +971,16 @@ message ScalableTopicsDiff { repeated string removed = 2; } -// Broker -> Client: either Snapshot or Diff; error fields are populated only if the -// initial subscribe failed (snapshot/diff absent in that case). +// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). When the +// initial subscribe fails, neither variant is set and `error`/`message` carry the +// failure reason. 
message CommandWatchScalableTopicsUpdate { required uint64 watch_id = 1; - optional ScalableTopicsSnapshot snapshot = 2; - optional ScalableTopicsDiff diff = 3; + + oneof event { + ScalableTopicsSnapshot snapshot = 2; + ScalableTopicsDiff diff = 3; + } optional ServerError error = 4; optional string message = 5; From 710a2e94c8c29114b8461c9ea64ed3e308f20af6 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 14:29:08 -0700 Subject: [PATCH 6/6] =?UTF-8?q?[fix]=20PIP-468:=20Address=20review=20on=20?= =?UTF-8?q?watcher=20PR=20=E2=80=94=20listener=20leak,=20backoff=20reset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two real bugs caught in the Claude Code review on #25648: 1. Metadata listener leak per ScalableTopicsWatcherSession. start() called resources.getStore().registerListener(...) for every watcher; close() only flipped a closed flag. MetadataStore exposes no unregisterListener, so every closed session left a stale listener registered for the broker's lifetime, and every metadata notification fanned out to all of them — linear dispatch tax. Fix mirrors the TopicResources pattern for TopicListener: register one shared listener at ScalableTopicResources construction time, maintain a Map registry. start() registers the session via resources.registerNamespaceListener(this); close() deregisters. The single fan-out filters down to direct children of the listener's namespace base path before dispatching. ScalableTopicsWatcherSession now implements ScalableTopicResources.NamespaceListener so the resources-level fan-out can call back into it directly. 2. ScalableTopicsWatcher reconnect backoff never reset on hash-matched reconnect. reconnectBackoff.reset() ran only inside onSnapshot. For the hash-skip path (the common short-blip happy case), the broker emits no Snapshot, so the backoff stayed at its last value and successive short reconnects compounded into an unbounded delay before the next attempt. Reset on write-success of the reconnect frame instead — that's the moment we know the connection is healthy regardless of whether the broker will follow up with a Snapshot or stay silent. --- .../resources/ScalableTopicResources.java | 91 +++++++++++++++++++ .../ScalableTopicsWatcherSession.java | 62 +++++++------ .../client/impl/v5/ScalableTopicsWatcher.java | 8 ++ 3 files changed, 132 insertions(+), 29 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java index 96d24f46b71ef..550212a4a1932 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; import lombok.CustomLog; @@ -35,6 +36,8 @@ import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; /** * Metadata store access for scalable topic metadata. 
@@ -71,10 +74,98 @@ public class ScalableTopicResources extends BaseResources private final MetadataCache subscriptionCache; private final MetadataCache consumerRegistrationCache; + /** + * Per-namespace listeners for scalable topic create / modify / delete events. + * Keyed by listener so each subscriber can deregister cleanly on close. The map + * is consulted from the single store-level listener registered at construction + * time, eliminating the listener leak that would otherwise occur every time a + * watcher session ends (the metadata store API has no + * {@code unregisterListener}). Mirrors the {@link TopicResources} + * pattern for {@code TopicListener}. + */ + private final Map namespaceListeners = + new ConcurrentHashMap<>(); + public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) { super(store, ScalableTopicMetadata.class, operationTimeoutSec); this.subscriptionCache = store.getMetadataCache(SubscriptionMetadata.class); this.consumerRegistrationCache = store.getMetadataCache(ConsumerRegistration.class); + // Single shared metadata-store listener fans out to every registered watcher. + // Per-watcher registration happens via registerNamespaceListener; close() calls + // deregisterNamespaceListener so closed watchers are not on the dispatch list. + if (store instanceof MetadataStoreExtended ext) { + ext.registerListener(this::handleNotification); + } else { + store.registerListener(this::handleNotification); + } + } + + // --- Namespace-level scalable-topics listeners --- + + /** + * Listener for scalable-topic create / modify / delete events under a single + * namespace. The fan-out in {@link ScalableTopicResources} filters notifications + * to the listener's namespace and to direct topic records (skipping subtree paths + * like {@code /subscriptions/...} or {@code /controller}). + */ + public interface NamespaceListener { + /** Namespace this listener is scoped to. */ + NamespaceName getNamespaceName(); + + /** Called for every metadata event affecting a topic record in the namespace. */ + void onNotification(Notification notification); + } + + /** + * Register a per-namespace listener. The listener will receive every + * Created / Modified / Deleted event whose path is a direct child of + * {@code /topics//}. Idempotent — re-registering the same listener + * just updates the namespace mapping. + */ + public void registerNamespaceListener(NamespaceListener listener) { + namespaceListeners.put(listener, listener.getNamespaceName()); + } + + /** + * Deregister a previously-registered namespace listener. Safe to call multiple + * times or for listeners that were never registered. + */ + public void deregisterNamespaceListener(NamespaceListener listener) { + namespaceListeners.remove(listener); + } + + /** + * Single fan-out path: for each registered listener, emit the notification iff + * its path is a direct child of the listener's namespace base path. Filters out + * subtree events (subscriptions, controller lock) up front. + */ + void handleNotification(Notification notification) { + if (namespaceListeners.isEmpty()) { + return; + } + String path = notification.getPath(); + if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) { + return; + } + for (Map.Entry entry : namespaceListeners.entrySet()) { + String basePath = namespacePath(entry.getValue()); + if (!path.startsWith(basePath + "/")) { + continue; + } + // Direct child only — strip the prefix and check there's no further '/'. 
+ String rest = path.substring(basePath.length() + 1); + if (rest.indexOf('/') >= 0) { + continue; + } + try { + entry.getKey().onNotification(notification); + } catch (Exception e) { + log.warn().attr("listener", entry.getKey()) + .attr("path", path) + .exceptionMessage(e) + .log("Failed to dispatch scalable-topic notification"); + } + } } public CompletableFuture createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java index 91a8bddc67010..8598170d96977 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java @@ -29,7 +29,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import lombok.Getter; import org.apache.pulsar.broker.resources.ScalableTopicMetadata; import org.apache.pulsar.broker.resources.ScalableTopicResources; @@ -60,7 +59,7 @@ *

    Any broker can serve this role: every broker observes the same metadata events * via the registered listener, so no coordinator is needed at the namespace level. */ -public class ScalableTopicsWatcherSession { +public class ScalableTopicsWatcherSession implements ScalableTopicResources.NamespaceListener { private static final Logger LOG = Logger.get(ScalableTopicsWatcherSession.class); /** @@ -89,7 +88,6 @@ public class ScalableTopicsWatcherSession { /** {@code /topics//} — direct children are scalable topic records. */ private final String basePath; - private final Consumer notificationListener = this::onNotification; /** * Topics currently in the matching set. Maintained server-side so we can detect @@ -129,18 +127,24 @@ public ScalableTopicsWatcherSession(long watchId, .build(); } + @Override + public NamespaceName getNamespaceName() { + return namespace; + } + /** - * Start the watch: register the metadata listener first (so events during snapshot - * computation are queued, not lost), compute the initial filtered set, then emit - * the {@code Snapshot} frame. After that, deltas flow through the listener. + * Start the watch: register on the namespace listener registry first (so events + * during snapshot computation are queued, not lost), compute the initial filtered + * set, then emit the {@code Snapshot} frame. After that, deltas flow through the + * listener. */ public CompletableFuture start() { - // Register listener BEFORE computing the initial set: any event that arrives - // mid-snapshot is captured by the listener and either (a) already in the - // initial set we're about to emit, in which case the redundant Add is a no-op - // on the client (set semantics), or (b) genuinely newer than the snapshot, in - // which case it correctly flows through as a Diff after the Snapshot frame. - resources.getStore().registerListener(notificationListener); + // Register BEFORE computing the initial set: any event that arrives mid-snapshot + // is captured by the listener and either (a) already in the initial set we're + // about to emit, in which case the redundant Add is a no-op on the client + // (set semantics), or (b) genuinely newer than the snapshot, in which case it + // correctly flows through as a Diff after the Snapshot frame. + resources.registerNamespaceListener(this); return resources.findScalableTopicsByPropertiesAsync(namespace, propertyFilters) .thenAccept(initialTopics -> { @@ -171,24 +175,21 @@ public CompletableFuture start() { } /** - * Visible for testing — invoked by the metadata-store listener registered in - * {@link #start()}. Hot-path: filter quickly to events under our namespace's base - * path, ignore subtree paths (subscriptions, controller lock, etc.), then evaluate. + * Invoked by {@link ScalableTopicResources} for every metadata event whose path + * is a direct child of this watcher's namespace base path. The resources-level + * fan-out has already done the namespace + direct-child filtering, so we go + * straight to evaluating the filter and updating the matching set. */ - void onNotification(Notification notification) { + @Override + public void onNotification(Notification notification) { if (closed.get()) { return; } String path = notification.getPath(); - // Direct child only: /topics///. Skip sub-paths - // like ...//subscriptions/... or ...//controller. 
- if (!path.startsWith(basePath + "/")) { - return; - } - String rest = path.substring(basePath.length() + 1); - if (rest.indexOf('/') >= 0) { - return; - } + // Resources-level fan-out guarantees direct-child paths under basePath, but + // re-derive the encoded local name defensively. + String rest = path.startsWith(basePath + "/") + ? path.substring(basePath.length() + 1) : path; String topicName = TopicName.get("topic", namespace, Codec.decode(rest)).toString(); @@ -301,11 +302,14 @@ private void flushPending() { } /** - * Drop the session. The metadata-store listener stays registered (the store API - * does not expose unregister), but {@link #onNotification} short-circuits on the - * closed flag, so further events are inert. + * Drop the session. Deregister from the resources' namespace listener registry so + * the per-event fan-out skips us — no listener leak, no per-notification dispatch + * tax for the broker's lifetime. */ public void close() { - closed.set(true); + if (!closed.compareAndSet(false, true)) { + return; + } + resources.deregisterNamespaceListener(this); } } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java index e7b1143adb306..0350d83df52eb 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java @@ -291,7 +291,15 @@ private void reconnect() { log.warn().exceptionMessage(writeFuture.cause()) .log("Watcher reconnect write failed"); scheduleReconnect(); + return; } + // Write reached the broker — connection is healthy. + // Reset the backoff so the next disconnect starts + // fresh. Crucial for the hash-skip path: the broker + // emits no Snapshot, so onSnapshot's reset never + // fires; without this, a chain of short blips keeps + // the backoff at its peak forever. + reconnectBackoff.reset(); }); }) .exceptionally(ex -> {