Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions .github/workflows/spring-kafka-example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,19 @@ jobs:
echo "Checking the health of the containers..."

SERVICES="${{ steps.services.outputs.services }}"

for service in $SERVICES; do
status=$(docker inspect -f '{{.State.Running}}' "$service" || echo "false")
echo "$service status: $status"
if [ "$status" != "true" ]; then
echo "::error ::Service $service is not running."
exit 1
running=$(docker inspect -f '{{.State.Running}}' "$service" 2>/dev/null || echo "false")

if [ "$running" = "true" ]; then
echo "$service is running ✔️"
else
exit_code=$(docker inspect -f '{{.State.ExitCode}}' "$service" 2>/dev/null || echo "1")
if [ "$exit_code" = "0" ]; then
echo "$service is stopped but exited with code 0 ✔️"
else
echo "::error ::$service is stopped and exited with code $exit_code ❌"
exit 1
fi
fi
done
done
51 changes: 45 additions & 6 deletions spring-kafka-example/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,59 @@ services:
environment:
SERVER_PORT: "80"
SPRING_PROFILES_ACTIVE: "default"
KAFKA_TOPIC_1: spring-kafka-example-simple-topic
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_GROUP_ID: spring-kafka-example-group
KAFKA_AUTO_OFFSET_RESET: earliest
KAFKA_KEY_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
KAFKA_VALUE_DESERIALIZER: org.apache.kafka.common.serialization.StringDeserializer
KAFKA_MISSING_TOPICS_FATAL: false
Comment thread
igorcampos-dev marked this conversation as resolved.
depends_on:
- kafka
networks:
- kafka-net

kafka:
image: apache/kafka:4.0.0
container_name: kafka
image: confluentinc/cp-kafka:8.0.0
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9094,PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9094
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: qGyb4Z0XQpeoKgUXYfCCLw
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
networks:
- kafka-net

kafka-init:
container_name: kafka-init
image: apache/kafka:4.0.0
depends_on:
kafka:
condition: service_started
Comment thread
igorcampos-dev marked this conversation as resolved.
command: [ "/bin/bash", "-c", "/create_topic.sh" ]
Comment thread
igorcampos-dev marked this conversation as resolved.
environment:
KAFKA_HOST: kafka
KAFKA_PORT: 9092
KAFKA_TOPIC_CREATE: spring-kafka-example-simple-topic
volumes:
- type: bind
source: ./docker/create_topic.sh
target: /create_topic.sh
init: true
networks:
- kafka-net

networks:
kafka-net:
driver: bridge

15 changes: 15 additions & 0 deletions spring-kafka-example/docker/create_topic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash

echo "Waiting for Kafka on port $KAFKA_PORT..."
while ! nc -z $KAFKA_HOST $KAFKA_PORT; do
sleep 2
done
Comment thread
igorcampos-dev marked this conversation as resolved.

/opt/kafka/bin/kafka-topics.sh \
--create \
--topic "$KAFKA_TOPIC_CREATE" \
--bootstrap-server $KAFKA_HOST:$KAFKA_PORT \
--partitions 1 \
--replication-factor 1
Comment thread
igorcampos-dev marked this conversation as resolved.

echo "Topic $KAFKA_TOPIC_CREATE created successfully!"
44 changes: 44 additions & 0 deletions spring-kafka-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
<java.version>21</java.version>
<spring.boot.version>3.5.4</spring.boot.version>
<spring.kafka.version>3.3.8</spring.kafka.version>
<lombok.version>1.18.38</lombok.version>
<jakarta.validation.version>3.1.1</jakarta.validation.version>
</properties>

<dependencies>
Expand All @@ -32,12 +34,31 @@
<version>${spring.boot.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>${spring.boot.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>${jakarta.validation.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand All @@ -55,15 +76,38 @@
</dependencies>

<build>

<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>

<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>

</plugins>

</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.io.example.config;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JacksonConfig {

@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.io.example.consumer;

@SuppressWarnings("unused")
Comment thread
igorcampos-dev marked this conversation as resolved.
public interface KafkaConsumerService {
void consume(String message);
Comment thread
igorcampos-dev marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.io.example.consumer;

import com.io.example.controller.dto.request.MessageRequestDtoRequest;
import com.io.example.mapper.JsonMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
@SuppressWarnings("unused")
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
Comment thread
igorcampos-dev marked this conversation as resolved.

private final JsonMapper mapper;

@KafkaListener(topics = "${spring.kafka.topics.topic-1}" , groupId = "${spring.kafka.consumer.group-id}")
public void consume(String message) {
MessageRequestDtoRequest messageRequestDtoRequest = mapper.toObject(message, MessageRequestDtoRequest.class);
log.info("Message receive: {} ", messageRequestDtoRequest.toString());
}
Comment thread
igorcampos-dev marked this conversation as resolved.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.io.example.controller;

import com.io.example.controller.dto.request.MessageRequestDtoRequest;
import com.io.example.controller.dto.response.SimpleTopicDtoResponse;
import com.io.example.producer.KafkaProducerService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/example")
public class ExampleController {

private final KafkaProducerService kafkaProducerService;

@PostMapping("/send/simple-topic")
public ResponseEntity<SimpleTopicDtoResponse> sendMessageToSimpleTopic( @Valid @RequestBody MessageRequestDtoRequest dto){
this.kafkaProducerService.sendMessage(dto);
return ResponseEntity.status(HttpStatus.OK).body(new SimpleTopicDtoResponse("Message sent successfully!"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.io.example.controller.dto.request;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

public record MessageRequestDtoRequest(
@NotNull(message = "message cannot be null.")
@NotBlank(message = "message cannot be blank.")
String message
) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.io.example.controller.dto.response;

public record SimpleTopicDtoResponse( String message) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.io.example.exception.handler;

import com.io.example.exception.responseBody.Error;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

@Slf4j
@ControllerAdvice
@SuppressWarnings("unused")
Comment thread
igorcampos-dev marked this conversation as resolved.
public class GlobalExceptionHandler {

@ExceptionHandler(RuntimeException.class)
public ResponseEntity<Error> handleRuntimeException(RuntimeException ex, HttpServletRequest s) {
log.error("HandleRuntimeException= {}", ex.getMessage());
return Error.response(ex.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR, s);
}

@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<Error> handleMethodArgumentNotValidException(MethodArgumentNotValidException ex, HttpServletRequest s) {
log.error("HandleMethodArgumentNotValidException= {}", ex.getMessage());
return Error.response(this.getFirstErrorMessage(ex), HttpStatus.BAD_REQUEST, s);

}

private String getFirstErrorMessage(MethodArgumentNotValidException e) {
return e.getBindingResult()
.getFieldErrors()
.stream()
.findFirst()
.map(FieldError::getDefaultMessage)
.orElse(e.getMessage());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.io.example.exception.responseBody;

import jakarta.servlet.http.HttpServletRequest;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Error {
private String timestamp;
private String path;
private Integer status;
private String error;

public static ResponseEntity<Error> response(String message, HttpStatus status, HttpServletRequest request){
return ResponseEntity
.status(status)
.body(Error.builder()
.timestamp(getInstantNow())
.path(request.getRequestURI())
.status(status.value())
.error(message)
.build());
}

private static String getInstantNow() {
return Instant.now()
.atZone(ZoneId.systemDefault())
.format(DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss"));

}
Comment thread
igorcampos-dev marked this conversation as resolved.

}
Loading