diff --git a/.github/workflows/spring-kafka-example.yml b/.github/workflows/spring-kafka-example.yml index a99b89f..fd6fed4 100644 --- a/.github/workflows/spring-kafka-example.yml +++ b/.github/workflows/spring-kafka-example.yml @@ -68,11 +68,19 @@ jobs: echo "Checking the health of the containers..." SERVICES="${{ steps.services.outputs.services }}" + for service in $SERVICES; do - status=$(docker inspect -f '{{.State.Running}}' "$service" || echo "false") - echo "$service status: $status" - if [ "$status" != "true" ]; then - echo "::error ::Service $service is not running." - exit 1 + running=$(docker inspect -f '{{.State.Running}}' "$service" 2>/dev/null || echo "false") + + if [ "$running" = "true" ]; then + echo "$service is running ✔️" + else + exit_code=$(docker inspect -f '{{.State.ExitCode}}' "$service" 2>/dev/null || echo "1") + if [ "$exit_code" = "0" ]; then + echo "$service is stopped but exited with code 0 ✔️" + else + echo "::error ::$service is stopped and exited with code $exit_code ❌" + exit 1 + fi fi - done \ No newline at end of file + done diff --git a/spring-kafka-example/compose.yaml b/spring-kafka-example/compose.yaml index 2552bd9..c14aefa 100644 --- a/spring-kafka-example/compose.yaml +++ b/spring-kafka-example/compose.yaml @@ -9,20 +9,59 @@ services: environment: SERVER_PORT: "80" SPRING_PROFILES_ACTIVE: "default" + KAFKA_TOPIC_1: spring-kafka-example-simple-topic + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_GROUP_ID: spring-kafka-example-group + KAFKA_AUTO_OFFSET_RESET: earliest + KAFKA_KEY_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer + KAFKA_VALUE_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer + KAFKA_MISSING_TOPICS_FATAL: false + depends_on: + - kafka + networks: + - kafka-net kafka: + image: apache/kafka:4.0.0 container_name: kafka - image: confluentinc/cp-kafka:8.0.0 ports: - "9092:9092" + - "9093:9093" environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 + KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9094,PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094 KAFKA_LOG_DIRS: /tmp/kraft-combined-logs - CLUSTER_ID: qGyb4Z0XQpeoKgUXYfCCLw + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + networks: + - kafka-net + + kafka-init: + container_name: kafka-init + image: apache/kafka:4.0.0 + depends_on: + kafka: + condition: service_started + command: [ "/bin/bash", "-c", "/create_topic.sh" ] + environment: + KAFKA_HOST: kafka + KAFKA_PORT: 9092 + KAFKA_TOPIC_CREATE: spring-kafka-example-simple-topic + volumes: + - type: bind + source: ./docker/create_topic.sh + target: /create_topic.sh + init: true + networks: + - kafka-net + +networks: + kafka-net: + driver: bridge + diff --git a/spring-kafka-example/docker/create_topic.sh b/spring-kafka-example/docker/create_topic.sh new file mode 100755 index 0000000..e3253bc --- /dev/null +++ b/spring-kafka-example/docker/create_topic.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +echo "Waiting for Kafka on port $KAFKA_PORT..." +while ! nc -z $KAFKA_HOST $KAFKA_PORT; do + sleep 2 +done + +/opt/kafka/bin/kafka-topics.sh \ + --create \ + --topic "$KAFKA_TOPIC_CREATE" \ + --bootstrap-server $KAFKA_HOST:$KAFKA_PORT \ + --partitions 1 \ + --replication-factor 1 + +echo "Topic $KAFKA_TOPIC_CREATE created successfully!" diff --git a/spring-kafka-example/pom.xml b/spring-kafka-example/pom.xml index e7800d7..6c05847 100644 --- a/spring-kafka-example/pom.xml +++ b/spring-kafka-example/pom.xml @@ -22,6 +22,8 @@ 21 3.5.4 3.3.8 + 1.18.38 + 3.1.1 @@ -32,12 +34,31 @@ ${spring.boot.version} + + org.springframework.boot + spring-boot-starter-validation + ${spring.boot.version} + + org.springframework.kafka spring-kafka ${spring.kafka.version} + + org.projectlombok + lombok + true + ${lombok.version} + + + + jakarta.validation + jakarta.validation-api + ${jakarta.validation.version} + + org.springframework.boot spring-boot-starter-test @@ -55,15 +76,38 @@ + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.projectlombok + lombok + + + + + org.springframework.boot spring-boot-maven-plugin ${spring.boot.version} + + + + org.projectlombok + lombok + + + + diff --git a/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java b/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java new file mode 100644 index 0000000..020a93e --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/config/JacksonConfig.java @@ -0,0 +1,19 @@ +package com.io.example.config; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class JacksonConfig { + + @Bean + public ObjectMapper objectMapper() { + return new ObjectMapper() + .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .setSerializationInclusion(JsonInclude.Include.NON_NULL); + } +} diff --git a/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java new file mode 100644 index 0000000..c775661 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerService.java @@ -0,0 +1,6 @@ +package com.io.example.consumer; + +@SuppressWarnings("unused") +public interface KafkaConsumerService { + void consume(String message); +} diff --git a/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java new file mode 100644 index 0000000..bb3a35c --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/consumer/KafkaConsumerServiceImpl.java @@ -0,0 +1,24 @@ +package com.io.example.consumer; + +import com.io.example.controller.dto.request.MessageRequestDtoRequest; +import com.io.example.mapper.JsonMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +@SuppressWarnings("unused") +public class KafkaConsumerServiceImpl implements KafkaConsumerService { + + private final JsonMapper mapper; + + @KafkaListener(topics = "${spring.kafka.topics.topic-1}" , groupId = "${spring.kafka.consumer.group-id}") + public void consume(String message) { + MessageRequestDtoRequest messageRequestDtoRequest = mapper.toObject(message, MessageRequestDtoRequest.class); + log.info("Message receive: {} ", messageRequestDtoRequest.toString()); + } + +} diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java b/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java new file mode 100644 index 0000000..ca62540 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/controller/ExampleController.java @@ -0,0 +1,25 @@ +package com.io.example.controller; + +import com.io.example.controller.dto.request.MessageRequestDtoRequest; +import com.io.example.controller.dto.response.SimpleTopicDtoResponse; +import com.io.example.producer.KafkaProducerService; +import jakarta.validation.Valid; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/v1/example") +public class ExampleController { + + private final KafkaProducerService kafkaProducerService; + + @PostMapping("/send/simple-topic") + public ResponseEntity sendMessageToSimpleTopic( @Valid @RequestBody MessageRequestDtoRequest dto){ + this.kafkaProducerService.sendMessage(dto); + return ResponseEntity.status(HttpStatus.OK).body(new SimpleTopicDtoResponse("Message sent successfully!")); + } + +} diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java b/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java new file mode 100644 index 0000000..94ca3b3 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/controller/dto/request/MessageRequestDtoRequest.java @@ -0,0 +1,10 @@ +package com.io.example.controller.dto.request; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; + +public record MessageRequestDtoRequest( + @NotNull(message = "message cannot be null.") + @NotBlank(message = "message cannot be blank.") + String message +) {} diff --git a/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java b/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java new file mode 100644 index 0000000..11b99a0 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/controller/dto/response/SimpleTopicDtoResponse.java @@ -0,0 +1,3 @@ +package com.io.example.controller.dto.response; + +public record SimpleTopicDtoResponse( String message) {} diff --git a/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java b/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java new file mode 100644 index 0000000..1df763b --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/exception/handler/GlobalExceptionHandler.java @@ -0,0 +1,40 @@ +package com.io.example.exception.handler; + +import com.io.example.exception.responseBody.Error; +import jakarta.servlet.http.HttpServletRequest; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; + +@Slf4j +@ControllerAdvice +@SuppressWarnings("unused") +public class GlobalExceptionHandler { + + @ExceptionHandler(RuntimeException.class) + public ResponseEntity handleRuntimeException(RuntimeException ex, HttpServletRequest s) { + log.error("HandleRuntimeException= {}", ex.getMessage()); + return Error.response(ex.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR, s); + } + + @ExceptionHandler(MethodArgumentNotValidException.class) + public ResponseEntity handleMethodArgumentNotValidException(MethodArgumentNotValidException ex, HttpServletRequest s) { + log.error("HandleMethodArgumentNotValidException= {}", ex.getMessage()); + return Error.response(this.getFirstErrorMessage(ex), HttpStatus.BAD_REQUEST, s); + + } + + private String getFirstErrorMessage(MethodArgumentNotValidException e) { + return e.getBindingResult() + .getFieldErrors() + .stream() + .findFirst() + .map(FieldError::getDefaultMessage) + .orElse(e.getMessage()); + } + +} diff --git a/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java b/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java new file mode 100644 index 0000000..9b88736 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/exception/responseBody/Error.java @@ -0,0 +1,42 @@ +package com.io.example.exception.responseBody; + +import jakarta.servlet.http.HttpServletRequest; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class Error { + private String timestamp; + private String path; + private Integer status; + private String error; + + public static ResponseEntity response(String message, HttpStatus status, HttpServletRequest request){ + return ResponseEntity + .status(status) + .body(Error.builder() + .timestamp(getInstantNow()) + .path(request.getRequestURI()) + .status(status.value()) + .error(message) + .build()); + } + + private static String getInstantNow() { + return Instant.now() + .atZone(ZoneId.systemDefault()) + .format(DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss")); + + } + +} \ No newline at end of file diff --git a/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java b/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java new file mode 100644 index 0000000..3525c1e --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java @@ -0,0 +1,34 @@ +package com.io.example.mapper; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class JsonMapper { + + private final ObjectMapper objectMapper; + + public String toJsonString(Object obj) { + try { + return objectMapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + log.error("error converting an object of type {} to json. cause of the error: {}", obj.getClass(), e.getMessage()); + throw new RuntimeException("JSON serialization failed", e); + } + } + + public T toObject(String jsonBody, Class typeClass){ + try { + return objectMapper.readValue(jsonBody, typeClass); + } catch (JsonProcessingException e) { + log.error("error converting an String to object. cause of the error: {}", e.getMessage()); + throw new IllegalArgumentException("Object deserialization failed for target type: " + typeClass.getSimpleName(), e); + } + } + +} diff --git a/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java new file mode 100644 index 0000000..f151631 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerService.java @@ -0,0 +1,8 @@ +package com.io.example.producer; + +import com.io.example.controller.dto.request.MessageRequestDtoRequest; + +@FunctionalInterface +public interface KafkaProducerService { + void sendMessage(MessageRequestDtoRequest message); +} diff --git a/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java new file mode 100644 index 0000000..25e47e8 --- /dev/null +++ b/spring-kafka-example/src/main/java/com/io/example/producer/KafkaProducerServiceImpl.java @@ -0,0 +1,31 @@ +package com.io.example.producer; + +import com.io.example.controller.dto.request.MessageRequestDtoRequest; +import com.io.example.mapper.JsonMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +@SuppressWarnings("unused") +public class KafkaProducerServiceImpl implements KafkaProducerService { + + @SuppressWarnings("unused") + @Value("${spring.kafka.topics.topic-1}") + private String topic; + + private final KafkaTemplate kafkaTemplate; + private final JsonMapper mapper; + + @Override + public void sendMessage(MessageRequestDtoRequest message) { + log.info("Preparing to send message to Kafka topic: {}", topic); + kafkaTemplate.send(topic, mapper.toJsonString(message)); + log.info("Message successfully dispatched: {}", message); + } + +} diff --git a/spring-kafka-example/src/main/resources/application-dev.yml b/spring-kafka-example/src/main/resources/application-dev.yml index 68f4259..9d4654c 100644 --- a/spring-kafka-example/src/main/resources/application-dev.yml +++ b/spring-kafka-example/src/main/resources/application-dev.yml @@ -3,6 +3,18 @@ spring: application: name: spring-kafka-example-dev + kafka: + topics: + topic-1: ${KAFKA_TOPIC_1:spring-kafka-example-simple-topic} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9093} + consumer: + group-id: ${KAFKA_GROUP_ID:spring-kafka-example-group} + auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest} + key-deserializer: ${KAFKA_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} + value-deserializer: ${KAFKA_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} + listener: + missing-topics-fatal: ${KAFKA_MISSING_TOPICS_FATAL:false} + logging: pattern: diff --git a/spring-kafka-example/src/main/resources/application.yml b/spring-kafka-example/src/main/resources/application.yml index faf30f6..374b9c9 100644 --- a/spring-kafka-example/src/main/resources/application.yml +++ b/spring-kafka-example/src/main/resources/application.yml @@ -3,6 +3,18 @@ spring: application: name: spring-kafka-example-prd + kafka: + topics: + topic-1: ${KAFKA_TOPIC_1} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS} + consumer: + group-id: ${KAFKA_GROUP_ID} + auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET} + key-deserializer: ${KAFKA_KEY_DESERIALIZER} + value-deserializer: ${KAFKA_VALUE_DESERIALIZER} + listener: + missing-topics-fatal: ${KAFKA_MISSING_TOPICS_FATAL} + logging: pattern: diff --git a/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java b/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java deleted file mode 100644 index 27dda22..0000000 --- a/spring-kafka-example/src/test/java/com/io/example/SpringKafkaExampleApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.io.example; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class SpringKafkaExampleApplicationTests { - - @Test - void contextLoads() { - } - -}