Skip to content

Commit fa28d1c

Browse files
authored
[fix][broker] Fix REST API to produce messages to single-partitioned topics (#24450)
1 parent ced60a7 commit fa28d1c

2 files changed

Lines changed: 15 additions & 5 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ private boolean findOwnerBrokerForTopic(boolean authoritative, AsyncResponse asy
345345
List<String> redirectAddresses = Collections.synchronizedList(new ArrayList<>());
346346
CompletableFuture<Boolean> future = new CompletableFuture<>();
347347
List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
348-
if (!topicName.isPartitioned() && metadata.partitions > 1) {
348+
if (!topicName.isPartitioned() && metadata.partitions > 0) {
349349
// Partitioned topic with multiple partitions, need to do look up for each partition.
350350
for (int index = 0; index < metadata.partitions; index++) {
351351
lookupFutures.add(lookUpBrokerForTopic(topicName.getPartition(index),

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.testng.Assert;
9797
import org.testng.annotations.AfterMethod;
9898
import org.testng.annotations.BeforeMethod;
99+
import org.testng.annotations.DataProvider;
99100
import org.testng.annotations.Test;
100101

101102
public class TopicsTest extends MockedPulsarServiceBaseTest {
@@ -168,10 +169,19 @@ private static List<ProducerMessage> createMessages(String message) throws JsonP
168169
}).readValue(message);
169170
}
170171

171-
@Test
172-
public void testProduceToPartitionedTopic() throws Exception {
172+
@DataProvider(name = "partitionNumbers")
173+
public Object[][] partitionNumbers() {
174+
return new Object[][] {
175+
//produce to single-partitioned topic
176+
{1},
177+
{5},
178+
};
179+
}
180+
181+
@Test(dataProvider = "partitionNumbers")
182+
public void testProduceToPartitionedTopic(int numPartitions) throws Exception {
173183
admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace
174-
+ "/" + testTopicName + "-p", 5);
184+
+ "/" + testTopicName + "-p", numPartitions);
175185
AsyncResponse asyncResponse = mock(AsyncResponse.class);
176186
Schema<String> schema = StringSchema.utf8();
177187
ProducerMessages producerMessages = new ProducerMessages();
@@ -210,7 +220,7 @@ public void testProduceToPartitionedTopic() throws Exception {
210220
}
211221
for (int index = 0; index < messagePerPartition.length; index++) {
212222
// We publish to each partition in round robin mode so each partition should get at most 2 message.
213-
Assert.assertTrue(messagePerPartition[index] <= 2);
223+
Assert.assertTrue(messagePerPartition[index] <= 10 / numPartitions);
214224
}
215225
}
216226

0 commit comments

Comments
 (0)