Skip to content

Commit fce6e73

Browse files
authored
[improve][schema] Add admin cli for testCompatibility (#19974)
<!-- ### Contribution Checklist - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. --> <!-- Either this PR fixes an issue, --> <!-- or this PR is one task of an issue --> <!-- If the PR belongs to a PIP, please add the PIP link here --> <!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/wiki/proposals/PIP.md --> ### Motivation 1. add admin cli `testCompatibility` 2. add `admin.schemas().testCompatibility()` test ### Modifications 1. add admin cli `testCompatibility` 2. add `admin.schemas().testCompatibility()` test ### Verifying this change add test for it - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [x] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: <!-- ENTER URL HERE --> congbobo184#14 <!-- After opening this PR, the build in apache/pulsar will fail and instructions will be provided for opening a PR in the PR author's forked repository. apache/pulsar pull requests should be first tested in your own fork since the apache/pulsar CI based on GitHub Actions has constrained resources and quota. GitHub Actions provides separate quota for pull requests that are executed in a forked repository. The tests will be run in the forked repository until all PR review comments have been handled, the tests pass and the PR is approved by a reviewer. -->
1 parent a20d5e9 commit fce6e73

4 files changed

Lines changed: 51 additions & 1 deletion

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
5050
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5151
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
52+
import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse;
53+
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
5254
import org.apache.pulsar.common.schema.SchemaInfo;
5355
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
5456
import org.apache.pulsar.common.schema.SchemaType;
@@ -438,4 +440,31 @@ public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpd
438440
admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace),
439441
SchemaCompatibilityStrategy.UNDEFINED));
440442
}
443+
444+
@Test
445+
public void testCompatibility() throws Exception {
446+
String topicName = schemaCompatibilityNamespace + "/testCompatibility";
447+
try {
448+
admin.schemas().getSchemaInfo(topicName);
449+
fail();
450+
} catch (PulsarAdminException.NotFoundException e) {
451+
assertEquals(e.getMessage(), "Schema not found");
452+
}
453+
Map<String, String> properties = new HashMap<>();
454+
PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
455+
admin.schemas().createSchema(topicName, postSchemaPayload);
456+
IsCompatibilityResponse isCompatibilityResponse =
457+
admin.schemas().testCompatibility(topicName, postSchemaPayload);
458+
459+
assertTrue(isCompatibilityResponse.isCompatibility());
460+
assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL.name());
461+
postSchemaPayload = new PostSchemaPayload("INT8", "", properties);
462+
try {
463+
admin.schemas().testCompatibility(topicName, postSchemaPayload);
464+
fail();
465+
} catch (Exception e) {
466+
assertTrue(e instanceof PulsarAdminException.ServerSideErrorException);
467+
assertTrue(e.getMessage().contains("Incompatible schema: exists schema type STRING, new schema type INT8"));
468+
}
469+
}
441470
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/protocol/schema/IsCompatibilityResponse.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,4 @@
3333
public class IsCompatibilityResponse {
3434
boolean isCompatibility;
3535
String schemaCompatibilityStrategy;
36-
3736
}

pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2360,6 +2360,11 @@ void schemas() throws Exception {
23602360
PostSchemaPayload input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class);
23612361
verify(schemas).createSchema("persistent://tn1/ns1/tp1", input);
23622362

2363+
cmdSchemas = new CmdSchemas(() -> admin);
2364+
cmdSchemas.run(split("compatibility -f " + schemaFile + " persistent://tn1/ns1/tp1"));
2365+
input = new ObjectMapper().readValue(new File(schemaFile), PostSchemaPayload.class);
2366+
verify(schemas).testCompatibility("persistent://tn1/ns1/tp1", input);
2367+
23632368
cmdSchemas = new CmdSchemas(() -> admin);
23642369
String jarFile = PulsarAdminToolTest.class.getClassLoader()
23652370
.getResource("dummyexamples.jar").getFile();

pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public CmdSchemas(Supplier<PulsarAdmin> admin) {
4242
jcommander.addCommand("delete", new DeleteSchema());
4343
jcommander.addCommand("upload", new UploadSchema());
4444
jcommander.addCommand("extract", new ExtractSchema());
45+
jcommander.addCommand("compatibility", new TestCompatibility());
4546
}
4647

4748
@Parameters(commandDescription = "Get the schema for a topic")
@@ -164,4 +165,20 @@ void run() throws Exception {
164165
}
165166
}
166167

168+
@Parameters(commandDescription = "Test schema compatibility")
169+
private class TestCompatibility extends CliCommand {
170+
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
171+
private java.util.List<String> params;
172+
173+
@Parameter(names = { "-f", "--filename" }, description = "filename", required = true)
174+
private String schemaFileName;
175+
176+
@Override
177+
void run() throws Exception {
178+
String topic = validateTopicName(params);
179+
PostSchemaPayload input = MAPPER.readValue(new File(schemaFileName), PostSchemaPayload.class);
180+
getAdmin().schemas().testCompatibility(topic, input);
181+
}
182+
}
183+
167184
}

0 commit comments

Comments
 (0)