Skip to content

Commit a4bb99d

Browse files
authored
Merge pull request #205 from FC-InnerCircle-ICD2/refactor/convert-redis-stream
refactor : 배송 요청 처리 프로세스 변경
2 parents ec15352 + ac04db8 commit a4bb99d

13 files changed

Lines changed: 302 additions & 121 deletions

File tree

order/src/main/java/com/emotionalcart/order/infra/config/RedisConfig.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
99
import org.springframework.data.redis.core.RedisTemplate;
1010
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
11-
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
1211
import org.springframework.data.redis.serializer.StringRedisSerializer;
1312

1413
@RequiredArgsConstructor
@@ -23,11 +22,13 @@ public RedisConnectionFactory redisConnectionFactory() {
2322
}
2423

2524
@Bean
26-
public <String, RedisMessage> RedisTemplate<String, RedisMessage> redisTemplate() {
27-
RedisTemplate<String, RedisMessage> redisTemplate = new RedisTemplate<>();
28-
redisTemplate.setConnectionFactory(redisConnectionFactory()); //connection
29-
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key
30-
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); //Java Obj <-> JSON -> String Value
25+
public RedisTemplate<String, Object> redisTemplate() {
26+
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
27+
redisTemplate.setConnectionFactory(redisConnectionFactory());
28+
redisTemplate.setKeySerializer(new StringRedisSerializer());
29+
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
30+
redisTemplate.setValueSerializer(new StringRedisSerializer());
31+
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
3132
return redisTemplate;
3233
}
3334

