Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/style-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ jobs:
java-style:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- uses: actions/setup-java@v5
with:
java-version: '21'
distribution: 'temurin'
- name: Run Google Java Format
uses: axel-op/googlejavaformat-action@v4
with:
Expand Down
4 changes: 4 additions & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ dependencies {
testImplementation 'com.github.database-rider:rider-spring:1.44.0'
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation "com.redis:testcontainers-redis:2.2.4"
testImplementation 'org.testcontainers:kafka:1.21.3'

/* KAFKA */
implementation 'org.springframework.kafka:spring-kafka'

/* ETC */
implementation 'org.apache.commons:commons-lang3:3.12.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.f1.backend.domain.stat.app;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.f1.backend.domain.stat.dao.StatBatchRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class StatKafkaConsumer {

private final StatBatchRepository statBatchRepository;
private final ObjectMapper objectMapper;

@Transactional
@KafkaListener(topics = "stat-changes")
public void handleStatChanges(List<String> messages) {

try {
List<StatChangeEvent> events = new ArrayList<>();
for (String message : messages) {
StatChangeEvent event = objectMapper.readValue(message, StatChangeEvent.class);
events.add(event);
}

statBatchRepository.batchUpdateStats(events);

} catch (Exception e) {
log.error("Failed to process stat change events", e);
throw new RuntimeException("Failed to process stat change events", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.f1.backend.domain.stat.app;

import io.f1.backend.domain.stat.dao.StatRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;
import io.f1.backend.domain.stat.dto.StatPageResponse;
import io.f1.backend.global.exception.CustomException;
import io.f1.backend.global.exception.errorcode.RoomErrorCode;
import io.f1.backend.global.util.kafka.KafkaProducer;

import lombok.RequiredArgsConstructor;

Expand All @@ -18,6 +20,7 @@
public class StatService {

private final StatRepository statRepository;
private final KafkaProducer kafkaProducer;

@Transactional(readOnly = true)
public StatPageResponse getRanks(Pageable pageable, String nickname) {
Expand All @@ -36,9 +39,11 @@ public StatPageResponse getRanks(Pageable pageable, String nickname) {
return response;
}

// TODO: 게임 종료 후 호출 필요
public void updateRank(long userId, boolean win, int deltaScore) {
statRepository.updateRank(userId, win, deltaScore);

StatChangeEvent event = StatChangeEvent.of(userId, win, deltaScore);
kafkaProducer.sendWithKey("stat-changes", String.valueOf(userId), event);
}

public void addUser(long userId, String nickname) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.f1.backend.domain.stat.dao;

import io.f1.backend.domain.stat.dto.StatChangeEvent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Repository
@RequiredArgsConstructor
public class StatBatchRepository {

private final JdbcTemplate jdbcTemplate;

public void batchUpdateStats(List<StatChangeEvent> events) {
if (events.isEmpty()) {
return;
}

List<StatChangeEvent> winEvents = events.stream().filter(StatChangeEvent::isWin).toList();

List<StatChangeEvent> loseEvents = events.stream().filter(e -> !e.isWin()).toList();

if (!winEvents.isEmpty()) {
batchUpdateWinStats(winEvents);
}

if (!loseEvents.isEmpty()) {
batchUpdateLoseStats(loseEvents);
}
}

private void batchUpdateWinStats(List<StatChangeEvent> events) {
StringBuilder sql =
new StringBuilder(
"""
UPDATE stat SET
total_games = total_games + 1,
winning_games = winning_games + 1,
score = score + CASE user_id
""");

for (StatChangeEvent event : events) {
sql.append(String.format("WHEN %d THEN %d ", event.getUserId(), event.getDeltaScore()));
}

sql.append("END WHERE user_id IN (");
sql.append(
events.stream()
.map(e -> String.valueOf(e.getUserId()))
.collect(Collectors.joining(",")));
sql.append(")");

int updatedRows = jdbcTemplate.update(sql.toString());
log.debug("Batch updated {} win stats", updatedRows);
}

private void batchUpdateLoseStats(List<StatChangeEvent> events) {
StringBuilder sql =
new StringBuilder(
"""
UPDATE stat SET
total_games = total_games + 1,
score = score + CASE user_id
""");

for (StatChangeEvent event : events) {
sql.append(String.format("WHEN %d THEN %d ", event.getUserId(), event.getDeltaScore()));
}

sql.append("END WHERE user_id IN (");
sql.append(
events.stream()
.map(e -> String.valueOf(e.getUserId()))
.collect(Collectors.joining(",")));
sql.append(")");

int updatedRows = jdbcTemplate.update(sql.toString());
log.debug("Batch updated {} lose stats", updatedRows);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ public void addUser(long userId, String nickname) {
@Override
public void updateRank(long userId, boolean win, int deltaScore) {
redisRepository.updateRank(userId, win, deltaScore);
if (win) {
jpaRepository.updateStatByUserIdCaseWin(deltaScore, userId);
} else {
jpaRepository.updateStatByUserIdCaseLose(deltaScore, userId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.f1.backend.domain.stat.dto;

import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class StatChangeEvent {

private Long userId;
private boolean win;
private int deltaScore;

public static StatChangeEvent of(Long userId, boolean win, int deltaScore) {
return StatChangeEvent.builder().userId(userId).win(win).deltaScore(deltaScore).build();
}
}
35 changes: 35 additions & 0 deletions backend/src/main/java/io/f1/backend/global/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.f1.backend.global.config;

import lombok.RequiredArgsConstructor;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

private final KafkaProperties kafkaProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();

ConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
factory.setBatchListener(true);

return factory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.f1.backend.global.util.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {

private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();

public void sendWithKey(String topic, String key, Object object) {
String jsonMessage = convertToJson(object);
if (jsonMessage != null) {
kafkaTemplate.send(topic, key, jsonMessage);
}
}

private String convertToJson(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
log.error("KafkaProducer Json Processing error", e);
return null;
}
}
}
16 changes: 16 additions & 0 deletions backend/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ spring:
format_sql: true
dialect: org.hibernate.dialect.MySQLDialect

kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
group-id: consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: ${KAFKA_MAX_POLL_RECORDS}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
batch-size: 16384

security:
oauth2:
client:
Expand Down
71 changes: 71 additions & 0 deletions backend/src/test/java/io/f1/backend/domain/stat/KafkaStatTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.f1.backend.domain.stat;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.database.rider.core.api.dataset.DataSet;
import com.github.database.rider.spring.api.DBRider;

import io.f1.backend.domain.stat.dao.StatJpaRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;
import io.f1.backend.domain.stat.dto.StatWithUserSummary;
import io.f1.backend.domain.user.dao.UserRepository;
import io.f1.backend.global.config.KafkaTestContainerConfig;
import io.f1.backend.global.util.kafka.KafkaProducer;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;

import java.time.Duration;

@DBRider
@SpringBootTest
@Import({KafkaTestContainerConfig.class})
class KafkaStatTest {

@Autowired UserRepository userRepository;
@Autowired KafkaProducer kafkaProducer;
@Autowired StatJpaRepository statJpaRepository;

@Test
@DataSet("datasets/stat/one-user-stat.yml")
@DisplayName("Kafka를 통해 게임 결과가 전송되면 Consumer가 비동기로 처리하여 MySQL에 반영된다")
void kafkaConsumerProcessesGameResultAsynchronously() throws Exception {
// given
long userId = 1L;
StatWithUserSummary originalStat =
statJpaRepository.findStatWithUserSummary(userId).orElseThrow(AssertionError::new);

int deltaScore = 100;
StatChangeEvent event = StatChangeEvent.of(userId, true, deltaScore);
kafkaProducer.sendWithKey("stat-changes", String.valueOf(userId), event);

// when
Awaitility.await()
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(200))
.until(() -> isStatUpdated(userId, originalStat.score() + deltaScore));

// then
StatWithUserSummary updatedStat =
statJpaRepository.findStatWithUserSummary(userId).orElseThrow(AssertionError::new);
assertThat(updatedStat.score()).isEqualTo(originalStat.score() + deltaScore);
assertThat(updatedStat.totalGames()).isEqualTo(originalStat.totalGames() + 1);
assertThat(updatedStat.winningGames()).isEqualTo(originalStat.winningGames() + 1);
}

private boolean isStatUpdated(long userId, long expectedScore) {
try {
return statJpaRepository
.findStatWithUserSummary(userId)
.orElseThrow(AssertionError::new)
.score()
== expectedScore;
} catch (Exception e) {
return false;
}
}
}
Loading