Skip to content

Commit ed10ec3

Browse files
authored
[improve][broker]Part-2 Add Admin API to delete topic policies (apache#24602)
1 parent c3c55a7 commit ed10ec3

7 files changed

Lines changed: 146 additions & 9 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,42 @@ public void deleteTopic(
12031203

12041204
}
12051205

1206+
@DELETE
1207+
@Path("/{tenant}/{namespace}/{topic}/policies")
1208+
@ApiOperation(value = "Delete policies for a topic.")
1209+
@ApiResponses(value = {
1210+
@ApiResponse(code = 204, message = "Operation successful"),
1211+
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
1212+
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
1213+
@ApiResponse(code = 403, message = "Don't have admin permission"),
1214+
@ApiResponse(code = 404, message = "Namespace or topic does not exist"),
1215+
@ApiResponse(code = 500, message = "Internal server error")})
1216+
public void deleteTopicPolicies(
1217+
@Suspended AsyncResponse asyncResponse,
1218+
@ApiParam(value = "Specify the tenant", required = true)
1219+
@PathParam("tenant") String tenant,
1220+
@ApiParam(value = "Specify the namespace", required = true)
1221+
@PathParam("namespace") String namespace,
1222+
@ApiParam(value = "Specify topic name", required = true)
1223+
@PathParam("topic") @Encoded String encodedTopic,
1224+
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
1225+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
1226+
validateTopicName(tenant, namespace, encodedTopic);
1227+
validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE)
1228+
.thenCompose(__ -> pulsar().getTopicPoliciesService().deleteTopicPoliciesAsync(topicName, false))
1229+
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
1230+
.exceptionally(ex -> {
1231+
Throwable t = FutureUtil.unwrapCompletionException(ex);
1232+
if (t instanceof IllegalStateException){
1233+
ex = new RestException(422/* Unprocessable entity*/, t.getMessage());
1234+
} else if (isNot307And4xxException(ex)) {
1235+
log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
1236+
}
1237+
resumeAsyncResponseExceptionally(asyncResponse, ex);
1238+
return null;
1239+
});
1240+
}
1241+
12061242
@GET
12071243
@Path("/{tenant}/{namespace}/{topic}/subscriptions")
12081244
@ApiOperation(

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -467,27 +467,34 @@ protected void verifyReplicationWorks(String topic) throws Exception {
467467

468468
protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
469469
PulsarService pulsar) throws Exception {
470+
setTopicLevelClusters(topic, clusters, admin, pulsar, false);
471+
}
472+
473+
protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
474+
PulsarService pulsar, boolean global) throws Exception {
470475
Set<String> expected = new HashSet<>(clusters);
471476
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
472477
int partitions = ensurePartitionsAreSame(topic);
473-
admin.topics().setReplicationClusters(topic, clusters);
478+
admin.topicPolicies(global).setReplicationClusters(topic, clusters);
474479
Awaitility.await().untilAsserted(() -> {
475-
TopicPolicies policies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), topicName);
480+
TopicPolicies policies = TopicPolicyTestUtils.getTopicPolicies(pulsar.getTopicPoliciesService(), topicName,
481+
global);
476482
assertEquals(new HashSet<>(policies.getReplicationClusters()), expected);
477483
if (partitions == 0) {
478-
checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar.getBrokerService());
484+
checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin, pulsar,
485+
global);
479486
} else {
480487
for (int i = 0; i < partitions; i++) {
481488
checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), clusters, admin,
482-
pulsar.getBrokerService());
489+
pulsar, global);
483490
}
484491
}
485492
});
486493
}
487494

