Skip to content

Commit 1c91dc9

Browse files
Merge pull request #40 from igorcampos-dev/feature/kafka
Feature/kafka
2 parents 68bf3a3 + b34ad1f commit 1c91dc9

18 files changed

Lines changed: 384 additions & 25 deletions

File tree

.github/workflows/spring-kafka-example.yml

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,19 @@ jobs:
6868
echo "Checking the health of the containers..."
6969
7070
SERVICES="${{ steps.services.outputs.services }}"
71+
7172
for service in $SERVICES; do
72-
status=$(docker inspect -f '{{.State.Running}}' "$service" || echo "false")
73-
echo "$service status: $status"
74-
if [ "$status" != "true" ]; then
75-
echo "::error ::Service $service is not running."
76-
exit 1
73+
running=$(docker inspect -f '{{.State.Running}}' "$service" 2>/dev/null || echo "false")
74+
75+
if [ "$running" = "true" ]; then
76+
echo "$service is running ✔️"
77+
else
78+
exit_code=$(docker inspect -f '{{.State.ExitCode}}' "$service" 2>/dev/null || echo "1")
79+
if [ "$exit_code" = "0" ]; then
80+
echo "$service is stopped but exited with code 0 ✔️"
81+
else
82+
echo "::error ::$service is stopped and exited with code $exit_code ❌"
83+
exit 1
84+
fi
7785
fi
78-
done
86+
done

spring-kafka-example/compose.yaml

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,59 @@ services:
99
environment:
1010
SERVER_PORT: "80"
1111
SPRING_PROFILES_ACTIVE: "default"
12+
KAFKA_TOPIC_1: spring-kafka-example-simple-topic
13+
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
14+
KAFKA_GROUP_ID: spring-kafka-example-group
15+
KAFKA_AUTO_OFFSET_RESET: earliest
16+
KAFKA_KEY_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
17+
KAFKA_VALUE_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
18+
KAFKA_MISSING_TOPICS_FATAL: false
19+
depends_on:
20+
- kafka
21+
networks:
22+
- kafka-net
1223

1324
kafka:
25+
image: apache/kafka:4.0.0
1426
container_name: kafka
15-
image: confluentinc/cp-kafka:8.0.0
1627
ports:
1728
- "9092:9092"
29+
- "9093:9093"
1830
environment:
1931
KAFKA_NODE_ID: 1
2032
KAFKA_PROCESS_ROLES: broker,controller
21-
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
22-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
23-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
24-
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
33+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
34+
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9094,PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
35+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
2536
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
2637
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
38+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
2739
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
28-
CLUSTER_ID: qGyb4Z0XQpeoKgUXYfCCLw
40+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
41+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
42+
networks:
43+
- kafka-net
44+
45+
kafka-init:
46+
container_name: kafka-init
47+
image: apache/kafka:4.0.0
48+
depends_on:
49+
kafka:
50+
condition: service_started
51+
command: [ "/bin/bash", "-c", "/create_topic.sh" ]
52+
environment:
53+
KAFKA_HOST: kafka
54+
KAFKA_PORT: 9092
55+
KAFKA_TOPIC_CREATE: spring-kafka-example-simple-topic
56+
volumes:
57+
- type: bind
58+
source: ./docker/create_topic.sh
59+
target: /create_topic.sh
60+
init: true
61+
networks:
62+
- kafka-net
63+
64+
networks:
65+
kafka-net:
66+
driver: bridge
67+
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/bin/bash
2+
3+
echo "Waiting for Kafka on port $KAFKA_PORT..."
4+
while ! nc -z $KAFKA_HOST $KAFKA_PORT; do
5+
sleep 2
6+
done
7+
8+
/opt/kafka/bin/kafka-topics.sh \
9+
--create \
10+
--topic "$KAFKA_TOPIC_CREATE" \
11+
--bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
12+
--partitions 1 \
13+
--replication-factor 1
14+
15+
echo "Topic $KAFKA_TOPIC_CREATE created successfully!"

spring-kafka-example/pom.xml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
<java.version>21</java.version>
2323
<spring.boot.version>3.5.4</spring.boot.version>
2424
<spring.kafka.version>3.3.8</spring.kafka.version>
25+
<lombok.version>1.18.38</lombok.version>
26+
<jakarta.validation.version>3.1.1</jakarta.validation.version>
2527
</properties>
2628

2729
<dependencies>
@@ -32,12 +34,31 @@
3234
<version>${spring.boot.version}</version>
3335
</dependency>
3436

