Skip to content

Commit 4f73b43

Browse files
authored
[feat][broker] PIP-468: Filter scalable topics by property via secondary index (#25632)
1 parent cd910c2 commit 4f73b43

10 files changed

Lines changed: 623 additions & 10 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
*/
1919
package org.apache.pulsar.broker.resources;
2020

21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import java.io.IOException;
2123
import java.util.List;
24+
import java.util.Map;
2225
import java.util.Optional;
2326
import java.util.concurrent.CompletableFuture;
2427
import java.util.function.Function;
@@ -28,6 +31,7 @@
2831
import org.apache.pulsar.common.naming.TopicName;
2932
import org.apache.pulsar.common.util.Codec;
3033
import org.apache.pulsar.common.util.FutureUtil;
34+
import org.apache.pulsar.common.util.ObjectMapperFactory;
3135
import org.apache.pulsar.metadata.api.MetadataCache;
3236
import org.apache.pulsar.metadata.api.MetadataStore;
3337
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -54,6 +58,16 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
5458
private static final String SUBSCRIPTIONS_SEGMENT = "subscriptions";
5559
private static final String CONSUMERS_SEGMENT = "consumers";
5660

61+
/**
62+
* Use the topic's {@code properties} map verbatim as the secondary-index entries.
63+
* Each property {@code k -> v} is registered as the index named {@code k} with
64+
* secondary key {@code v}; querying by that key/value pair via
65+
* {@link MetadataStore#findByIndex} returns the record. Index names live in a
66+
* per-record-type namespace, so there's no need to disambiguate them with a prefix.
67+
*/
68+
private static final Function<ScalableTopicMetadata, Map<String, String>> PROPERTY_INDEX_EXTRACTOR =
69+
metadata -> metadata.getProperties() != null ? metadata.getProperties() : Map.of();
70+
5771
private final MetadataCache<SubscriptionMetadata> subscriptionCache;
5872
private final MetadataCache<ConsumerRegistration> consumerRegistrationCache;
5973

@@ -64,7 +78,7 @@ public ScalableTopicResources(MetadataStore store, int operationTimeoutSec) {
6478
}
6579

6680
public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, ScalableTopicMetadata metadata) {
67-
return createAsync(topicPath(tn), metadata);
81+
return getCache().create(topicPath(tn), metadata, PROPERTY_INDEX_EXTRACTOR);
6882
}
6983

7084
public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetadataAsync(TopicName tn) {
@@ -82,7 +96,10 @@ public CompletableFuture<Optional<ScalableTopicMetadata>> getScalableTopicMetada
8296
public CompletableFuture<Void> updateScalableTopicAsync(TopicName tn,
8397
Function<ScalableTopicMetadata,
8498
ScalableTopicMetadata> updateFunction) {
85-
return setAsync(topicPath(tn), updateFunction);
99+
// Refresh property indexes on every update — the modify function may add or remove
100+
// properties and the underlying store needs to see the post-modification view.
101+
return getCache().readModifyUpdate(topicPath(tn), updateFunction, PROPERTY_INDEX_EXTRACTOR)
102+
.thenApply(__ -> null);
86103
}
87104

88105
public CompletableFuture<Void> deleteScalableTopicAsync(TopicName tn) {
@@ -100,6 +117,41 @@ public CompletableFuture<List<String>> listScalableTopicsAsync(NamespaceName ns)
100117
.collect(Collectors.toList()));
101118
}
102119

120+
/**
121+
* List scalable topics in a namespace whose {@code properties} map contains the given
122+
* key/value pair. On stores with native secondary index support (Oxia) this is served
123+
* by the index registered at create/update time; otherwise it falls back to a children
124+
* scan + per-record property check.
125+
*
126+
* @param ns the namespace to scope the query to
127+
* @param propertyKey property name to filter on
128+
* @param propertyValue exact property value to match
129+
* @return fully qualified scalable topic names matching the property
130+
*/
131+
public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
132+
NamespaceName ns, String propertyKey, String propertyValue) {
133+
String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
134+
ObjectMapper mapper = ObjectMapperFactory.getMapper().getObjectMapper();
135+
return getStore().findByIndex(scanPathPrefix, propertyKey, propertyValue, result -> {
136+
// Fallback path (no native index): re-check the property on the loaded record.
137+
try {
138+
ScalableTopicMetadata md =
139+
mapper.readValue(result.getValue(), ScalableTopicMetadata.class);
140+
return md.getProperties() != null
141+
&& propertyValue.equals(md.getProperties().get(propertyKey));
142+
} catch (IOException e) {
143+
return false;
144+
}
145+
})
146+
.thenApply(results -> results.stream()
147+
.map(r -> {
148+
String path = r.getStat().getPath();
149+
String encoded = path.substring(path.lastIndexOf('/') + 1);
150+
return TopicName.get("topic", ns, Codec.decode(encoded)).toString();
151+
})
152+
.collect(Collectors.toList()));
153+
}
154+
103155
// --- Subscriptions ---
104156

