Skip to content

Commit 27e132b

Browse files
authored
[feat] PIP-468: Filter scalable topic listing by multiple properties (AND) (#25639)
1 parent ee7dc68 commit 27e132b

7 files changed

Lines changed: 184 additions & 88 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -118,32 +118,57 @@ public CompletableFuture<List<String>> listScalableTopicsAsync(NamespaceName ns)
118118
}
119119

120120
/**
121-
* List scalable topics in a namespace whose {@code properties} map contains the given
122-
* key/value pair. On stores with native secondary index support (Oxia) this is served
123-
* by the index registered at create/update time; otherwise it falls back to a children
124-
* scan + per-record property check.
121+
* List scalable topics in a namespace whose {@code properties} map contains every
122+
* key/value pair in {@code propertyFilters} (AND semantics).
125123
*
126-
* @param ns the namespace to scope the query to
127-
* @param propertyKey property name to filter on
128-
* @param propertyValue exact property value to match
129-
* @return fully qualified scalable topic names matching the property
124+
* <p>Stores with native secondary-index support (Oxia) serve the most-restrictive
125+
* lookup via the index for one of the filters, then a record-level check rejects
126+
* anything that doesn't satisfy the rest. Stores without native index support fall
127+
* through to a children scan + the same predicate. An empty {@code propertyFilters}
128+
* map degenerates to {@link #listScalableTopicsAsync}.
129+
*
130+
* @param ns the namespace to scope the query to
131+
* @param propertyFilters property name/value pairs that all must match (AND)
132+
* @return fully qualified scalable topic names matching every filter
130133
*/
131-
public CompletableFuture<List<String>> findScalableTopicsByPropertyAsync(
132-
NamespaceName ns, String propertyKey, String propertyValue) {
134+
public CompletableFuture<List<String>> findScalableTopicsByPropertiesAsync(
135+
NamespaceName ns, Map<String, String> propertyFilters) {
136+
if (propertyFilters == null || propertyFilters.isEmpty()) {
137+
return listScalableTopicsAsync(ns);
138+
}
133139
String scanPathPrefix = joinPath(SCALABLE_TOPIC_PATH, ns.toString());
134140
ObjectMapper mapper = ObjectMapperFactory.getMapper().getObjectMapper();
135-
return getStore().findByIndex(scanPathPrefix, propertyKey, propertyValue, result -> {
136-
// Fallback path (no native index): re-check the property on the loaded record.
137-
try {
138-
ScalableTopicMetadata md =
139-
mapper.readValue(result.getValue(), ScalableTopicMetadata.class);
140-
return md.getProperties() != null
141-
&& propertyValue.equals(md.getProperties().get(propertyKey));
142-
} catch (IOException e) {
141+
142+
// Pick any single filter to drive the index lookup (native stores will use it
143+
// to narrow the candidate set; iteration order is acceptable since we don't
144+
// know index cardinalities up front). The predicate then enforces AND across
145+
// every filter on the loaded record.
146+
Map.Entry<String, String> indexFilter = propertyFilters.entrySet().iterator().next();
147+
java.util.function.Predicate<org.apache.pulsar.metadata.api.GetResult> matchesAll = result -> {
148+
try {
149+
ScalableTopicMetadata md =
150+
mapper.readValue(result.getValue(), ScalableTopicMetadata.class);
151+
Map<String, String> props = md.getProperties();
152+
if (props == null) {
153+
return false;
154+
}
155+
for (Map.Entry<String, String> e : propertyFilters.entrySet()) {
156+
if (!e.getValue().equals(props.get(e.getKey()))) {
143157
return false;
144158
}
145-
})
159+
}
160+
return true;
161+
} catch (IOException e) {
162+
return false;
163+
}
164+
};
165+
return getStore().findByIndex(scanPathPrefix,
166+
indexFilter.getKey(), indexFilter.getValue(), matchesAll)
167+
// Native-index implementations don't apply the fallback predicate, so
168+
// re-check here. On the fallback path this is a no-op (predicate already
169+
// applied) but cheap.
146170
.thenApply(results -> results.stream()
171+
.filter(matchesAll)
147172
.map(r -> {
148173
String path = r.getStat().getPath();
149174
String encoded = path.substring(path.lastIndexOf('/') + 1);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.net.MalformedURLException;
2727
import java.net.URI;
2828
import java.net.URL;
29+
import java.util.List;
2930
import java.util.Map;
3031
import java.util.concurrent.CompletableFuture;
3132
import java.util.function.Function;
@@ -94,18 +95,16 @@ public void getList(
9495
@PathParam("tenant") String tenant,
9596
@ApiParam(value = "Specify the namespace", required = true)
9697
@PathParam("namespace") String namespace,
97-
@ApiParam(value = "Filter by topic property name (must be paired with propertyValue)")
98-
@QueryParam("propertyKey") String propertyKey,
99-
@ApiParam(value = "Filter by topic property value (must be paired with propertyKey)")
100-
@QueryParam("propertyValue") String propertyValue) {
98+
@ApiParam(value = "Filter to topics whose properties contain every key=value pair."
99+
+ " Each repetition of the parameter adds one filter (AND semantics).")
100+
@QueryParam("property") List<String> properties) {
101101
validateNamespaceName(tenant, namespace);
102-
boolean filterByProperty = propertyKey != null && !propertyKey.isEmpty()
103-
&& propertyValue != null;
102+
Map<String, String> propertyFilters = parseKeyValuePairs(properties);
104103
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
105-
.thenCompose(__ -> filterByProperty
106-
? resources().findScalableTopicsByPropertyAsync(
107-
namespaceName, propertyKey, propertyValue)
108-
: resources().listScalableTopicsAsync(namespaceName))
104+
.thenCompose(__ -> propertyFilters.isEmpty()
105+
? resources().listScalableTopicsAsync(namespaceName)
106+
: resources().findScalableTopicsByPropertiesAsync(
107+
namespaceName, propertyFilters))
109108
.thenAccept(asyncResponse::resume)
110109
.exceptionally(ex -> {
111110
log.error().attr("clientAppId", clientAppId()).attr("namespace", namespaceName)
@@ -115,6 +114,30 @@ public void getList(
115114
});
116115
}
117116

117+
/**
118+
* Parse {@code key=value} entries from a list of query parameter values into a map.
119+
* Accepts {@code null} / empty input. Rejects malformed entries (no {@code =}, empty
120+
* key, or empty value) with a 412.
121+
*/
122+
private static Map<String, String> parseKeyValuePairs(List<String> entries) {
123+
if (entries == null || entries.isEmpty()) {
124+
return Map.of();
125+
}
126+
Map<String, String> result = new java.util.LinkedHashMap<>(entries.size());
127+
for (String entry : entries) {
128+
if (entry == null || entry.isEmpty()) {
129+
continue;
130+
}
131+
int eq = entry.indexOf('=');
132+
if (eq <= 0 || eq == entry.length() - 1) {
133+
throw new RestException(Response.Status.fromStatusCode(412),
134+
"property filter must be in the form key=value, got: " + entry);
135+
}
136+
result.put(entry.substring(0, eq), entry.substring(eq + 1));
137+
}
138+
return result;
139+
}
140+
118141
// --- Create ---
119142

120143
@PUT

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ScalableTopicsListByPropertyTest.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,28 +51,37 @@ public void listScalableTopicsFilteredByProperty() throws Exception {
5151
String bobTopic = topicName("bob");
5252
String carolTopic = topicName("carol");
5353

54-
// Each topic gets a different owner; alice and bob share a team. The filter
55-
// should be able to surface either subset on demand.
54+
// alice and bob share team=platform; alice and carol share owner=alice. We
55+
// can hit each consumer-driven slice via different filter combinations below.
5656
admin.scalableTopics().createScalableTopic(aliceTopic, 1,
5757
Map.of("owner", "alice", "team", "platform"));
5858
admin.scalableTopics().createScalableTopic(bobTopic, 1,
5959
Map.of("owner", "bob", "team", "platform"));
6060
admin.scalableTopics().createScalableTopic(carolTopic, 1,
61-
Map.of("owner", "carol", "team", "data"));
61+
Map.of("owner", "alice", "team", "data"));
6262

63-
// Filter by owner=alice — single match.
64-
List<String> alice = admin.scalableTopics()
65-
.listScalableTopicsByProperty(namespace(), "owner", "alice");
66-
assertEquals(alice, List.of(aliceTopic));
63+
// Single-property filter: owner=bob — single match.
64+
List<String> bob = admin.scalableTopics()
65+
.listScalableTopicsByProperties(namespace(), Map.of("owner", "bob"));
66+
assertEquals(bob, List.of(bobTopic));
6767

68-
// Filter by team=platform — alice + bob.
68+
// Single-property filter: team=platform — alice + bob.
6969
Set<String> platform = new HashSet<>(admin.scalableTopics()
70-
.listScalableTopicsByProperty(namespace(), "team", "platform"));
70+
.listScalableTopicsByProperties(namespace(), Map.of("team", "platform")));
7171
assertEquals(platform, Set.of(aliceTopic, bobTopic));
7272

73-
// Unmatched value — empty result.
73+
// Multi-property AND filter: owner=alice AND team=platform — narrows to
74+
// exactly aliceTopic, even though carol also has owner=alice and bob also
75+
// has team=platform.
76+
List<String> aliceOnPlatform = admin.scalableTopics()
77+
.listScalableTopicsByProperties(namespace(),
78+
Map.of("owner", "alice", "team", "platform"));
79+
assertEquals(aliceOnPlatform, List.of(aliceTopic));
80+
81+
// Unmatched combination — empty result.
7482
assertTrue(admin.scalableTopics()
75-
.listScalableTopicsByProperty(namespace(), "owner", "nonexistent")
83+
.listScalableTopicsByProperties(namespace(),
84+
Map.of("owner", "alice", "team", "ops"))
7685
.isEmpty());
7786

7887
// Sanity-check: the un-filtered listing still returns every topic in the namespace.

pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicPropertyIndexTest.java

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@
3535
import org.testng.annotations.Test;
3636

3737
/**
38-
* Coverage for the {@code findScalableTopicsByPropertyAsync} entry point on
38+
* Coverage for the {@code findScalableTopicsByPropertiesAsync} entry point on
3939
* {@link ScalableTopicResources}: verifies that topic properties registered at
4040
* create/update time are queryable via the secondary-index API, that updates
41-
* refresh the index, and that the filter is correctly scoped to a namespace.
41+
* refresh the index, that the filter is correctly scoped to a namespace, and
42+
* that multi-property filters AND the conditions together.
4243
*
4344
* <p>Uses {@link LocalMemoryMetadataStore} which does not implement native
4445
* secondary indexes, so this exercises the fallback scan + per-record property
@@ -87,19 +88,57 @@ public void findsTopicByExactPropertyValue() throws Exception {
8788
resources.createScalableTopicAsync(topicIn(ns, "t-carol"),
8889
metaWithProps(Map.of("owner", "carol", "team", "data"))).get();
8990

90-
// Filter by owner=alice — only t-alice matches.
91+
// Single-property filter: owner=alice — only t-alice matches.
9192
List<String> aliceOwned = resources
92-
.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
93+
.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "alice")).get();
9394
assertEquals(aliceOwned, List.of("topic://tenant/ns-a/t-alice"));
9495

95-
// Filter by team=platform — both alice and bob.
96+
// Single-property filter: team=platform — both alice and bob.
9697
Set<String> platform = new HashSet<>(resources
97-
.findScalableTopicsByPropertyAsync(ns, "team", "platform").get());
98+
.findScalableTopicsByPropertiesAsync(ns, Map.of("team", "platform")).get());
9899
assertEquals(platform, Set.of(
99100
"topic://tenant/ns-a/t-alice",
100101
"topic://tenant/ns-a/t-bob"));
101102
}
102103

104+
@Test
105+
public void andsMultiplePropertyFilters() throws Exception {
106+
NamespaceName ns = NamespaceName.get("tenant/ns-and");
107+
108+
// alice/platform and bob/platform share team; alice/data and alice/platform
109+
// share owner. The AND of (team=platform, owner=alice) must narrow to the
110+
// single record that satisfies both.
111+
resources.createScalableTopicAsync(topicIn(ns, "t-1"),
112+
metaWithProps(Map.of("owner", "alice", "team", "platform"))).get();
113+
resources.createScalableTopicAsync(topicIn(ns, "t-2"),
114+
metaWithProps(Map.of("owner", "alice", "team", "data"))).get();
115+
resources.createScalableTopicAsync(topicIn(ns, "t-3"),
116+
metaWithProps(Map.of("owner", "bob", "team", "platform"))).get();
117+
118+
List<String> match = resources.findScalableTopicsByPropertiesAsync(ns,
119+
Map.of("owner", "alice", "team", "platform")).get();
120+
assertEquals(match, List.of("topic://tenant/ns-and/t-1"));
121+
122+
// No record satisfies both — empty result.
123+
assertTrue(resources.findScalableTopicsByPropertiesAsync(ns,
124+
Map.of("owner", "alice", "team", "ops")).get().isEmpty());
125+
}
126+
127+
@Test
128+
public void emptyFilterReturnsAllTopicsInNamespace() throws Exception {
129+
NamespaceName ns = NamespaceName.get("tenant/ns-empty-filter");
130+
resources.createScalableTopicAsync(topicIn(ns, "t-1"),
131+
metaWithProps(Map.of("owner", "alice"))).get();
132+
resources.createScalableTopicAsync(topicIn(ns, "t-2"),
133+
metaWithProps(Map.of("owner", "bob"))).get();
134+
135+
Set<String> all = new HashSet<>(resources
136+
.findScalableTopicsByPropertiesAsync(ns, Map.of()).get());
137+
assertEquals(all, Set.of(
138+
"topic://tenant/ns-empty-filter/t-1",
139+
"topic://tenant/ns-empty-filter/t-2"));
140+
}
141+
103142
@Test
104143
public void findIsScopedToNamespace() throws Exception {
105144
NamespaceName nsA = NamespaceName.get("tenant/ns-a");
@@ -112,11 +151,11 @@ public void findIsScopedToNamespace() throws Exception {
112151
metaWithProps(Map.of("owner", "alice"))).get();
113152

114153
List<String> inNsA = resources
115-
.findScalableTopicsByPropertyAsync(nsA, "owner", "alice").get();
154+
.findScalableTopicsByPropertiesAsync(nsA, Map.of("owner", "alice")).get();
116155
assertEquals(inNsA, List.of("topic://tenant/ns-a/t1"));
117156

118157
List<String> inNsB = resources
119-
.findScalableTopicsByPropertyAsync(nsB, "owner", "alice").get();
158+
.findScalableTopicsByPropertiesAsync(nsB, Map.of("owner", "alice")).get();
120159
assertEquals(inNsB, List.of("topic://tenant/ns-b/t2"));
121160
}
122161

@@ -127,11 +166,11 @@ public void noMatchReturnsEmptyList() throws Exception {
127166
metaWithProps(Map.of("owner", "alice"))).get();
128167

129168
// Wrong value
130-
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", "bob")
169+
assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "bob"))
131170
.get().isEmpty());
132171

133172
// Wrong key
134-
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "team", "alice")
173+
assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, Map.of("team", "alice"))
135174
.get().isEmpty());
136175
}
137176

@@ -142,7 +181,7 @@ public void updateRefreshesIndex() throws Exception {
142181

143182
resources.createScalableTopicAsync(tn,
144183
metaWithProps(Map.of("owner", "alice"))).get();
145-
assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get(),
184+
assertEquals(resources.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "alice")).get(),
146185
List.of(tn.toString()));
147186

148187
// Reassign owner via update — the new owner must be queryable, and the
@@ -152,9 +191,9 @@ public void updateRefreshesIndex() throws Exception {
152191
return current;
153192
}).get();
154193

155-
assertEquals(resources.findScalableTopicsByPropertyAsync(ns, "owner", "bob").get(),
194+
assertEquals(resources.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "bob")).get(),
156195
List.of(tn.toString()));
157-
assertTrue(resources.findScalableTopicsByPropertyAsync(ns, "owner", "alice")
196+
assertTrue(resources.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "alice"))
158197
.get().isEmpty());
159198
}
160199

@@ -168,7 +207,7 @@ public void topicWithoutPropertiesIsNotMatched() throws Exception {
168207

169208
// Filtering by any property must skip the un-tagged record.
170209
List<String> matches = resources
171-
.findScalableTopicsByPropertyAsync(ns, "owner", "alice").get();
210+
.findScalableTopicsByPropertiesAsync(ns, Map.of("owner", "alice")).get();
172211
assertEquals(matches, List.of("topic://tenant/ns-noprops/t-tagged"));
173212
}
174213
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,26 @@ public interface ScalableTopics {
5151

5252
/**
5353
* Get the list of scalable topics under a namespace whose properties contain
54-
* the given key/value pair.
54+
* every key/value pair in {@code propertyFilters} (AND semantics).
5555
*
56-
* <p>Backed by a secondary index registered on the topic property at create/update
57-
* time, so the lookup is efficient and does not scan every topic in the namespace.
56+
* <p>Backed by the secondary index registered on the topic properties at
57+
* create/update time. On stores with native index support the lookup uses one
58+
* filter to narrow the candidate set and verifies the rest on the loaded record;
59+
* stores without index support fall back to a per-record check.
5860
*
59-
* @param namespace Namespace name in the format "tenant/namespace"
60-
* @param propertyKey Property name to filter on
61-
* @param propertyValue Exact property value to match
62-
* @return list of matching scalable topic names
61+
* @param namespace Namespace name in the format "tenant/namespace"
62+
* @param propertyFilters Property names and exact values that all must match
63+
* @return list of matching scalable topic names; an empty filter returns the full
64+
* namespace listing
6365
*/
64-
List<String> listScalableTopicsByProperty(String namespace, String propertyKey, String propertyValue)
66+
List<String> listScalableTopicsByProperties(String namespace, Map<String, String> propertyFilters)
6567
throws PulsarAdminException;
6668

6769
/**
68-
* Get the list of scalable topics under a namespace whose properties contain
69-
* the given key/value pair, asynchronously.
70+
* Async variant of {@link #listScalableTopicsByProperties(String, Map)}.
7071
*/
71-
CompletableFuture<List<String>> listScalableTopicsByPropertyAsync(String namespace,
72-
String propertyKey,
73-
String propertyValue);
72+
CompletableFuture<List<String>> listScalableTopicsByPropertiesAsync(String namespace,
73+
Map<String, String> propertyFilters);
7474

7575
/**
7676
* Create a new scalable topic.

0 commit comments

Comments
 (0)