diff --git a/.github/workflows/spring-kafka-example.yml b/.github/workflows/spring-kafka-example.yml
index a99b89f..fd6fed4 100644
--- a/.github/workflows/spring-kafka-example.yml
+++ b/.github/workflows/spring-kafka-example.yml
@@ -68,11 +68,19 @@ jobs:
echo "Checking the health of the containers..."
SERVICES="${{ steps.services.outputs.services }}"
+
for service in $SERVICES; do
- status=$(docker inspect -f '{{.State.Running}}' "$service" || echo "false")
- echo "$service status: $status"
- if [ "$status" != "true" ]; then
- echo "::error ::Service $service is not running."
- exit 1
+ running=$(docker inspect -f '{{.State.Running}}' "$service" 2>/dev/null || echo "false")
+
+ if [ "$running" = "true" ]; then
+ echo "$service is running ✔️"
+ else
+ exit_code=$(docker inspect -f '{{.State.ExitCode}}' "$service" 2>/dev/null || echo "1")
+ if [ "$exit_code" = "0" ]; then
+ echo "$service is stopped but exited with code 0 ✔️"
+ else
+ echo "::error ::$service is stopped and exited with code $exit_code ❌"
+ exit 1
+ fi
fi
- done
\ No newline at end of file
+ done
diff --git a/spring-kafka-example/compose.yaml b/spring-kafka-example/compose.yaml
index 2552bd9..c14aefa 100644
--- a/spring-kafka-example/compose.yaml
+++ b/spring-kafka-example/compose.yaml
@@ -9,20 +9,59 @@ services:
environment:
SERVER_PORT: "80"
SPRING_PROFILES_ACTIVE: "default"
+ KAFKA_TOPIC_1: spring-kafka-example-simple-topic
+ KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+ KAFKA_GROUP_ID: spring-kafka-example-group
+ KAFKA_AUTO_OFFSET_RESET: earliest
+ KAFKA_KEY_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
+ KAFKA_VALUE_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
+ KAFKA_MISSING_TOPICS_FATAL: false
+ depends_on:
+ - kafka
+ networks:
+ - kafka-net
kafka:
+ image: apache/kafka:4.0.0
container_name: kafka
- image: confluentinc/cp-kafka:8.0.0
ports:
- "9092:9092"
+ - "9093:9093"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
- KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
- KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
+ KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9094,PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
- CLUSTER_ID: qGyb4Z0XQpeoKgUXYfCCLw
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+ networks:
+ - kafka-net
+
+ kafka-init:
+ container_name: kafka-init
+ image: apache/kafka:4.0.0
+ depends_on:
+ kafka:
+ condition: service_started
+ command: [ "/bin/bash", "-c", "/create_topic.sh" ]
+ environment:
+ KAFKA_HOST: kafka
+ KAFKA_PORT: 9092
+ KAFKA_TOPIC_CREATE: spring-kafka-example-simple-topic
+ volumes:
+ - type: bind
+ source: ./docker/create_topic.sh
+ target: /create_topic.sh
+ init: true
+ networks:
+ - kafka-net
+
+networks:
+ kafka-net:
+ driver: bridge
+
diff --git a/spring-kafka-example/docker/create_topic.sh b/spring-kafka-example/docker/create_topic.sh
new file mode 100755
index 0000000..e3253bc
--- /dev/null
+++ b/spring-kafka-example/docker/create_topic.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+echo "Waiting for Kafka on port $KAFKA_PORT..."
+while ! nc -z $KAFKA_HOST $KAFKA_PORT; do
+ sleep 2
+done
+
+/opt/kafka/bin/kafka-topics.sh \
+ --create \
+ --topic "$KAFKA_TOPIC_CREATE" \
+ --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
+ --partitions 1 \
+ --replication-factor 1
+
+echo "Topic $KAFKA_TOPIC_CREATE created successfully!"
diff --git a/spring-kafka-example/pom.xml b/spring-kafka-example/pom.xml
index e7800d7..6c05847 100644
--- a/spring-kafka-example/pom.xml
+++ b/spring-kafka-example/pom.xml
@@ -22,6 +22,8 @@
21
3.5.4
3.3.8
+ 1.18.38
+ 3.1.1
@@ -32,12 +34,31 @@
${spring.boot.version}
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ ${spring.boot.version}
+
+
org.springframework.kafka
spring-kafka
${spring.kafka.version}
+
+ org.projectlombok
+ lombok
+ true
+ ${lombok.version}
+
+
+
+ jakarta.validation
+ jakarta.validation-api
+ ${jakarta.validation.version}
+
+
org.springframework.boot
spring-boot-starter-test
@@ -55,15 +76,38 @@
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+
org.springframework.boot
spring-boot-maven-plugin
${spring.boot.version}
+
+
+
+ org.projectlombok
+ lombok
+
+
+
+
diff --git a/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java b/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java
new file mode 100644
index 0000000..020a93e
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java
@@ -0,0 +1,19 @@
+package com.io.example.config;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class JacksonConfig {
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ return new ObjectMapper()
+ .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ }
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java
new file mode 100644
index 0000000..c775661
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java
@@ -0,0 +1,6 @@
+package com.io.example.consumer;
+
+@SuppressWarnings("unused")
+public interface KafkaConsumerService {
+ void consume(String message);
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java
new file mode 100644
index 0000000..bb3a35c
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java
@@ -0,0 +1,24 @@
+package com.io.example.consumer;
+
+import com.io.example.controller.dto.request.MessageRequestDtoRequest;
+import com.io.example.mapper.JsonMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@SuppressWarnings("unused")
+public class KafkaConsumerServiceImpl implements KafkaConsumerService {
+
+ private final JsonMapper mapper;
+
+ @KafkaListener(topics = "${spring.kafka.topics.topic-1}" , groupId = "${spring.kafka.consumer.group-id}")
+ public void consume(String message) {
+ MessageRequestDtoRequest messageRequestDtoRequest = mapper.toObject(message, MessageRequestDtoRequest.class);
+ log.info("Message receive: {} ", messageRequestDtoRequest.toString());
+ }
+
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java b/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java
new file mode 100644
index 0000000..ca62540
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java
@@ -0,0 +1,25 @@
+package com.io.example.controller;
+
+import com.io.example.controller.dto.request.MessageRequestDtoRequest;
+import com.io.example.controller.dto.response.SimpleTopicDtoResponse;
+import com.io.example.producer.KafkaProducerService;
+import jakarta.validation.Valid;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("/v1/example")
+public class ExampleController {
+
+ private final KafkaProducerService kafkaProducerService;
+
+ @PostMapping("/send/simple-topic")
+ public ResponseEntity sendMessageToSimpleTopic( @Valid @RequestBody MessageRequestDtoRequest dto){
+ this.kafkaProducerService.sendMessage(dto);
+ return ResponseEntity.status(HttpStatus.OK).body(new SimpleTopicDtoResponse("Message sent successfully!"));
+ }
+
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java b/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java
new file mode 100644
index 0000000..94ca3b3
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java
@@ -0,0 +1,10 @@
+package com.io.example.controller.dto.request;
+
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotNull;
+
+public record MessageRequestDtoRequest(
+ @NotNull(message = "message cannot be null.")
+ @NotBlank(message = "message cannot be blank.")
+ String message
+) {}
diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java b/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java
new file mode 100644
index 0000000..11b99a0
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java
@@ -0,0 +1,3 @@
+package com.io.example.controller.dto.response;
+
+public record SimpleTopicDtoResponse( String message) {}
diff --git a/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java b/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java
new file mode 100644
index 0000000..1df763b
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java
@@ -0,0 +1,40 @@
+package com.io.example.exception.handler;
+
+import com.io.example.exception.responseBody.Error;
+import jakarta.servlet.http.HttpServletRequest;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.FieldError;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+
+@Slf4j
+@ControllerAdvice
+@SuppressWarnings("unused")
+public class GlobalExceptionHandler {
+
+ @ExceptionHandler(RuntimeException.class)
+ public ResponseEntity handleRuntimeException(RuntimeException ex, HttpServletRequest s) {
+ log.error("HandleRuntimeException= {}", ex.getMessage());
+ return Error.response(ex.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR, s);
+ }
+
+ @ExceptionHandler(MethodArgumentNotValidException.class)
+ public ResponseEntity handleMethodArgumentNotValidException(MethodArgumentNotValidException ex, HttpServletRequest s) {
+ log.error("HandleMethodArgumentNotValidException= {}", ex.getMessage());
+ return Error.response(this.getFirstErrorMessage(ex), HttpStatus.BAD_REQUEST, s);
+
+ }
+
+ private String getFirstErrorMessage(MethodArgumentNotValidException e) {
+ return e.getBindingResult()
+ .getFieldErrors()
+ .stream()
+ .findFirst()
+ .map(FieldError::getDefaultMessage)
+ .orElse(e.getMessage());
+ }
+
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java b/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java
new file mode 100644
index 0000000..9b88736
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java
@@ -0,0 +1,42 @@
+package com.io.example.exception.responseBody;
+
+import jakarta.servlet.http.HttpServletRequest;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class Error {
+ private String timestamp;
+ private String path;
+ private Integer status;
+ private String error;
+
+ public static ResponseEntity response(String message, HttpStatus status, HttpServletRequest request){
+ return ResponseEntity
+ .status(status)
+ .body(Error.builder()
+ .timestamp(getInstantNow())
+ .path(request.getRequestURI())
+ .status(status.value())
+ .error(message)
+ .build());
+ }
+
+ private static String getInstantNow() {
+ return Instant.now()
+ .atZone(ZoneId.systemDefault())
+ .format(DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"));
+
+ }
+
+}
\ No newline at end of file
diff --git a/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java b/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java
new file mode 100644
index 0000000..3525c1e
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java
@@ -0,0 +1,34 @@
+package com.io.example.mapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class JsonMapper {
+
+ private final ObjectMapper objectMapper;
+
+ public String toJsonString(Object obj) {
+ try {
+ return objectMapper.writeValueAsString(obj);
+ } catch (JsonProcessingException e) {
+ log.error("error converting an object of type {} to json. cause of the error: {}", obj.getClass(), e.getMessage());
+ throw new RuntimeException("JSON serialization failed", e);
+ }
+ }
+
+ public T toObject(String jsonBody, Class typeClass){
+ try {
+ return objectMapper.readValue(jsonBody, typeClass);
+ } catch (JsonProcessingException e) {
+ log.error("error converting an String to object. cause of the error: {}", e.getMessage());
+ throw new IllegalArgumentException("Object deserialization failed for target type: " + typeClass.getSimpleName(), e);
+ }
+ }
+
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java
new file mode 100644
index 0000000..f151631
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java
@@ -0,0 +1,8 @@
+package com.io.example.producer;
+
+import com.io.example.controller.dto.request.MessageRequestDtoRequest;
+
+@FunctionalInterface
+public interface KafkaProducerService {
+ void sendMessage(MessageRequestDtoRequest message);
+}
diff --git a/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java
new file mode 100644
index 0000000..25e47e8
--- /dev/null
+++ b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java
@@ -0,0 +1,31 @@
+package com.io.example.producer;
+
+import com.io.example.controller.dto.request.MessageRequestDtoRequest;
+import com.io.example.mapper.JsonMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+@SuppressWarnings("unused")
+public class KafkaProducerServiceImpl implements KafkaProducerService {
+
+ @SuppressWarnings("unused")
+ @Value("${spring.kafka.topics.topic-1}")
+ private String topic;
+
+ private final KafkaTemplate kafkaTemplate;
+ private final JsonMapper mapper;
+
+ @Override
+ public void sendMessage(MessageRequestDtoRequest message) {
+ log.info("Preparing to send message to Kafka topic: {}", topic);
+ kafkaTemplate.send(topic, mapper.toJsonString(message));
+ log.info("Message successfully dispatched: {}", message);
+ }
+
+}
diff --git a/spring-kafka-example/src/main/resources/application-dev.yml b/spring-kafka-example/src/main/resources/application-dev.yml
index 68f4259..9d4654c 100644
--- a/spring-kafka-example/src/main/resources/application-dev.yml
+++ b/spring-kafka-example/src/main/resources/application-dev.yml
@@ -3,6 +3,18 @@ spring:
application:
name: spring-kafka-example-dev
+ kafka:
+ topics:
+ topic-1: ${KAFKA_TOPIC_1:spring-kafka-example-simple-topic}
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9093}
+ consumer:
+ group-id: ${KAFKA_GROUP_ID:spring-kafka-example-group}
+ auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
+ key-deserializer: ${KAFKA_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
+ value-deserializer: ${KAFKA_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
+ listener:
+ missing-topics-fatal: ${KAFKA_MISSING_TOPICS_FATAL:false}
+
logging:
pattern:
diff --git a/spring-kafka-example/src/main/resources/application.yml b/spring-kafka-example/src/main/resources/application.yml
index faf30f6..374b9c9 100644
--- a/spring-kafka-example/src/main/resources/application.yml
+++ b/spring-kafka-example/src/main/resources/application.yml
@@ -3,6 +3,18 @@ spring:
application:
name: spring-kafka-example-prd
+ kafka:
+ topics:
+ topic-1: ${KAFKA_TOPIC_1}
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
+ consumer:
+ group-id: ${KAFKA_GROUP_ID}
+ auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET}
+ key-deserializer: ${KAFKA_KEY_DESERIALIZER}
+ value-deserializer: ${KAFKA_VALUE_DESERIALIZER}
+ listener:
+ missing-topics-fatal: ${KAFKA_MISSING_TOPICS_FATAL}
+
logging:
pattern:
diff --git a/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java b/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java
deleted file mode 100644
index 27dda22..0000000
--- a/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.io.example;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class SpringKafkaExampleApplicationTests {
-
- @Test
- void contextLoads() {
- }
-
-}