105157
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,19 @@ public void getList(
9393
@ApiParam(value = "Specify the tenant", required = true)
9494
@PathParam("tenant") String tenant,
9595
@ApiParam(value = "Specify the namespace", required = true)
96-
@PathParam("namespace") String namespace) {
96+
@PathParam("namespace") String namespace,
97+
@ApiParam(value = "Filter by topic property name (must be paired with propertyValue)")
98+
@QueryParam("propertyKey") String propertyKey,
99+
@ApiParam(value = "Filter by topic property value (must be paired with propertyKey)")
100+
@QueryParam("propertyValue") String propertyValue) {
97101
validateNamespaceName(tenant, namespace);
102+
boolean filterByProperty = propertyKey != null && !propertyKey.isEmpty()
103+
&& propertyValue != null;
98104
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
99-
.thenCompose(__ -> resources().listScalableTopicsAsync(namespaceName))
105+
.thenCompose(__ -> filterByProperty
106+
? resources().findScalableTopicsByPropertyAsync(
107+
namespaceName, propertyKey, propertyValue)
108+
: resources().listScalableTopicsAsync(namespaceName))
100109
.thenAccept(asyncResponse::resume)
101110
.exceptionally(ex -> {
102111
log.error().attr("clientAppId", clientAppId()).attr("namespace", namespaceName)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.admin;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertTrue;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.UUID;
28+
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
29+
import org.testng.annotations.Test;
30+
31+
/**
32+
* End-to-end coverage for the property-filtered list endpoint
33+
* ({@code GET /admin/v2/scalable/{tenant}/{namespace}?propertyKey&propertyValue}).
34+
* Drives the full HTTP path through the {@link org.apache.pulsar.client.admin.PulsarAdmin}
35+
* client against a real shared broker, verifying that topics created with
36+
* properties through the admin API are queryable via the secondary index.
37+
*/
38+
public class ScalableTopicsListByPropertyTest extends SharedPulsarBaseTest {
39+
40+
private String namespace() {
41+
return getNamespace();
42+
}
43+
44+
private String topicName(String suffix) {
45+
return "topic://" + namespace() + "/" + suffix + "-" + UUID.randomUUID().toString().substring(0, 8);
46+
}
47+
48+
@Test
49+
public void listScalableTopicsFilteredByProperty() throws Exception {
50+
String aliceTopic = topicName("alice");
51+
String bobTopic = topicName("bob");
52+
String carolTopic = topicName("carol");
53+
54+
// Each topic gets a different owner; alice and bob share a team. The filter
55+
// should be able to surface either subset on demand.
56+
admin.scalableTopics().createScalableTopic(aliceTopic, 1,
57+
Map.of("owner", "alice", "team", "platform"));
58+
admin.scalableTopics().createScalableTopic(bobTopic, 1,
59+
Map.of("owner", "bob", "team", "platform"));
60+
admin.scalableTopics().createScalableTopic(carolTopic, 1,
61+
Map.of("owner", "carol", "team", "data"));
62+
63+
// Filter by owner=alice — single match.
64+
List<String> alice = admin.scalableTopics()
65+
.listScalableTopicsByProperty(namespace(), "owner", "alice");
66+
assertEquals(alice, List.of(aliceTopic));
67+
68+
// Filter by team=platform — alice + bob.
69+
Set<String> platform = new HashSet<>(admin.scalableTopics()
70+
.listScalableTopicsByProperty(namespace(), "team", "platform"));
71+
assertEquals(platform, Set.of(aliceTopic, bobTopic));
72+
73+
// Unmatched value — empty result.
74+
assertTrue(admin.scalableTopics()
75+
.listScalableTopicsByProperty(namespace(), "owner", "nonexistent")
76+
.isEmpty());
77+
78+
// Sanity-check: the un-filtered listing still returns every topic in the namespace.
79+
Set<String> all = new HashSet<>(admin.scalableTopics().listScalableTopics(namespace()));
80+
assertTrue(all.containsAll(Set.of(aliceTopic, bobTopic, carolTopic)),
81+
"expected all three created topics to appear in the unfiltered list, got " + all);
82+
}
83+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resources;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertTrue;
23+
import java.util.HashMap;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import org.apache.pulsar.common.naming.NamespaceName;
29+
import org.apache.pulsar.common.naming.TopicName;
30+
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
31+
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
32+
import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
33+
import org.testng.annotations.AfterMethod;
34+
import org.testng.annotations.BeforeMethod;
35+
import org.testng.annotations.Test;
36+
37+
/**
38+
* Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
39+
* {@link ScalableTopicResources}: verifies that topic properties registered at
40+
* create/update time are queryable via the secondary-index API, that updates
41+
* refresh the index, and that the filter is correctly scoped to a namespace.
42+
*
43+
* <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
44+
* secondary indexes, so this exercises the fallback scan + per-record property
45+
* predicate path. The Oxia-native path (where the index is consulted directly)
46+
* is covered by the metadata-store-level secondary index tests.
47+
*/
48+
public class ScalableTopicPropertyIndexTest {
49+
50+
private MetadataStoreExtended store;
51+
private ScalableTopicResources resources;
52+
53+
@BeforeMethod
54+
public void setUp() throws Exception {
55+
store = new LocalMemoryMetadataStore("memory:local",
56+
MetadataStoreConfig.builder().build());
57+
resources = new ScalableTopicResources(store, 30);
58+
}
59+
60+
@AfterMethod(alwaysRun = true)
61+
public void tearDown() throws Exception {
62+
if (store != null) {
63+
store.close();
64+
}
65+
}
66+
67+
private TopicName topicIn(NamespaceName ns, String localName) {
68+
return TopicName.get("topic://" + ns + "/" + localName);
69+
}
70+
71+
private ScalableTopicMetadata metaWithProps(Map<String, String> props) {
72+
return ScalableTopicMetadata.builder()
73+
.epoch(0)
74+
.nextSegmentId(1)
75+
.properties(new HashMap<>(props))
76+
.build();
77+
}
78+
79+
@Test
80+
public void findsTopicByExactPropertyValue() throws Exception {
81+
NamespaceName ns = NamespaceName.get("tenant/ns-a");
82+
83+
resources.createScalableTopicAsync(topicIn(ns, "t-alice"),
84+
metaWithProps(Map.of("owner", "alice", "team", "platform"))).get();
85+
resources.createScalableTopicAsync(topicIn(ns, "t-bob"),
86+
metaWithProps(Map.of("owner", "bob", "team", "platform"))).get();
87+
resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
88+
metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
89+
90+
// Filter by owner=alice — only t-alice matches.
91+
List<String> aliceOwned = resources
92+
.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
93+
assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
94+
95+
// Filter by team=platform — both alice and bob.
96+
Set<String> platform = new HashSet<>(resources
97+
.findScalableTopicsByPropertyAsync(ns, "team", "platform").get());
98+
assertEquals(platform, Set.of(
99+
"topic://tenant/ns-a/t-alice",
100+
"topic://tenant/ns-a/t-bob"));
101+
}
102+
103+
@Test
104+
public void findIsScopedToNamespace() throws Exception {
105+
NamespaceName nsA = NamespaceName.get("tenant/ns-a");
106+
NamespaceName nsB = NamespaceName.get("tenant/ns-b");
107+
108+
// Same property in two namespaces — find must return only the one we asked for.
109+
resources.createScalableTopicAsync(topicIn(nsA, "t1"),
110+
metaWithProps(Map.of("owner", "alice"))).get();
111+
resources.createScalableTopicAsync(topicIn(nsB, "t2"),
112+
metaWithProps(Map.of("owner", "alice"))).get();
113+
114+
List<String> inNsA = resources
115+
.findScalableTopicsByPropertyAsync(nsA, "owner", "alice").get();
116+
assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
117+
118+
List<String> inNsB = resources
119+
.findScalableTopicsByPropertyAsync(nsB, "owner", "alice").get();
120+
assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
121+
}
122+
123+
@Test
124+
public void noMatchReturnsEmptyList() throws Exception {
125+
NamespaceName ns = NamespaceName.get("tenant/ns-empty");
126+
resources.createScalableTopicAsync(topicIn(ns, "t1"),
127+
metaWithProps(Map.of("owner", "alice"))).get();
128+
129+
// Wrong value
130+
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", "bob")
131+
.get().isEmpty());
132+
133+
// Wrong key
134+
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team", "alice")
135+
.get().isEmpty());
136+
}
137+
138+
@Test
139+
public void updateRefreshesIndex() throws Exception {
140+
NamespaceName ns = NamespaceName.get("tenant/ns-update");
141+
TopicName tn = topicIn(ns, "t-mutating");
142+
143+
resources.createScalableTopicAsync(tn,
144+
metaWithProps(Map.of("owner", "alice"))).get();
145+
assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get(),
146+
List.of(tn.toString()));
147+
148+
// Reassign owner via update — the new owner must be queryable, and the
149+
// old owner's entry must no longer match this topic.
150+
resources.updateScalableTopicAsync(tn, current -> {
151+
current.getProperties().put("owner", "bob");
152+
return current;
153+
}).get();
154+
155+
assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", "bob").get(),
156+
List.of(tn.toString()));
157+
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", "alice")
158+
.get().isEmpty());
159+
}
160+
161+
@Test
162+
public void topicWithoutPropertiesIsNotMatched() throws Exception {
163+
NamespaceName ns = NamespaceName.get("tenant/ns-noprops");
164+
resources.createScalableTopicAsync(topicIn(ns, "t-anon"),
165+
metaWithProps(Map.of())).get();
166+
resources.createScalableTopicAsync(topicIn(ns, "t-tagged"),
167+
metaWithProps(Map.of("owner", "alice"))).get();
168+
169+
// Filtering by any property must skip the un-tagged record.
170+
List<String> matches = resources
171+
.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
172+
assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
173+
}
174+
}

0 commit comments

Comments
 (0)