488495
protected void checkNonPartitionedTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
489-
BrokerService broker) throws Exception {
490-
CompletableFuture<Optional<Topic>> future = broker.getTopic(topic, false);
496+
PulsarService pulsar, boolean global) throws Exception {
497+
CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopic(topic, false);
491498
if (future == null) {
492499
return;
493500
}
@@ -497,7 +504,8 @@ protected void checkNonPartitionedTopicLevelClusters(String topic, List<String>
497504
}
498505
PersistentTopic persistentTopic = (PersistentTopic) optional.get();
499506
Set<String> expected = new HashSet<>(clusters);
500-
Set<String> act = new HashSet<>(TopicPolicyTestUtils.getTopicPolicies(persistentTopic)
507+
Set<String> act = new HashSet<>(TopicPolicyTestUtils
508+
.getTopicPolicies(pulsar.getTopicPoliciesService(), TopicName.get(persistentTopic.topic), global)
501509
.getReplicationClusters());
502510
assertEquals(act, expected);
503511
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.apache.pulsar.broker.service.TopicPoliciesService.GetType.GLOBAL_ONLY;
2122
import static org.testng.Assert.assertEquals;
2223
import static org.testng.Assert.assertFalse;
2324
import static org.testng.Assert.assertNotNull;
@@ -40,6 +41,7 @@
4041
import org.apache.pulsar.client.api.Schema;
4142
import org.apache.pulsar.common.naming.TopicName;
4243
import org.apache.pulsar.common.policies.data.RetentionPolicies;
44+
import org.apache.pulsar.common.policies.data.TopicPolicies;
4345
import org.awaitility.Awaitility;
4446
import org.testng.annotations.AfterClass;
4547
import org.testng.annotations.BeforeClass;
@@ -317,9 +319,56 @@ public void testDeleteNonPartitionedTopic() throws Exception {
317319
super.testDeleteNonPartitionedTopic();
318320
}
319321

322+
@Override
320323
@Test
321324
public void testDeletePartitionedTopic() throws Exception {
322-
super.testDeletePartitionedTopic();
325+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
326+
admin1.topics().createPartitionedTopic(topicName, 2);
327+
328+
// Verify replicator works.
329+
verifyReplicationWorks(topicName);
330+
331+
// Remove remote cluster from remote cluster.
332+
setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1, true);
333+
Awaitility.await().untilAsserted(() -> {
334+
assertTrue(pulsar1.getPulsarResources().getTopicResources()
335+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
336+
assertTrue(pulsar1.getPulsarResources().getTopicResources()
337+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
338+
assertFalse(pulsar2.getPulsarResources().getTopicResources()
339+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
340+
assertFalse(pulsar2.getPulsarResources().getTopicResources()
341+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
342+
});
343+
344+
345+
// Delete topic.
346+
admin1.topics().deletePartitionedTopic(topicName);
347+
Awaitility.await().untilAsserted(() -> {
348+
assertFalse(pulsar1.getPulsarResources().getTopicResources()
349+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
350+
assertFalse(pulsar1.getPulsarResources().getTopicResources()
351+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
352+
assertFalse(pulsar2.getPulsarResources().getTopicResources()
353+
.persistentTopicExists(TopicName.get(topicName).getPartition(0)).join());
354+
assertFalse(pulsar2.getPulsarResources().getTopicResources()
355+
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
356+
});
357+
358+
Awaitility.await().untilAsserted(() -> {
359+
Optional<TopicPolicies> op1 = pulsar1.getTopicPoliciesService()
360+
.getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join();
361+
assertFalse(op1.isPresent());
362+
Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
363+
.getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join();
364+
assertTrue(op2.isPresent());
365+
});
366+
admin2.topicPolicies().deleteTopicPolicies(topicName);
367+
Awaitility.await().untilAsserted(() -> {
368+
Optional<TopicPolicies> op2 = pulsar2.getTopicPoliciesService()
369+
.getTopicPoliciesAsync(TopicName.get(topicName), GLOBAL_ONLY).join();
370+
assertFalse(op2.isPresent());
371+
});
323372
}
324373

325374
@Test(enabled = false)

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ public static TopicPolicies getTopicPolicies(TopicPoliciesService topicPoliciesS
4848
.orElse(null);
4949
}
5050

51+
public static TopicPolicies getTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName,
52+
boolean global) throws ExecutionException, InterruptedException {
53+
TopicPoliciesService.GetType getType = global ? TopicPoliciesService.GetType.GLOBAL_ONLY
54+
: TopicPoliciesService.GetType.LOCAL_ONLY;
55+
return topicPoliciesService.getTopicPoliciesAsync(topicName, getType).get()
56+
.orElse(null);
57+
}
58+
5159
public static TopicPolicies getLocalTopicPolicies(TopicPoliciesService topicPoliciesService, TopicName topicName)
5260
throws ExecutionException, InterruptedException {
5361
return topicPoliciesService.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY).get()

pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1936,7 +1936,18 @@ AutoSubscriptionCreationOverride getAutoSubscriptionCreation(String topic,
19361936
*/
19371937
CompletableFuture<Void> setReplicationClusters(String topic, List<String> clusterIds);
19381938

1939+
/**
1940+
* get the replication clusters for the topic.
1941+
*/
19391942
Set<String> getReplicationClusters(String topic, boolean applied) throws PulsarAdminException;
19401943

1944+
/**
1945+
* get the replication clusters for the topic.
1946+
*/
19411947
void removeReplicationClusters(String topic) throws PulsarAdminException;
1948+
1949+
/**
1950+
* Delete topic policies, it works even if the topic has been deleted.
1951+
*/
1952+
void deleteTopicPolicies(String topic) throws PulsarAdminException;
19421953
}

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,6 +1312,17 @@ public CompletableFuture<Void> removeReplicationClustersAsync(String topic) {
13121312
return asyncDeleteRequest(path);
13131313
}
13141314

1315+
@Override
1316+
public void deleteTopicPolicies(String topic) throws PulsarAdminException {
1317+
sync(() -> deleteTopicPoliciesAsync(topic));
1318+
}
1319+
1320+
public CompletableFuture<Void> deleteTopicPoliciesAsync(String topic) {
1321+
TopicName tn = validateTopic(topic);
1322+
WebTarget path = topicPath(tn, "policies");
1323+
return asyncDeleteRequest(path);
1324+
}
1325+
13151326
/*
13161327
* returns topic name with encoded Local Name
13171328
*/

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class CmdTopicPolicies extends CmdBase {
6060

6161
public CmdTopicPolicies(Supplier<PulsarAdmin> admin) {
6262
super("topicPolicies", admin);
63-
63+
addCommand("delete", new DeletePolicies());
6464
addCommand("get-message-ttl", new GetMessageTTL());
6565
addCommand("set-message-ttl", new SetMessageTTL());
6666
addCommand("remove-message-ttl", new RemoveMessageTTL());
@@ -2058,6 +2058,20 @@ void run() throws PulsarAdminException {
20582058
}
20592059
}
20602060

2061+
@Command(description = "Remove the all policies for a topic, it will not remove policies from the remote"
2062+
+ "cluster")
2063+
private class DeletePolicies extends CliCommand {
2064+
2065+
@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
2066+
private String topicName;
2067+
2068+
@Override
2069+
void run() throws PulsarAdminException {
2070+
String persistentTopic = validatePersistentTopic(topicName);
2071+
getTopicPolicies(false).deleteTopicPolicies(persistentTopic);
2072+
}
2073+
}
2074+
20612075
private TopicPolicies getTopicPolicies(boolean isGlobal) {
20622076
return getAdmin().topicPolicies(isGlobal);
20632077
}

0 commit comments

Comments
 (0)