order/src/main/java/com/emotionalcart/order/infra/redis/RedisListenerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
@RequiredArgsConstructor
1515
public class RedisListenerService implements MessageListener {
1616

17-
private final RedisTemplate<String, RedisMessage> template;
17+
private final RedisTemplate<String, Object> template;
1818
private final ObjectMapper objectMapper;
1919

2020
@Override

order/src/main/java/com/emotionalcart/order/infra/redis/RedisPublisher.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.emotionalcart.order.infra.redis;
22

3-
import com.emotionalcart.order.infra.redis.dto.RedisMessage;
43
import lombok.RequiredArgsConstructor;
54
import org.springframework.data.redis.core.RedisTemplate;
65
import org.springframework.data.redis.listener.ChannelTopic;
@@ -10,12 +9,12 @@
109
@Service
1110
public class RedisPublisher {
1211

13-
private final RedisTemplate<String, RedisMessage> template;
12+
private final RedisTemplate<String, Object> template;
1413

1514
/**
1615
* publish
1716
*/
18-
public void publish(ChannelTopic topic, RedisMessage value) {
17+
public void publish(ChannelTopic topic, Object value) {
1918
template.convertAndSend(topic.getTopic(), value);
2019
}
2120

order/src/main/java/com/emotionalcart/order/infra/shipment/ShipmentService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package com.emotionalcart.order.infra.shipment;
22

3-
import com.emotionalcart.order.infra.redis.dto.RedisMessage;
43
import com.emotionalcart.order.infra.shipment.dto.OrderShipmentRequest;
54
import com.fasterxml.jackson.core.JsonProcessingException;
65
import com.fasterxml.jackson.databind.ObjectMapper;
76
import lombok.RequiredArgsConstructor;
87
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.data.redis.connection.stream.MapRecord;
99
import org.springframework.data.redis.core.RedisTemplate;
1010
import org.springframework.stereotype.Service;
1111

@@ -16,16 +16,20 @@
1616
@RequiredArgsConstructor
1717
public class ShipmentService {
1818

19-
private final RedisTemplate<String, RedisMessage> redisTemplate;
19+
private final RedisTemplate<String, Object> redisTemplate;
20+
private static final String STREAM_KEY = "shipment:request:stream";
2021
private final ObjectMapper objectMapper;
21-
private static final String STREAM_KEY = "shipment_stream"; // Stream 이름
2222

2323
public void createShipment(OrderShipmentRequest orderShipmentRequest) {
2424
try {
25-
String jsonMessage = objectMapper.writeValueAsString(RedisMessage.convert(STREAM_KEY, orderShipmentRequest));
26-
redisTemplate.opsForStream().add(STREAM_KEY, Map.of("data", jsonMessage));
25+
String json = objectMapper.writeValueAsString(orderShipmentRequest);
26+
Map<String, String> map = Map.of("shipmentOrderRequest", json);
27+
28+
MapRecord<String, String, String> record = MapRecord.create(STREAM_KEY, map);
29+
redisTemplate.opsForStream().add(record);
30+
2731
} catch (JsonProcessingException e) {
28-
throw new RuntimeException(e.getMessage());
32+
log.error("error : {}", e.getMessage());
2933
}
3034
}
3135

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
11
package com.emotionalcart.shipment.infra.config;
22

3-
import com.emotionalcart.shipment.infra.redis.RedisSubscribeListener;
43
import lombok.RequiredArgsConstructor;
54
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
65
import org.springframework.context.annotation.Bean;
76
import org.springframework.context.annotation.Configuration;
87
import org.springframework.data.redis.connection.RedisConnectionFactory;
98
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
109
import org.springframework.data.redis.core.RedisTemplate;
11-
import org.springframework.data.redis.listener.PatternTopic;
1210
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
13-
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
1411
import org.springframework.data.redis.serializer.StringRedisSerializer;
1512

1613
@RequiredArgsConstructor
1714
@Configuration
1815
public class RedisConfig {
1916

2017
private final RedisProperties redisProperties;
21-
private final RedisSubscribeListener redisSubscribeListener;
2218

2319
@Bean
2420
public RedisConnectionFactory redisConnectionFactory() {
2521
return new LettuceConnectionFactory(redisProperties.getHost(), redisProperties.getPort());
2622
}
2723

2824
@Bean
29-
public <String, RedisMessage> RedisTemplate<String, RedisMessage> redisTemplate() {
30-
RedisTemplate<String, RedisMessage> redisTemplate = new RedisTemplate<>();
31-
redisTemplate.setConnectionFactory(redisConnectionFactory()); //connection
32-
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key
33-
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); //Java Obj <-> JSON -> String Value
25+
public RedisTemplate<String, Object> redisTemplate() {
26+
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
27+
redisTemplate.setConnectionFactory(redisConnectionFactory());
28+
redisTemplate.setKeySerializer(new StringRedisSerializer());
29+
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
30+
redisTemplate.setValueSerializer(new StringRedisSerializer());
31+
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
3432
return redisTemplate;
3533
}
3634

@@ -41,15 +39,4 @@ public RedisMessageListenerContainer redisMessageListener() {
4139
return container;
4240
}
4341

44-
@Bean
45-
public RedisMessageListenerContainer redisMessageListenerContainer() {
46-
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
47-
container.setConnectionFactory(redisConnectionFactory());
48-
49-
// 특정 채널을 구독하도록 리스너 추가
50-
container.addMessageListener(redisSubscribeListener, PatternTopic.of("create-shipment"));
51-
52-
return container;
53-
}
54-
5542
}
Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,66 @@
11
package com.emotionalcart.shipment.infra.redis;
22

3+
import com.emotionalcart.shipment.infra.redis.component.RedisOperator;
34
import lombok.RequiredArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.InitializingBean;
7+
import org.springframework.data.redis.connection.stream.MapRecord;
8+
import org.springframework.data.redis.connection.stream.PendingMessage;
9+
import org.springframework.data.redis.connection.stream.PendingMessages;
410
import org.springframework.scheduling.annotation.Scheduled;
5-
import org.springframework.stereotype.Service;
11+
import org.springframework.stereotype.Component;
612

7-
@Service
13+
@Slf4j
14+
@Component
815
@RequiredArgsConstructor
9-
public class RedisShipmentScheduler {
16+
public class RedisShipmentScheduler implements InitializingBean {
1017

18+
private String streamKey;
19+
private String consumerGroupName;
20+
private String consumerName;
21+
private final RedisOperator redisOperator;
1122
private final ShipmentConsumerService shipmentConsumerService;
1223

13-
@Scheduled(fixedDelay = 10000) // 10초마다 실행
14-
public void processShipments() {
15-
shipmentConsumerService.consumeShipmentEvents();
24+
@Scheduled(fixedRate = 10000)
25+
public void processPendingMessage() {
26+
PendingMessages pendingMessages = this.redisOperator
27+
.findStreamPendingMessages(streamKey, consumerGroupName, consumerName);
28+
29+
for (PendingMessage pendingMessage : pendingMessages) {
30+
this.redisOperator.claimStream(pendingMessage, consumerName);
31+
try {
32+
MapRecord<String, Object, Object> messageToProcess = this.redisOperator
33+
.findStreamMessageById(this.streamKey, pendingMessage.getIdAsString());
34+
if (messageToProcess == null) {
35+
log.info("존재하지 않는 메시지입니다.");
36+
}
37+
String errorCount = (String)this.redisOperator
38+
.getRedisValue("errorCount", pendingMessage.getIdAsString());
39+
40+
if (Integer.parseInt(errorCount) >= 4) {
41+
log.info("재 처리 최대 횟수 초과 하였습니다.");
42+
43+
} else if (pendingMessage.getTotalDeliveryCount() >= 2) {
44+
log.info("최대 delivery 횟수 초과 하였습니다.");
45+
} else {
46+
this.shipmentConsumerService.consumeShipmentEvents();
47+
}
48+
this.redisOperator.ackStream(consumerGroupName, messageToProcess);
49+
} catch (Exception e) {
50+
log.info("error : {}", e.getMessage());
51+
this.redisOperator.increaseRedisValue("errorCount", pendingMessage.getIdAsString());
52+
}
53+
}
54+
}
55+
56+
/**
57+
* 빈 등록 시점에 값 정의
58+
*/
59+
@Override
60+
public void afterPropertiesSet() {
61+
this.streamKey = "shipment:request:stream";
62+
this.consumerGroupName = "shipment-consumers";
63+
this.consumerName = "shipment-processor-1";
1664
}
1765

18-
}
66+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.emotionalcart.shipment.infra.redis;
2+
3+
import com.emotionalcart.shipment.application.service.OrderShipmentService;
4+
import com.emotionalcart.shipment.infra.redis.component.RedisOperator;
5+
import com.emotionalcart.shipment.presentation.controller.request.OrderShipmentRequest;
6+
import com.fasterxml.jackson.core.JsonProcessingException;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.springframework.beans.factory.DisposableBean;
11+
import org.springframework.beans.factory.InitializingBean;
12+
import org.springframework.data.redis.connection.stream.Consumer;
13+
import org.springframework.data.redis.connection.stream.MapRecord;
14+
import org.springframework.data.redis.connection.stream.ReadOffset;
15+
import org.springframework.data.redis.connection.stream.StreamOffset;
16+
import org.springframework.data.redis.stream.StreamListener;
17+
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
18+
import org.springframework.data.redis.stream.Subscription;
19+
import org.springframework.stereotype.Component;
20+
21+
import java.time.Duration;
22+
23+
@Slf4j
24+
@Component
25+
@RequiredArgsConstructor
26+
public class RedisStreamConsumer implements StreamListener<String, MapRecord<String, Object, Object>>, InitializingBean, DisposableBean {
27+
28+
private StreamMessageListenerContainer<String, MapRecord<String, Object, Object>> listenerContainer;
29+
private Subscription subscription;
30+
private final OrderShipmentService orderShipmentService;
31+
private final ObjectMapper objectMapper;
32+
33+
private String streamKey;
34+
private String consumerGroupName;
35+
private String consumerName;
36+
37+
private final RedisOperator redisOperator;
38+
39+
@Override
40+
public void onMessage(MapRecord<String, Object, Object> message) {
41+
log.info("receive message");
42+
try {
43+
OrderShipmentRequest orderShipmentRequest =
44+
objectMapper.readValue((String)message.getValue().get("shipmentOrderRequest"), OrderShipmentRequest.class);
45+
orderShipmentService.createShipment(orderShipmentRequest.mapToDomain());
46+
// 이후, ack stream
47+
this.redisOperator.ackStream(streamKey, message);
48+
} catch (JsonProcessingException e) {
49+
throw new RuntimeException(e);
50+
}
51+
}
52+
53+
/**
54+
* 빈종료시 종료
55+
*/
56+
@Override
57+
public void destroy() {
58+
if (this.subscription != null) {
59+
this.subscription.cancel();
60+
}
61+
if (this.listenerContainer != null) {
62+
this.listenerContainer.stop();
63+
}
64+
}
65+
66+
/**
67+
* 빈생성시 listenerContainer 설정
68+
*
69+
* @throws Exception
70+
*/
71+
@Override
72+
public void afterPropertiesSet() throws Exception {
73+
this.streamKey = "shipment:request:stream";
74+
this.consumerGroupName = "shipment-consumers";
75+
this.consumerName = "shipment-processor-1";
76+
77+
this.redisOperator.createStreamConsumerGroup(streamKey, consumerGroupName);
78+
79+
this.listenerContainer = this.redisOperator.createStreamMessageListenerContainer();
80+
81+
this.subscription = this.listenerContainer.receive(
82+
Consumer.from(this.consumerGroupName, consumerName),
83+
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
84+
this
85+
);
86+
87+
this.subscription.await(Duration.ofSeconds(2));
88+
89+
this.listenerContainer.start();
90+
}
91+
92+
}

shipment/src/main/java/com/emotionalcart/shipment/infra/redis/RedisSubscribeListener.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

0 commit comments

Comments
 (0)