Skip to content

Commit 66fda61

Browse files
authored
[fix][broker] Terminate the async call chain when the condition isn't met for resetCursor (#19541)
1 parent 2d90089 commit 66fda61

2 files changed

Lines changed: 17 additions & 1 deletion

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2672,7 +2672,7 @@ protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String
26722672
if (topicMetadata.partitions > 0) {
26732673
log.warn("[{}] Not supported operation on partitioned-topic {} {}",
26742674
clientAppId(), topicName, subName);
2675-
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
2675+
throw new CompletionException(new RestException(Status.METHOD_NOT_ALLOWED,
26762676
"Reset-cursor at position is not allowed for partitioned-topic"));
26772677
}
26782678
return CompletableFuture.completedFuture(null);

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,22 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
661661
setup();
662662
}
663663

664+
@Test
665+
public void shouldNotSupportResetOnPartitionedTopic() throws PulsarAdminException, PulsarClientException {
666+
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + BrokerTestUtil.newUniqueName("parttopic");
667+
admin.topics().createPartitionedTopic(partitionedTopicName, 4);
668+
@Cleanup
669+
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
670+
.subscriptionType(SubscriptionType.Shared).subscribe();
671+
try {
672+
admin.topics().resetCursor(partitionedTopicName, "my-sub", MessageId.earliest);
673+
fail();
674+
} catch (PulsarAdminException.NotAllowedException e) {
675+
assertTrue(e.getMessage().contains("Reset-cursor at position is not allowed for partitioned-topic"),
676+
"Condition doesn't match. Actual message:" + e.getMessage());
677+
}
678+
}
679+
664680
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
665681
Producer<byte[]> producer = pulsarClient.newProducer()
666682
.topic(topicName)

0 commit comments

Comments
 (0)