Skip to content

Commit 45d07cb

Browse files
committed
sample kafka avro producer + consumer
Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
1 parent 73fdcf9 commit 45d07cb

17 files changed

Lines changed: 770 additions & 0 deletions

File tree

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.springframework.boot</groupId>
8+
<artifactId>spring-boot-starter-parent</artifactId>
9+
<version>4.0.1</version>
10+
<relativePath/> <!-- lookup parent from repository -->
11+
</parent>
12+
13+
<groupId>org.springframework.cloud</groupId>
14+
<artifactId>spring-cloud-contract-sample-kafka</artifactId>
15+
<version>5.0.2-SNAPSHOT</version>
16+
<packaging>pom</packaging>
17+
18+
<properties>
19+
<spring-boot.version>4.0.1</spring-boot.version>
20+
<kafka-avro-serializer.version>8.1.1</kafka-avro-serializer.version>
21+
<spring-cloud.version>2025.1.0</spring-cloud.version>
22+
<testcontainers.version>1.20.4</testcontainers.version>
23+
</properties>
24+
25+
<modules>
26+
<module>sample-kafka-avro-schema</module>
27+
<module>sample-kafka-avro-producer</module>
28+
<module>sample-kafka-avro-consumer</module>
29+
</modules>
30+
31+
<repositories>
32+
<repository>
33+
<id>confluent</id>
34+
<name>Confluent Maven Repository</name>
35+
<url>https://packages.confluent.io/maven/</url>
36+
</repository>
37+
<repository>
38+
<id>spring-snapshots</id>
39+
<name>Spring Snapshots</name>
40+
<url>https://repo.spring.io/snapshot</url>
41+
<snapshots>
42+
<enabled>true</enabled>
43+
</snapshots>
44+
</repository>
45+
</repositories>
46+
47+
<pluginRepositories>
48+
<pluginRepository>
49+
<id>spring-snapshots</id>
50+
<name>Spring Snapshots</name>
51+
<url>https://repo.spring.io/snapshot</url>
52+
<snapshots>
53+
<enabled>true</enabled>
54+
</snapshots>
55+
</pluginRepository>
56+
<pluginRepository>
57+
<id>spring-plugin-snapshots</id>
58+
<name>Spring Snapshots</name>
59+
<url>https://repo.spring.io/snapshot</url>
60+
<snapshots>
61+
<enabled>true</enabled>
62+
</snapshots>
63+
</pluginRepository>
64+
</pluginRepositories>
65+
66+
<dependencyManagement>
67+
<dependencies>
68+
<dependency>
69+
<groupId>org.springframework.cloud</groupId>
70+
<artifactId>spring-cloud-dependencies</artifactId>
71+
<version>${spring-cloud.version}</version>
72+
<type>pom</type>
73+
<scope>import</scope>
74+
</dependency>
75+
<dependency>
76+
<groupId>org.testcontainers</groupId>
77+
<artifactId>testcontainers-bom</artifactId>
78+
<version>1.20.4</version>
79+
<type>pom</type>
80+
<scope>import</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>org.springframework.boot</groupId>
84+
<artifactId>spring-boot-starter-kafka</artifactId>
85+
<version>4.0.1</version>
86+
</dependency>
87+
<dependency>
88+
<groupId>io.confluent</groupId>
89+
<artifactId>kafka-avro-serializer</artifactId>
90+
<version>${kafka-avro-serializer.version}</version>
91+
</dependency>
92+
</dependencies>
93+
</dependencyManagement>
94+
95+
<build>
96+
<pluginManagement>
97+
<plugins>
98+
<plugin>
99+
<groupId>org.springframework.boot</groupId>
100+
<artifactId>spring-boot-maven-plugin</artifactId>
101+
<version>4.0.1</version>
102+
</plugin>
103+
</plugins>
104+
</pluginManagement>
105+
</build>
106+
</project>
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>org.springframework.cloud</groupId>
8+
<artifactId>spring-cloud-contract-sample-kafka</artifactId>
9+
<version>5.0.2-SNAPSHOT</version>
10+
<relativePath>../pom.xml</relativePath>
11+
</parent>
12+
13+
<artifactId>sample-kafka-avro-consumer</artifactId>
14+
<packaging>jar</packaging>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>org.springframework.cloud</groupId>
19+
<artifactId>spring-cloud-contract-sample-kafka-avro-schema</artifactId>
20+
<version>${project.version}</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.springframework.boot</groupId>
24+
<artifactId>spring-boot-starter</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-starter-kafka</artifactId>
29+
</dependency>
30+
<dependency>
31+
<groupId>io.confluent</groupId>
32+
<artifactId>kafka-avro-serializer</artifactId>
33+
</dependency>
34+
35+
<dependency>
36+
<groupId>org.springframework.boot</groupId>
37+
<artifactId>spring-boot-starter-test</artifactId>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.springframework.cloud</groupId>
42+
<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
43+
<scope>test</scope>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.testcontainers</groupId>
48+
<artifactId>testcontainers</artifactId>
49+
<scope>test</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.testcontainers</groupId>
53+
<artifactId>junit-jupiter</artifactId>
54+
<scope>test</scope>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.testcontainers</groupId>
58+
<artifactId>kafka</artifactId>
59+
<scope>test</scope>
60+
</dependency>
61+
</dependencies>
62+
63+
<build>
64+
<plugins>
65+
<plugin>
66+
<groupId>org.springframework.boot</groupId>
67+
<artifactId>spring-boot-maven-plugin</artifactId>
68+
</plugin>
69+
</plugins>
70+
</build>
71+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.example.kafka.consumer;
2+
3+
import com.example.kafka.avro.Book;
4+
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
class BooksReturnedListener {
10+
11+
private final EmailService emailService;
12+
13+
BooksReturnedListener(EmailService emailService) {
14+
this.emailService = emailService;
15+
}
16+
17+
@KafkaListener(topics = "book.returned")
18+
public void sendEmailOnBookReturned(Book book) {
19+
String emailBody = """
20+
Dear User,
21+
22+
The book you borrowed has been successfully returned:
23+
Title: %s, Author: %s, ISBN: %s
24+
25+
""".formatted(book.getTitle(), book.getAuthor(), book.getIsbn());
26+
27+
emailService.sendEmail(emailBody);
28+
}
29+
30+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.kafka.consumer;
2+
3+
import org.springframework.stereotype.Service;
4+
5+
@Service
6+
public class EmailService {
7+
8+
public void sendEmail(String emailBody) {
9+
// Simulate sending an email
10+
System.out.println("Sending email:\n" + emailBody);
11+
}
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.kafka.consumer;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class KafkaAvroConsumerApplication {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(KafkaAvroConsumerApplication.class, args);
11+
}
12+
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
spring:
2+
application-name: kafka-avro-consumer
3+
kafka:
4+
bootstrap-servers: localhost:9092
5+
producer:
6+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
7+
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
8+
consumer:
9+
group-id: kafka-avro-consumer-group
10+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
11+
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
12+
properties:
13+
specific.avro.reader: true
14+
properties:
15+
schema.registry.url: mock://
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.example.kafka.consumer;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import com.example.kafka.avro.Book;
7+
8+
import org.jetbrains.annotations.Nullable;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.ExtendWith;
11+
import org.testcontainers.junit.jupiter.Container;
12+
import org.testcontainers.junit.jupiter.Testcontainers;
13+
import org.testcontainers.kafka.ConfluentKafkaContainer;
14+
import org.testcontainers.utility.DockerImageName;
15+
16+
import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
17+
import wiremock.com.fasterxml.jackson.databind.json.JsonMapper;
18+
19+
import org.springframework.beans.factory.annotation.Autowired;
20+
import org.springframework.boot.test.context.SpringBootTest;
21+
import org.springframework.boot.test.system.OutputCaptureExtension;
22+
import org.springframework.cloud.contract.stubrunner.StubTrigger;
23+
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
24+
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
25+
import org.springframework.cloud.contract.verifier.converter.YamlContract;
26+
import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.core.KafkaTemplate;
30+
import org.springframework.kafka.support.KafkaHeaders;
31+
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.MessageHeaders;
33+
import org.springframework.messaging.support.MessageBuilder;
34+
import org.springframework.test.context.DynamicPropertyRegistry;
35+
import org.springframework.test.context.DynamicPropertySource;
36+
import org.springframework.test.context.bean.override.mockito.MockitoBean;
37+
38+
import static java.util.Collections.emptyMap;
39+
import static org.awaitility.Awaitility.await;
40+
import static org.mockito.ArgumentMatchers.contains;
41+
import static org.mockito.Mockito.verify;
42+
43+
@Testcontainers
44+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
45+
CollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
46+
@AutoConfigureStubRunner(ids = "org.springframework.cloud:spring-cloud-contract-sample-kafka-avro-producer:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
47+
@ExtendWith(OutputCaptureExtension.class)
48+
class CollaborationTest {
49+
50+
@Autowired
51+
StubTrigger trigger;
52+
53+
@MockitoBean
54+
EmailService emailService;
55+
56+
@Container
57+
static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
58+
DockerImageName.parse("confluentinc/cp-kafka"));
59+
60+
@DynamicPropertySource
61+
static void kafkaProperties(DynamicPropertyRegistry registry) {
62+
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
63+
}
64+
65+
@Test
66+
void shouldSendEmail_onBookReturned() {
67+
trigger.trigger("book_returned");
68+
69+
// @formatter:off
70+
await().untilAsserted(() ->
71+
verify(emailService).sendEmail(
72+
contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
73+
// @formatter:om
74+
}
75+
76+
@Configuration
77+
static class TestConfig {
78+
79+
@Bean
80+
MessageVerifierSender<Message<?>> standaloneMessageVerifier(KafkaTemplate<String, Object> kafkaTemplate) {
81+
return new KafkaAvroMessageVerifierSender<>(kafkaTemplate);
82+
}
83+
84+
}
85+
86+
static class KafkaAvroMessageVerifierSender<M> implements MessageVerifierSender<M> {
87+
88+
private final KafkaTemplate<String, Object> kafkaTemplate;
89+
90+
// TODO: should this be the default?
91+
@Override
92+
public void send(M message, String destination, @Nullable YamlContract contract) {
93+
send(message, emptyMap(), destination, contract);
94+
}
95+
96+
@Override
97+
public <T> void send(T payload, Map<String, Object> headers, String destination,
98+
@Nullable YamlContract contract) {
99+
Map<String, Object> newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
100+
newHeaders.put(KafkaHeaders.TOPIC, destination);
101+
MessageHeaders msgHeaders = new MessageHeaders(newHeaders);
102+
103+
try {
104+
// TODO: remove this workaround after merging:
105+
// https://github.com/spring-cloud/spring-cloud-contract/issues/2404
106+
Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class);
107+
var message = MessageBuilder.createMessage(avroPayload, msgHeaders);
108+
kafkaTemplate.send(message);
109+
}
110+
catch (JsonProcessingException e) {
111+
throw new RuntimeException(e);
112+
}
113+
}
114+
115+
KafkaAvroMessageVerifierSender(KafkaTemplate<String, Object> kafkaTemplate) {
116+
this.kafkaTemplate = kafkaTemplate;
117+
}
118+
119+
}
120+
121+
}

0 commit comments

Comments
 (0)