Skip to content

Commit be57e9a

Browse files
[fix][schema]Fix AutoProduceBytes producer can not be created to a topic with ProtoBuf schema (#19767)
### Motivation 1. There is a topic1 with a protobuf schema. 2. Create a producer1 with AutoProduceBytes schema. 3. The producer1 will be created failed because the way to get the schema of protobuf schema is not supported. ### ### ### Modification Because the Protobuf schema is implemented from the AvroBaseStructSchema. So we add a way to get Protobuf schema just like the AvroSchema.
1 parent 0214745 commit be57e9a

2 files changed

Lines changed: 25 additions & 1 deletion

File tree

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.apache.pulsar.client.api.schema.GenericRecord;
7171
import org.apache.pulsar.client.api.schema.SchemaDefinition;
7272
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
73+
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
7374
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
7475
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
7576
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
@@ -113,6 +114,22 @@ public void cleanup() throws Exception {
113114
super.internalCleanup();
114115
}
115116

117+
@Test
118+
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
119+
final String tenant = PUBLIC_TENANT;
120+
final String namespace = "test-namespace-" + randomName(16);
121+
final String topic = tenant + "/" + namespace + "/test-getSchema";
122+
admin.namespaces().createNamespace(
123+
tenant + "/" + namespace,
124+
Sets.newHashSet(CLUSTER_NAME)
125+
);
126+
127+
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage> protobufSchema =
128+
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
129+
pulsarClient.newProducer(protobufSchema).topic(topic).create();
130+
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
131+
}
132+
116133
@Test
117134
public void testMultiTopicSetSchemaProvider() throws Exception {
118135
final String tenant = PUBLIC_TENANT;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
7171
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
7272
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
73+
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
7374
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
7475
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
7576
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
@@ -81,6 +82,7 @@
8182
import org.apache.pulsar.common.naming.TopicName;
8283
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
8384
import org.apache.pulsar.common.schema.SchemaInfo;
85+
import org.apache.pulsar.common.schema.SchemaType;
8486
import org.apache.pulsar.common.topics.TopicList;
8587
import org.apache.pulsar.common.util.FutureUtil;
8688
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -350,7 +352,12 @@ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurat
350352
return lookup.getSchema(TopicName.get(conf.getTopicName()))
351353
.thenCompose(schemaInfoOptional -> {
352354
if (schemaInfoOptional.isPresent()) {
353-
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfoOptional.get()));
355+
SchemaInfo schemaInfo = schemaInfoOptional.get();
356+
if (schemaInfo.getType() == SchemaType.PROTOBUF) {
357+
autoProduceBytesSchema.setSchema(new GenericAvroSchema(schemaInfo));
358+
} else {
359+
autoProduceBytesSchema.setSchema(Schema.getSchema(schemaInfo));
360+
}
354361
} else {
355362
autoProduceBytesSchema.setSchema(Schema.BYTES);
356363
}

0 commit comments

Comments
 (0)