Skip to content

Commit 77e6bc6

Browse files
feat : adds workflow and testcases for kafka and rabbitmq (#3)
* feat : adds workflow for kafka and rabbitmq * Adds test cases for kafka * add test cases for rabbit sender * adds tests for received
1 parent a3c5a6e commit 77e6bc6

22 files changed

Lines changed: 537 additions & 90 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: spring-reactive-boot-reactor-kafka
2+
3+
on:
4+
push:
5+
paths:
6+
- "boot-reactor-kafka/**"
7+
branches: [ master ]
8+
pull_request:
9+
paths:
10+
- "boot-reactor-kafka/**"
11+
types:
12+
- opened
13+
- synchronize
14+
- reopened
15+
16+
jobs:
17+
build:
18+
19+
runs-on: ubuntu-latest
20+
21+
steps:
22+
- uses: actions/checkout@v6
23+
with:
24+
fetch-depth: 0
25+
- name: Set up JDK
26+
uses: actions/setup-java@v5
27+
with:
28+
java-version: '21'
29+
distribution: 'temurin'
30+
cache: 'maven'
31+
- name: Build Sender with Maven
32+
run: |
33+
cd boot-reactor-kafka/sender
34+
mvn -B package --file pom.xml
35+
- name: Build Receiver with Maven
36+
run: |
37+
cd boot-reactor-kafka/receiver
38+
mvn -B package --file pom.xml
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: spring-reactive-boot-reactor-rabbitmq
2+
3+
on:
4+
push:
5+
paths:
6+
- "boot-reactor-rabbitmq/**"
7+
branches: [ master ]
8+
pull_request:
9+
paths:
10+
- "boot-reactor-rabbitmq/**"
11+
types:
12+
- opened
13+
- synchronize
14+
- reopened
15+
16+
jobs:
17+
build:
18+
19+
runs-on: ubuntu-latest
20+
21+
steps:
22+
- uses: actions/checkout@v6
23+
with:
24+
fetch-depth: 0
25+
- name: Set up JDK
26+
uses: actions/setup-java@v5
27+
with:
28+
java-version: '21'
29+
distribution: 'temurin'
30+
cache: 'maven'
31+
- name: Build Sender with Maven
32+
run: |
33+
cd boot-reactor-rabbitmq/sender
34+
mvn -B package --file pom.xml
35+
- name: Build Receiver with Maven
36+
run: |
37+
cd boot-reactor-rabbitmq/receiver
38+
mvn -B package --file pom.xml

boot-reactor-kafka/receiver/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,21 @@
5757
<artifactId>spring-boot-starter-kafka-test</artifactId>
5858
<scope>test</scope>
5959
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-testcontainers</artifactId>
63+
<scope>test</scope>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.testcontainers</groupId>
67+
<artifactId>testcontainers-junit-jupiter</artifactId>
68+
<scope>test</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.testcontainers</groupId>
72+
<artifactId>testcontainers-kafka</artifactId>
73+
<scope>test</scope>
74+
</dependency>
6075
</dependencies>
6176

6277
<build>

boot-reactor-kafka/receiver/src/main/java/com/example/demo/ReceiverApplication.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.springframework.boot.SpringApplication;
1010
import org.springframework.boot.autoconfigure.SpringBootApplication;
1111
import org.springframework.boot.context.event.ApplicationStartedEvent;
12+
import org.springframework.boot.kafka.autoconfigure.KafkaConnectionDetails;
1213
import org.springframework.context.annotation.Bean;
1314
import org.springframework.context.event.EventListener;
1415
import org.springframework.http.MediaType;
@@ -51,9 +52,9 @@ public void init(ApplicationStartedEvent event) {
5152
}
5253

5354
@Bean
54-
Map<String, Object> consumerProps() {
55+
Map<String, Object> consumerProps(KafkaConnectionDetails kafkaConnectionDetails) {
5556
Map<String, Object> props = new HashMap<>();
56-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
57+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionDetails.getBootstrapServers());
5758
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
5859
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
5960
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
@@ -63,8 +64,8 @@ Map<String, Object> consumerProps() {
6364
}
6465

6566
@Bean
66-
KafkaReceiver<Integer, String> receiver() {
67-
var receiverOptions = ReceiverOptions.<Integer, String>create(consumerProps())
67+
KafkaReceiver<Integer, String> receiver(KafkaConnectionDetails kafkaConnectionDetails) {
68+
var receiverOptions = ReceiverOptions.<Integer, String>create(consumerProps(kafkaConnectionDetails))
6869
.subscription(Collections.singleton(HELLO_TOPIC))
6970
.addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
7071
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

boot-reactor-kafka/receiver/src/test/java/com/example/demo/DemoApplicationTests.java

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.example.demo;
2+
3+
import org.apache.kafka.clients.producer.ProducerConfig;
4+
import org.apache.kafka.clients.producer.ProducerRecord;
5+
import org.apache.kafka.common.serialization.IntegerSerializer;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.junit.jupiter.api.Test;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.boot.test.context.SpringBootTest;
10+
import org.springframework.boot.webtestclient.autoconfigure.AutoConfigureWebTestClient;
11+
import org.springframework.context.annotation.Import;
12+
import org.springframework.http.MediaType;
13+
import org.springframework.test.web.reactive.server.WebTestClient;
14+
import org.testcontainers.kafka.KafkaContainer;
15+
import reactor.core.publisher.Flux;
16+
import reactor.kafka.sender.KafkaSender;
17+
import reactor.kafka.sender.SenderOptions;
18+
import reactor.kafka.sender.SenderRecord;
19+
import reactor.test.StepVerifier;
20+
21+
import java.time.Duration;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
26+
@Import(TestReceiverApplicationConfig.class)
27+
@AutoConfigureWebTestClient
28+
public class ReceiverApplicationTests {
29+
30+
@Autowired
31+
private WebTestClient webTestClient;
32+
33+
@Autowired
34+
private KafkaContainer kafkaContainer;
35+
36+
@Test
37+
public void testEventsStream() {
38+
String testMessage = "Hello from Test " + System.currentTimeMillis();
39+
40+
// 1. Instantiate KafkaSender to publish a message
41+
Map<String, Object> producerProps = new HashMap<>();
42+
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
43+
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
44+
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
45+
46+
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
47+
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
48+
49+
sender.send(Flux.just(SenderRecord.create(new ProducerRecord<>(ReceiverApplication.HELLO_TOPIC, 1, testMessage), 1)))
50+
.doOnError(e -> System.err.println("Send failed: " + e.getMessage()))
51+
.subscribe();
52+
53+
// 2. Connect to GET /events and assert SSE stream
54+
webTestClient.get()
55+
.uri("/events")
56+
.accept(MediaType.TEXT_EVENT_STREAM)
57+
.exchange()
58+
.expectStatus().isOk()
59+
.returnResult(String.class)
60+
.getResponseBody()
61+
.as(StepVerifier::create)
62+
.expectNextMatches(received -> received.equals(testMessage))
63+
.thenCancel()
64+
.verify(Duration.ofSeconds(10));
65+
66+
sender.close();
67+
}
68+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.example.demo;
2+
3+
import org.springframework.boot.test.context.TestConfiguration;
4+
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
5+
import org.springframework.context.annotation.Bean;
6+
import org.testcontainers.kafka.KafkaContainer;
7+
import org.testcontainers.utility.DockerImageName;
8+
9+
@TestConfiguration(proxyBeanMethods = false)
10+
public class TestReceiverApplicationConfig {
11+
12+
@Bean
13+
@ServiceConnection
14+
KafkaContainer kafkaContainer() {
15+
return new KafkaContainer(DockerImageName.parse("apache/kafka-native").withTag("4.2.0"));
16+
}
17+
18+
}

boot-reactor-kafka/sender/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,21 @@
5757
<artifactId>spring-boot-starter-kafka-test</artifactId>
5858
<scope>test</scope>
5959
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-testcontainers</artifactId>
63+
<scope>test</scope>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.testcontainers</groupId>
67+
<artifactId>testcontainers-junit-jupiter</artifactId>
68+
<scope>test</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.testcontainers</groupId>
72+
<artifactId>testcontainers-kafka</artifactId>
73+
<scope>test</scope>
74+
</dependency>
6075
</dependencies>
6176

6277
<build>

boot-reactor-kafka/sender/src/main/java/com/example/demo/SenderApplication.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.springframework.boot.SpringApplication;
1313
import org.springframework.boot.autoconfigure.SpringBootApplication;
1414
import org.springframework.boot.context.event.ApplicationStartedEvent;
15+
import org.springframework.boot.kafka.autoconfigure.KafkaConnectionDetails;
1516
import org.springframework.context.annotation.Bean;
1617
import org.springframework.context.event.EventListener;
1718
import org.springframework.http.ResponseEntity;
@@ -55,9 +56,9 @@ public void init(ApplicationStartedEvent event) {
5556
);
5657
}
5758

58-
private Map<String, Object> producerProps() {
59+
private Map<String, Object> producerProps(KafkaConnectionDetails kafkaConnectionDetails) {
5960
Map<String, Object> props = new HashMap<>();
60-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
61+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConnectionDetails.getBootstrapServers());
6162
props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
6263
props.put(ProducerConfig.ACKS_CONFIG, "all");
6364
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -66,8 +67,8 @@ private Map<String, Object> producerProps() {
6667
}
6768

6869
@Bean
69-
KafkaSender<Integer, String> sender() {
70-
var senderOptions = SenderOptions.<Integer, String>create(producerProps());
70+
KafkaSender<Integer, String> sender(KafkaConnectionDetails kafkaConnectionDetails) {
71+
var senderOptions = SenderOptions.<Integer, String>create(producerProps(kafkaConnectionDetails));
7172
return KafkaSender.create(senderOptions);
7273
}
7374
}
@@ -86,7 +87,7 @@ public Mono<ResponseEntity<Object>> sendMessage(@RequestBody Message message) {
8687
Integer key = new SecureRandom().nextInt(Integer.MAX_VALUE);
8788
return messageRepository.save(message)
8889
.doOnSuccess(it -> {
89-
var notification = "Message #" + it.id() + " was sent at " + it.sentAt();
90+
var notification = "Message #" + it.id() + " with text '" + it.text() + "' was sent at " + it.sentAt();
9091
this.sender.send(Flux.just(SenderRecord.create(new ProducerRecord<>(SenderApplication.HELLO_TOPIC, key, notification), key)))
9192
.doOnError(e -> log.error("Send failed", e))
9293
.subscribe(r -> {
@@ -122,4 +123,4 @@ public Mono<Message> findById(UUID id) {
122123
}
123124

124125
record Message(UUID id, String text, LocalDateTime sentAt) {
125-
}
126+
}

boot-reactor-kafka/sender/src/test/java/com/example/demo/DemoApplicationTests.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)