Skip to content

Commit f5fd290

Browse files
committed
avro support for stub runner executor
relates to #2404 Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
1 parent 9dfe0be commit f5fd290

9 files changed

Lines changed: 198 additions & 70 deletions

File tree

spring-cloud-contract-stub-runner/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@
174174
<version>4.0.2</version>
175175
<optional>true</optional>
176176
</dependency>
177+
<dependency>
178+
<groupId>org.apache.avro</groupId>
179+
<artifactId>avro</artifactId>
180+
<version>1.12.1</version>
181+
<scope>test</scope>
182+
</dependency>
177183
<dependency>
178184
<groupId>cglib</groupId>
179185
<artifactId>cglib</artifactId>

spring-cloud-contract-stub-runner/src/main/java/org/springframework/cloud/contract/stubrunner/StubRunnerExecutor.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ private void sendMessage(Contract groovyDsl) {
251251
setMessageType(contract, ContractVerifierMessageMetadata.MessageType.OUTPUT);
252252

253253
Object payload = null;
254-
if (body != null && body.getClientValue() instanceof FromFileProperty) {
254+
if (isFromFileProperty(body)) {
255255
FromFileProperty fromFile = (FromFileProperty) body.getClientValue();
256256
if (fromFile.isByte()) {
257257
payload = fromFile.asBytes();
@@ -260,6 +260,10 @@ private void sendMessage(Contract groovyDsl) {
260260
payload = fromFile.asString();
261261
}
262262
}
263+
else if (isAvroContract(contract)) {
264+
log.info("Avro contract detected — passing raw body as Map, skipping JSON serialization");
265+
payload = BodyExtractor.extractClientValueFromBody(body == null ? null : body.getClientValue());
266+
}
263267
else {
264268
payload = JsonOutput
265269
.toJson(BodyExtractor.extractClientValueFromBody(body == null ? null : body.getClientValue()));
@@ -269,6 +273,25 @@ private void sendMessage(Contract groovyDsl) {
269273
outputMessage.getSentTo().getClientValue(), contract);
270274
}
271275

276+
private boolean isFromFileProperty(DslProperty<?> body) {
277+
return body != null && body.getClientValue() instanceof FromFileProperty;
278+
}
279+
280+
private boolean isAvroContract(YamlContract contract) {
281+
if (contract == null || contract.metadata == null) {
282+
return false;
283+
}
284+
Object kafkaMeta = contract.metadata.get("kafka");
285+
if (!(kafkaMeta instanceof Map)) {
286+
return false;
287+
}
288+
Object avroMeta = ((Map<String, Object>) kafkaMeta).get("avro");
289+
if (!(avroMeta instanceof Map)) {
290+
return false;
291+
}
292+
return ((Map<String, Object>) avroMeta).get("schema") != null;
293+
}
294+
272295
private void setMessageType(YamlContract contract, ContractVerifierMessageMetadata.MessageType output) {
273296
contract.metadata.put(ContractVerifierMessageMetadata.METADATA_KEY,
274297
new ContractVerifierMessageMetadata(output));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.contract.stubrunner
18+
19+
import org.apache.avro.generic.GenericRecord
20+
import spock.lang.Specification
21+
22+
import org.springframework.cloud.contract.verifier.messaging.avro.KafkaAvroMessageVerifierSender
23+
import org.springframework.kafka.core.KafkaTemplate
24+
25+
class StubRunnerExecutorAvroSpec extends Specification {
26+
27+
private KafkaTemplate<String, Object> kafkaTemplate = Mock()
28+
private KafkaAvroMessageVerifierSender sender = new KafkaAvroMessageVerifierSender(kafkaTemplate)
29+
30+
def 'should send Avro-serialized GenericRecord to Kafka for Avro contracts (bug #2404)'() {
31+
given:
32+
def tmpContractDir = saveTmpContract("""
33+
label: book_returned
34+
input:
35+
triggeredBy: publishBookReturned()
36+
outputMessage:
37+
sentTo: book.returned
38+
headers:
39+
X-Correlation-Id: abc-123-def
40+
body:
41+
isbn: "978-1234567890"
42+
title: "Contract Testing for Dummies"
43+
metadata:
44+
kafka:
45+
avro:
46+
schema: >
47+
{
48+
"type": "record",
49+
"name": "Book",
50+
"fields": [
51+
{"name": "isbn", "type": "string"},
52+
{"name": "title", "type": "string"}
53+
]
54+
}
55+
""")
56+
StubRunnerExecutor executor = new StubRunnerExecutor(new AvailablePortScanner(18000, 18999), sender, [])
57+
executor.runStubs(
58+
new StubRunnerOptionsBuilder().build(),
59+
new StubRepository(tmpContractDir, [], new StubRunnerOptionsBuilder().build(), null),
60+
new StubConfiguration('avro', 'avro', 'avro', ''))
61+
when:
62+
executor.trigger('book_returned')
63+
then:
64+
1 * kafkaTemplate.send({
65+
it.topic() == "book.returned" &&
66+
it.value() instanceof GenericRecord &&
67+
it.value()["schema"] != null &&
68+
it.value()["isbn"] == "978-1234567890" &&
69+
it.value()["title"] == "Contract Testing for Dummies" &&
70+
header(it, "X-Correlation-Id") == "abc-123-def"
71+
})
72+
cleanup:
73+
executor.shutdown()
74+
tmpContractDir.deleteDir()
75+
}
76+
77+
private File saveTmpContract(String contractYaml) {
78+
File contractDir = File.createTempDir()
79+
new File(contractDir, "book_returned.yml").text = contractYaml
80+
contractDir
81+
}
82+
83+
private String header(it, String key) {
84+
new String(it.headers().lastHeader(key).value())
85+
}
86+
87+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
label: book_returned
2+
input:
3+
triggeredBy: publishBookReturned()
4+
outputMessage:
5+
sentTo: book.returned
6+
headers:
7+
X-Correlation-Id: abc-123-def
8+
body:
9+
isbn: "978-1234567890"
10+
title: "Contract Testing for Dummies"
11+
metadata:
12+
kafka:
13+
avro:
14+
schema: >
15+
{
16+
"type": "record",
17+
"name": "Book",
18+
"fields": [
19+
{"name": "isbn", "type": "string"},
20+
{"name": "title", "type": "string"}
21+
]
22+
}

spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/AvroMetadata.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
* Avro serialization metadata for a Kafka contract message.
2121
*
2222
* <p>
23-
* Example contract YAML:
24-
* <pre>
23+
* Example contract YAML: <pre>
2524
* metadata:
2625
* kafka:
2726
* avro:

spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroContractVerifierConfiguration.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
23+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
2224
import org.apache.avro.specific.SpecificRecordBase;
2325
import org.apache.kafka.clients.producer.ProducerConfig;
2426
import org.apache.kafka.common.serialization.StringSerializer;
27+
import tools.jackson.databind.json.JsonMapper;
28+
2529
import org.springframework.beans.factory.ObjectProvider;
2630
import org.springframework.beans.factory.annotation.Qualifier;
2731
import org.springframework.beans.factory.annotation.Value;
@@ -33,11 +37,6 @@
3337
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3438
import org.springframework.kafka.core.KafkaTemplate;
3539

36-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
37-
38-
import io.confluent.kafka.serializers.KafkaAvroSerializer;
39-
import tools.jackson.databind.json.JsonMapper;
40-
4140
/**
4241
* Auto-configuration for Avro support in Spring Cloud Contract. Activates when
4342
* {@code org.apache.avro.specific.SpecificRecordBase} is on the classpath.
@@ -51,21 +50,17 @@ public class KafkaAvroContractVerifierConfiguration {
5150

5251
@Bean
5352
@ConditionalOnMissingBean
54-
ContractVerifierObjectMapper avroContractVerifierObjectMapper(
55-
ObjectProvider<JsonMapper> jsonMapper) {
56-
JsonMapper mapper = jsonMapper.getIfAvailable(JsonMapper::new).rebuild()
57-
.addMixIn(SpecificRecordBase.class, IgnoreAvroMixin.class).build();
53+
ContractVerifierObjectMapper avroContractVerifierObjectMapper(ObjectProvider<JsonMapper> jsonMapper) {
54+
JsonMapper mapper = jsonMapper.getIfAvailable(JsonMapper::new)
55+
.rebuild()
56+
.addMixIn(SpecificRecordBase.class, IgnoreAvroMixin.class)
57+
.build();
5858
return new ContractVerifierObjectMapper(mapper);
5959
}
6060

61-
@JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" })
62-
interface IgnoreAvroMixin {
63-
}
64-
6561
@Bean
6662
@ConditionalOnMissingBean(name = "avroKafkaTemplate")
67-
KafkaTemplate<String, Object> avroKafkaTemplate(
68-
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
63+
KafkaTemplate<String, Object> avroKafkaTemplate(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
6964
@Value("${spring.cloud.contract.avro.schema-registry-url}") String schemaRegistryUrl) {
7065
Map<String, Object> props = new HashMap<>();
7166
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -82,4 +77,9 @@ KafkaAvroMessageVerifierSender kafkaAvroMessageVerifierSender(
8277
return new KafkaAvroMessageVerifierSender(avroKafkaTemplate);
8378
}
8479

80+
@JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" })
81+
interface IgnoreAvroMixin {
82+
83+
}
84+
8585
}

spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/avro/KafkaAvroMessageVerifierSender.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public void send(Object message, String destination, @Nullable YamlContract cont
6363
}
6464

6565
@Override
66-
public <T> void send(T payload, Map<String, Object> headers, String destination,
67-
@Nullable YamlContract contract) {
66+
public <T> void send(T payload, Map<String, Object> headers, String destination, @Nullable YamlContract contract) {
6867
if (contract == null || contract.metadata == null) {
6968
throw new IllegalArgumentException(
7069
"Contract or its metadata is null — cannot perform Avro serialization for destination ["
@@ -116,9 +115,10 @@ private GenericRecord buildRecord(Schema schema, Object payload) {
116115
}
117116
Map<String, Object> payloadMap = (Map<String, Object>) payload;
118117
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
119-
schema.getFields().stream()
120-
.filter(field -> payloadMap.containsKey(field.name()))
121-
.forEach(field -> builder.set(field, payloadMap.get(field.name())));
118+
schema.getFields()
119+
.stream()
120+
.filter(field -> payloadMap.containsKey(field.name()))
121+
.forEach(field -> builder.set(field, payloadMap.get(field.name())));
122122
return builder.build();
123123
}
124124

spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/noop/NoOpContractVerifierAutoConfiguration.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ public ContractVerifierMessaging<Object> contractVerifierMessaging() {
8989

9090
@Bean
9191
@ConditionalOnMissingBean
92-
public ContractVerifierObjectMapper contractVerifierObjectMapper(
93-
ObjectProvider<JsonMapper> jsonMapper) {
92+
public ContractVerifierObjectMapper contractVerifierObjectMapper(ObjectProvider<JsonMapper> jsonMapper) {
9493
JsonMapper mapper = jsonMapper.getIfAvailable(JsonMapper::new);
9594
return new ContractVerifierObjectMapper(mapper);
9695
}

0 commit comments

Comments
 (0)