37+
<dependency>
38+
<groupId>org.springframework.boot</groupId>
39+
<artifactId>spring-boot-starter-validation</artifactId>
40+
<version>${spring.boot.version}</version>
41+
</dependency>
42+
3543
<dependency>
3644
<groupId>org.springframework.kafka</groupId>
3745
<artifactId>spring-kafka</artifactId>
3846
<version>${spring.kafka.version}</version>
3947
</dependency>
4048

49+
<dependency>
50+
<groupId>org.projectlombok</groupId>
51+
<artifactId>lombok</artifactId>
52+
<optional>true</optional>
53+
<version>${lombok.version}</version>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>jakarta.validation</groupId>
58+
<artifactId>jakarta.validation-api</artifactId>
59+
<version>${jakarta.validation.version}</version>
60+
</dependency>
61+
4162
<dependency>
4263
<groupId>org.springframework.boot</groupId>
4364
<artifactId>spring-boot-starter-test</artifactId>
@@ -55,15 +76,38 @@
5576
</dependencies>
5677

5778
<build>
79+
5880
<plugins>
5981

82+
<plugin>
83+
<groupId>org.apache.maven.plugins</groupId>
84+
<artifactId>maven-compiler-plugin</artifactId>
85+
<configuration>
86+
<annotationProcessorPaths>
87+
<path>
88+
<groupId>org.projectlombok</groupId>
89+
<artifactId>lombok</artifactId>
90+
</path>
91+
</annotationProcessorPaths>
92+
</configuration>
93+
</plugin>
94+
6095
<plugin>
6196
<groupId>org.springframework.boot</groupId>
6297
<artifactId>spring-boot-maven-plugin</artifactId>
6398
<version>${spring.boot.version}</version>
99+
<configuration>
100+
<excludes>
101+
<exclude>
102+
<groupId>org.projectlombok</groupId>
103+
<artifactId>lombok</artifactId>
104+
</exclude>
105+
</excludes>
106+
</configuration>
64107
</plugin>
65108

66109
</plugins>
110+
67111
</build>
68112

69113
</project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.io.example.config;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.SerializationFeature;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
9+
@Configuration
10+
public class JacksonConfig {
11+
12+
@Bean
13+
public ObjectMapper objectMapper() {
14+
return new ObjectMapper()
15+
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
16+
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
17+
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
18+
}
19+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.io.example.consumer;
2+
3+
@SuppressWarnings("unused")
4+
public interface KafkaConsumerService {
5+
void consume(String message);
6+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.io.example.consumer;
2+
3+
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4+
import com.io.example.mapper.JsonMapper;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.kafka.annotation.KafkaListener;
8+
import org.springframework.stereotype.Service;
9+
10+
@Slf4j
11+
@Service
12+
@RequiredArgsConstructor
13+
@SuppressWarnings("unused")
14+
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
15+
16+
private final JsonMapper mapper;
17+
18+
@KafkaListener(topics = "${spring.kafka.topics.topic-1}" , groupId = "${spring.kafka.consumer.group-id}")
19+
public void consume(String message) {
20+
MessageRequestDtoRequest messageRequestDtoRequest = mapper.toObject(message, MessageRequestDtoRequest.class);
21+
log.info("Message receive: {} ", messageRequestDtoRequest.toString());
22+
}
23+
24+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.io.example.controller;
2+
3+
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4+
import com.io.example.controller.dto.response.SimpleTopicDtoResponse;
5+
import com.io.example.producer.KafkaProducerService;
6+
import jakarta.validation.Valid;
7+
import lombok.RequiredArgsConstructor;
8+
import org.springframework.http.HttpStatus;
9+
import org.springframework.http.ResponseEntity;
10+
import org.springframework.web.bind.annotation.*;
11+
12+
@RestController
13+
@RequiredArgsConstructor
14+
@RequestMapping("/v1/example")
15+
public class ExampleController {
16+
17+
private final KafkaProducerService kafkaProducerService;
18+
19+
@PostMapping("/send/simple-topic")
20+
public ResponseEntity<SimpleTopicDtoResponse> sendMessageToSimpleTopic( @Valid @RequestBody MessageRequestDtoRequest dto){
21+
this.kafkaProducerService.sendMessage(dto);
22+
return ResponseEntity.status(HttpStatus.OK).body(new SimpleTopicDtoResponse("Message sent successfully!"));
23+
}
24+
25+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.io.example.controller.dto.request;
2+
3+
import jakarta.validation.constraints.NotBlank;
4+
import jakarta.validation.constraints.NotNull;
5+
6+
public record MessageRequestDtoRequest(
7+
@NotNull(message = "message cannot be null.")
8+
@NotBlank(message = "message cannot be blank.")
9+
String message
10+
) {}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.io.example.controller.dto.response;
2+
3+
public record SimpleTopicDtoResponse( String message) {}

0 commit comments

Comments
 (0)