Skip to content

Commit ffa4973

Browse files
[improve][client] allow override of default global jsr310 conversion (apache#24311)
1 parent cc0eef9 commit ffa4973

4 files changed

Lines changed: 126 additions & 6 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl.schema;
2020

21-
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
21+
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabled;
2222
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
2323
import java.util.Map;
2424
import lombok.extern.slf4j.Slf4j;
@@ -53,9 +53,9 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
5353
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
5454
super(schemaInfo);
5555
this.pojoClassLoader = pojoClassLoader;
56-
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
56+
boolean jsr310ConversionEnabled = getJsr310ConversionEnabled(schemaInfo);
5757
setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader,
58-
getJsr310ConversionEnabledFromSchemaInfo(schemaInfo)));
58+
getJsr310ConversionEnabled(schemaInfo)));
5959
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
6060
}
6161

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.apache.pulsar.client.impl.schema.reader;
2020

21-
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
21+
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabled;
2222
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
2323
import org.apache.avro.Schema;
2424
import org.apache.pulsar.client.api.schema.SchemaReader;
@@ -49,7 +49,7 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
4949
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
5050
schemaInfo.getSchemaDefinition(), schemaInfo);
5151
}
52-
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
52+
boolean jsr310ConversionEnabled = getJsr310ConversionEnabled(schemaInfo);
5353
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()),
5454
readerSchema, pojoClassLoader, jsr310ConversionEnabled);
5555
} else {

pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,16 @@
3333

3434
public class SchemaUtil {
3535

36-
public static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
36+
private static Boolean globalJsr310ConversionEnabled = null;
37+
38+
public static void setGlobalJsr310ConversionEnabled(Boolean globalJsr310ConversionEnabled) {
39+
SchemaUtil.globalJsr310ConversionEnabled = globalJsr310ConversionEnabled;
40+
}
41+
42+
public static boolean getJsr310ConversionEnabled(SchemaInfo schemaInfo) {
43+
if (globalJsr310ConversionEnabled != null) {
44+
return globalJsr310ConversionEnabled;
45+
}
3746
if (schemaInfo != null) {
3847
return Boolean.parseBoolean(schemaInfo.getProperties()
3948
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl.schema.util;
20+
21+
import static org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertTrue;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import org.apache.pulsar.common.schema.SchemaInfo;
27+
import org.apache.pulsar.common.schema.SchemaType;
28+
import org.testng.annotations.Test;
29+
30+
public class SchemaUtilTest {
31+
32+
@Test
33+
public void schemaWithoutJsr310EnabledPropertyReturnsFalse() {
34+
SchemaUtil.setGlobalJsr310ConversionEnabled(null);
35+
SchemaInfo schemaInfo = emptyPropertiesSchema();
36+
boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
37+
assertFalse(isJsr310Enabled);
38+
}
39+
40+
@Test
41+
public void schemaWithJsr310DisabledPropertyReturnsFalse() {
42+
SchemaUtil.setGlobalJsr310ConversionEnabled(null);
43+
SchemaInfo schemaInfo = disabledJsr310PropertiesSchema();
44+
boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
45+
assertFalse(isJsr310Enabled);
46+
}
47+
48+
@Test
49+
public void schemaWithJsr310EnabledPropertyReturnsTrue() {
50+
SchemaUtil.setGlobalJsr310ConversionEnabled(null);
51+
SchemaInfo schemaInfo = enabledJsr310PropertiesSchema();
52+
boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
53+
assertTrue(isJsr310Enabled);
54+
}
55+
56+
@Test
57+
public void globalJsr310DisabledAlwaysReturnsFalse() {
58+
SchemaUtil.setGlobalJsr310ConversionEnabled(false);
59+
boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(emptyPropertiesSchema());
60+
assertFalse(isJsr310Enabled);
61+
isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(disabledJsr310PropertiesSchema());
62+
assertFalse(isJsr310Enabled);
63+
isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(enabledJsr310PropertiesSchema());
64+
assertFalse(isJsr310Enabled);
65+
}
66+
67+
@Test
68+
public void globalJsr310EnabledAlwaysReturnsTrue() {
69+
SchemaUtil.setGlobalJsr310ConversionEnabled(true);
70+
boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(emptyPropertiesSchema());
71+
assertTrue(isJsr310Enabled);
72+
isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(disabledJsr310PropertiesSchema());
73+
assertTrue(isJsr310Enabled);
74+
isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(enabledJsr310PropertiesSchema());
75+
assertTrue(isJsr310Enabled);
76+
}
77+
78+
private static SchemaInfo emptyPropertiesSchema() {
79+
return SchemaInfo.builder()
80+
.schema("{\"type\": \"string\"}".getBytes())
81+
.type(SchemaType.AVRO)
82+
.name("unitTest")
83+
.properties(new HashMap<>())
84+
.build();
85+
}
86+
87+
private static SchemaInfo disabledJsr310PropertiesSchema() {
88+
Map<String, String> properties = new HashMap<>();
89+
properties.put(JSR310_CONVERSION_ENABLED, "false");
90+
91+
return SchemaInfo.builder()
92+
.schema("{\"type\": \"string\"}".getBytes())
93+
.type(SchemaType.AVRO)
94+
.name("unitTest")
95+
.properties(properties)
96+
.build();
97+
}
98+
99+
private static SchemaInfo enabledJsr310PropertiesSchema() {
100+
Map<String, String> properties = new HashMap<>();
101+
properties.put(JSR310_CONVERSION_ENABLED, "true");
102+
103+
return SchemaInfo.builder()
104+
.schema("{\"type\": \"string\"}".getBytes())
105+
.type(SchemaType.AVRO)
106+
.name("unitTest")
107+
.properties(properties)
108+
.build();
109+
}
110+
111+
}

0 commit comments

Comments
 (0)