Skip to content

Commit 364198e

Browse files
authored
fix: Redis Stream 신청 메시지 실패 처리 보강 (#235)
* refactor: 수락제와 선착순 레슨 신청 경로 분리 * fix: Redis Stream 신청 메시지 실패 처리 보강 * fix: 레슨 신청 중복 방지를 위한 유니크 제약 추가 * chore: Swagger API 태그 정리
1 parent a24737d commit 364198e

18 files changed

Lines changed: 422 additions & 89 deletions

src/main/java/com/threestar/trainus/domain/file/controller/S3Controller.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
import com.threestar.trainus.global.annotation.LoginUser;
1515
import com.threestar.trainus.global.unit.BaseResponse;
1616

17+
import io.swagger.v3.oas.annotations.tags.Tag;
18+
19+
@Tag(name = "파일 API", description = "S3 Presigned URL 발급 API")
1720
@RestController
1821
@RequiredArgsConstructor
1922
@RequestMapping("/api/v1/s3")

src/main/java/com/threestar/trainus/domain/lesson/issue/LessonAdmissionScheduler.java

Lines changed: 72 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -73,63 +73,92 @@ private void processAdmissionForLesson(Long lessonId) {
7373
log.warn(
7474
"Admission diagnostics. lessonId={}, dequeuedCount={}, statusKeyCount={}, statusInfos=null. Admission skipped after dequeue.",
7575
lessonId, requestIds.size(), statusKeys.size());
76+
requeueAfterAdmissionFailure(lessonId, requestIds, statusKeys, null);
7677
return;
7778
}
7879

7980
AtomicInteger statusNullCount = new AtomicInteger();
8081
AtomicInteger invalidStatusCount = new AtomicInteger();
8182
AtomicInteger xaddAttemptCount = new AtomicInteger();
8283

83-
// Pipelining 방식으로 요청 상태 변경 / Stream ADD 로직 일괄 처리
84-
mqRedisTemplate.executePipelined(new SessionCallback<Object>() {
85-
@Override
86-
public Object execute(RedisOperations operations) {
87-
for (int i = 0; i < statusKeys.size(); i++) {
88-
String info = statusInfos.get(i);
89-
if (info == null) {
90-
statusNullCount.incrementAndGet();
91-
continue;
84+
try {
85+
// Pipelining 방식으로 요청 상태 변경 / Stream ADD 로직 일괄 처리
86+
mqRedisTemplate.executePipelined(new SessionCallback<Object>() {
87+
@Override
88+
public Object execute(RedisOperations operations) {
89+
for (int i = 0; i < statusKeys.size(); i++) {
90+
String info = statusInfos.get(i);
91+
if (info == null) {
92+
statusNullCount.incrementAndGet();
93+
continue;
94+
}
95+
96+
String[] parts = info.split(":");
97+
if (parts.length < 3) {
98+
invalidStatusCount.incrementAndGet();
99+
continue;
100+
}
101+
102+
String requestId = requestIds.get(i);
103+
Long userId;
104+
Long originalTimestamp;
105+
try {
106+
userId = Long.parseLong(parts[2]);
107+
originalTimestamp = parts.length >= 4 ? Long.parseLong(parts[3]) : System.currentTimeMillis();
108+
} catch (NumberFormatException e) {
109+
invalidStatusCount.incrementAndGet();
110+
continue;
111+
}
112+
113+
// 상태 변경 (SET)
114+
String statusKey = statusKeys.get(i);
115+
String processingInfo = String.format("%s:%d:%d:%d", LessonApplyStreamConstant.STATUS_PROCESSING, lessonId, userId, originalTimestamp);
116+
operations.opsForValue().set(statusKey, processingInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));
117+
118+
// 스트림 추가 (XADD)
119+
Map<String, String> content = new HashMap<>();
120+
content.put("lessonId", String.valueOf(lessonId));
121+
content.put("userId", String.valueOf(userId));
122+
content.put("requestId", requestId);
123+
content.put("timestamp", String.valueOf(originalTimestamp));
124+
operations.opsForStream().add(LessonApplyStreamConstant.STREAM_KEY, content);
125+
xaddAttemptCount.incrementAndGet();
92126
}
93-
94-
String[] parts = info.split(":");
95-
if (parts.length < 3) {
96-
invalidStatusCount.incrementAndGet();
97-
continue;
98-
}
99-
100-
String requestId = requestIds.get(i);
101-
Long userId;
102-
Long originalTimestamp;
103-
try {
104-
userId = Long.parseLong(parts[2]);
105-
originalTimestamp = parts.length >= 4 ? Long.parseLong(parts[3]) : System.currentTimeMillis();
106-
} catch (NumberFormatException e) {
107-
invalidStatusCount.incrementAndGet();
108-
continue;
109-
}
110-
111-
// 상태 변경 (SET)
112-
String statusKey = statusKeys.get(i);
113-
String processingInfo = String.format("%s:%d:%d:%d", LessonApplyStreamConstant.STATUS_PROCESSING, lessonId, userId, originalTimestamp);
114-
operations.opsForValue().set(statusKey, processingInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));
115-
116-
// 스트림 추가 (XADD)
117-
Map<String, String> content = new HashMap<>();
118-
content.put("lessonId", String.valueOf(lessonId));
119-
content.put("userId", String.valueOf(userId));
120-
content.put("requestId", requestId);
121-
content.put("timestamp", String.valueOf(originalTimestamp));
122-
operations.opsForStream().add(LessonApplyStreamConstant.STREAM_KEY, content);
123-
xaddAttemptCount.incrementAndGet();
127+
return null;
124128
}
125-
return null;
126-
}
127-
});
129+
});
130+
} catch (RuntimeException e) {
131+
requeueAfterAdmissionFailure(lessonId, requestIds, statusKeys, statusInfos);
132+
log.error("Admission pipeline failed. Requeued requests for lessonId={}", lessonId, e);
133+
return;
134+
}
128135

129136
log.info(
130137
"Admission diagnostics. lessonId={}, dequeuedCount={}, statusKeyCount={}, statusNullCount={}, invalidStatusCount={}, xaddAttemptCount={}",
131138
lessonId, requestIds.size(), statusKeys.size(), statusNullCount.get(), invalidStatusCount.get(),
132139
xaddAttemptCount.get());
133140
log.debug("Admitted {} users to MQ via pipeline for lesson: {}", requestIds.size(), lessonId);
134141
}
142+
143+
private void requeueAfterAdmissionFailure(
144+
Long lessonId,
145+
List<String> requestIds,
146+
List<String> statusKeys,
147+
List<String> statusInfos
148+
) {
149+
int requeueCount = 0;
150+
for (int i = 0; i < requestIds.size(); i++) {
151+
String requestId = requestIds.get(i);
152+
String statusInfo = statusInfos == null ? null : statusInfos.get(i);
153+
154+
waitingRoomService.requeueAfterAdmissionFailure(lessonId, requestId);
155+
if (statusInfo != null) {
156+
mqRedisTemplate.opsForValue()
157+
.set(statusKeys.get(i), statusInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));
158+
}
159+
requeueCount++;
160+
}
161+
162+
log.warn("Admission failed after dequeue. Requeued {} requests. lessonId={}", requeueCount, lessonId);
163+
}
135164
}

src/main/java/com/threestar/trainus/domain/lesson/issue/LessonApplyConsumer.java

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class LessonApplyConsumer implements StreamListener<String, MapRecord<Str
3434

3535
private final ConcurrentLinkedQueue<MapRecord<String, String, String>> buffer = new ConcurrentLinkedQueue<>();
3636
private static final int BATCH_SIZE = 1000;
37+
private static final int CHUNK_SIZE = 100;
3738
private static final String STREAM_KEY = LessonApplyStreamConstant.STREAM_KEY;
3839
private static final String GROUP = LessonApplyStreamConstant.GROUP;
3940
private long lastProcessedTime = System.currentTimeMillis();
@@ -99,39 +100,23 @@ private void processBuffer() {
99100
value.get("requestId"), Long.parseLong(value.get("timestamp")));
100101
}).collect(Collectors.toList());
101102

103+
processBatch(records, messages);
104+
}
105+
106+
private void processBatch(
107+
List<MapRecord<String, String, String>> records,
108+
List<ApplyMessage> messages
109+
) {
102110
try {
103111
// DB 배치 처리
104112
lessonApplyService.applyBatch(messages);
105113

106-
// Pipelining 으로 상태 ACK 업데이트 / 스트림에서 DELETE 일괄 처리
107-
RecordId[] ids = records.stream().map(MapRecord::getId).toArray(RecordId[]::new);
108-
mqRedisTemplate.executePipelined(new org.springframework.data.redis.core.SessionCallback<Object>() {
109-
@Override
110-
public Object execute(org.springframework.data.redis.core.RedisOperations operations) {
111-
operations.opsForStream().acknowledge(STREAM_KEY, GROUP, ids);
112-
operations.opsForStream().delete(STREAM_KEY, ids);
113-
return null;
114-
}
115-
});
114+
ackAndDeleteAll(records);
116115
log.info("Batch processed {} messages successfully", messages.size());
117116
} catch (Exception e) {
118-
// 실패 시 개별 메세지 실행
119-
log.error("Batch failed: {}. Falling back to individual processing.", e.getMessage());
120-
121-
for (int i = 0; i < records.size(); i++) {
122-
MapRecord<String, String, String> record = records.get(i);
123-
ApplyMessage msg = messages.get(i);
124-
125-
try {
126-
lessonApplyService.apply(msg.lessonId(), msg.userId(), msg.requestId(), msg.timestamp());
127-
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
128-
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
129-
} catch (Exception ex) {
130-
log.error("Individual message failed: {}, Error={}", msg.requestId(), ex.getMessage());
131-
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
132-
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
133-
}
134-
}
117+
// 배치 실패 시 chunk 단위 재시도
118+
log.error("Batch failed: {}. Retrying by chunks.", e.getMessage());
119+
processChunks(records, messages);
135120
} finally {
136121
// 작업 완료 후 각 레슨별로 이번 배치에서 처리한 개수만큼 Busy 카운터 감소, 마지막 처리 시점 기록
137122
Map<Long, Long> countsPerLesson = messages.stream()
@@ -146,9 +131,71 @@ public Object execute(org.springframework.data.redis.core.RedisOperations operat
146131
}
147132
}
148133

134+
private void processChunks(
135+
List<MapRecord<String, String, String>> records,
136+
List<ApplyMessage> messages
137+
) {
138+
for (int from = 0; from < messages.size(); from += CHUNK_SIZE) {
139+
// 전체 메세지 chunk로 나누어 처리
140+
int to = Math.min(from + CHUNK_SIZE, messages.size());
141+
List<MapRecord<String, String, String>> chunkRecords = records.subList(from, to);
142+
List<ApplyMessage> chunkMessages = messages.subList(from, to);
143+
144+
try {
145+
lessonApplyService.applyBatch(chunkMessages);
146+
ackAndDeleteAll(chunkRecords);
147+
log.info("Chunk processed {} messages successfully", chunkMessages.size());
148+
} catch (Exception e) {
149+
// chunk 단위에서도 실패한 경우에만 개별 메세지 처리로 전환
150+
log.error("Chunk failed: {}. Falling back to individual processing.", e.getMessage());
151+
processIndividually(chunkRecords, chunkMessages);
152+
}
153+
}
154+
}
155+
156+
private void processIndividually(
157+
List<MapRecord<String, String, String>> records,
158+
List<ApplyMessage> messages
159+
) {
160+
for (int i = 0; i < records.size(); i++) {
161+
MapRecord<String, String, String> record = records.get(i);
162+
ApplyMessage msg = messages.get(i);
163+
164+
try {
165+
boolean completed = lessonApplyService.apply(msg.lessonId(), msg.userId(), msg.requestId(), msg.timestamp());
166+
ackAndDelete(record);
167+
if (!completed) {
168+
// 비즈니스 실패
169+
log.warn("Individual message completed as business failure. requestId={}", msg.requestId());
170+
}
171+
} catch (Exception e) {
172+
// 시스템 실패 가능성 : pending 상태 유지
173+
log.error("Individual message failed. requestId={}, message remains pending", msg.requestId(), e);
174+
}
175+
}
176+
}
177+
149178
public void clearBuffer() {
150179
buffer.clear();
151180
lastProcessedTime = System.currentTimeMillis();
152181
log.info("Consumer buffer cleared.");
153182
}
183+
184+
private void ackAndDeleteAll(List<MapRecord<String, String, String>> records) {
185+
// Pipelining 으로 여러 메세지를 ACK 처리 후 Stream에서 일괄 삭제
186+
RecordId[] ids = records.stream().map(MapRecord::getId).toArray(RecordId[]::new);
187+
mqRedisTemplate.executePipelined(new org.springframework.data.redis.core.SessionCallback<Object>() {
188+
@Override
189+
public Object execute(org.springframework.data.redis.core.RedisOperations operations) {
190+
operations.opsForStream().acknowledge(STREAM_KEY, GROUP, ids);
191+
operations.opsForStream().delete(STREAM_KEY, ids);
192+
return null;
193+
}
194+
});
195+
}
196+
197+
private void ackAndDelete(MapRecord<String, String, String> record) {
198+
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
199+
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
200+
}
154201
}

src/main/java/com/threestar/trainus/domain/lesson/issue/LessonApplyService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ public void applyBatch(List<ApplyMessage> messages) {
6161
.stream()
6262
.collect(Collectors.toMap(Lesson::getId, l -> l));
6363

64-
String sql = "INSERT INTO lesson_participants (lesson_id, user_id, join_at, status) VALUES (?, ?, NOW(), ?)";
64+
String sql = """
65+
INSERT INTO lesson_participants (lesson_id, user_id, join_at, status)
66+
VALUES (?, ?, NOW(), ?)
67+
ON CONFLICT (user_id, lesson_id) DO NOTHING
68+
""";
6569
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
6670
@Override
6771
public void setValues(PreparedStatement ps, int i) throws SQLException {
@@ -152,10 +156,13 @@ public boolean apply(Long lessonId, Long userId, String requestId, Long produceT
152156
updateStatus(statusKey, "SUCCESS");
153157

154158
return true;
155-
} catch (Exception e) {
156-
log.error("Failed to apply lesson in consumer: {}", e.getMessage());
157-
updateStatus(statusKey, "FAIL:ERROR");
159+
} catch (BusinessException e) {
160+
log.warn("Lesson apply business failed. requestId={}, errorCode={}", requestId, e.getErrorCode());
161+
updateStatus(statusKey, "FAIL:" + e.getErrorCode().name());
158162
return false;
163+
} catch (Exception e) {
164+
log.error("Failed to apply lesson in consumer. requestId={}", requestId, e);
165+
throw e;
159166
}
160167
}
161168

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.threestar.trainus.domain.lesson.issue;
2+
3+
import java.time.Duration;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
import org.springframework.beans.factory.annotation.Qualifier;
9+
import org.springframework.context.annotation.Profile;
10+
import org.springframework.data.domain.Range;
11+
import org.springframework.data.redis.connection.stream.MapRecord;
12+
import org.springframework.data.redis.connection.stream.PendingMessage;
13+
import org.springframework.data.redis.connection.stream.PendingMessages;
14+
import org.springframework.data.redis.connection.stream.RecordId;
15+
import org.springframework.data.redis.core.StringRedisTemplate;
16+
import org.springframework.scheduling.annotation.Scheduled;
17+
import org.springframework.stereotype.Component;
18+
19+
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
20+
21+
import lombok.RequiredArgsConstructor;
22+
import lombok.extern.slf4j.Slf4j;
23+
24+
@Profile("consumer")
25+
@Slf4j
26+
@Component
27+
@RequiredArgsConstructor
28+
public class LessonPendingMessageRecoveryScheduler {
29+
30+
private static final String STREAM_KEY = LessonApplyStreamConstant.STREAM_KEY;
31+
private static final String GROUP = LessonApplyStreamConstant.GROUP;
32+
private static final String RECOVERY_CONSUMER = "lesson-recovery-consumer";
33+
private static final Duration MIN_IDLE_TIME = Duration.ofSeconds(10);
34+
private static final int RECOVERY_BATCH_SIZE = 100;
35+
36+
@Qualifier("mqRedisTemplate")
37+
private final StringRedisTemplate mqRedisTemplate;
38+
39+
private final LessonApplyConsumer lessonApplyConsumer;
40+
41+
@Scheduled(fixedDelay = 5000)
42+
@SchedulerLock(name = "LessonPendingMessageRecovery", lockAtMostFor = "4s", lockAtLeastFor = "1s")
43+
public void recoverPendingMessages() {
44+
// Pending 메시지 목록 조회
45+
PendingMessages pendingMessages = mqRedisTemplate.opsForStream()
46+
.pending(STREAM_KEY, GROUP, Range.unbounded(), RECOVERY_BATCH_SIZE);
47+
48+
if (pendingMessages == null || pendingMessages.isEmpty()) {
49+
return;
50+
}
51+
52+
// 일정 시간 이상 응답이 없던 메시지만 선별
53+
RecordId[] staleRecordIds = pendingMessages.stream()
54+
.filter(message -> message.getElapsedTimeSinceLastDelivery().compareTo(MIN_IDLE_TIME) >= 0)
55+
.map(PendingMessage::getId)
56+
.toArray(RecordId[]::new);
57+
58+
if (staleRecordIds.length == 0) {
59+
return;
60+
}
61+
62+
List<MapRecord<String, Object, Object>> claimedRecords = mqRedisTemplate.opsForStream()
63+
.claim(STREAM_KEY, GROUP, RECOVERY_CONSUMER, MIN_IDLE_TIME, staleRecordIds);
64+
65+
if (claimedRecords == null || claimedRecords.isEmpty()) {
66+
return;
67+
}
68+
69+
// 회수한 메시지를 기존 Consumer 흐름으로 재전달
70+
claimedRecords.stream()
71+
.map(this::toStringRecord)
72+
.forEach(lessonApplyConsumer::onMessage);
73+
log.warn("Recovered {} pending lesson apply messages.", claimedRecords.size());
74+
}
75+
76+
private MapRecord<String, String, String> toStringRecord(MapRecord<String, Object, Object> record) {
77+
// Claim 결과를 String 기반 Record로 변환
78+
Map<String, String> value = new HashMap<>();
79+
record.getValue().forEach((key, val) -> value.put(String.valueOf(key), String.valueOf(val)));
80+
return MapRecord.create(STREAM_KEY, value).withId(record.getId());
81+
}
82+
}

0 commit comments

Comments
 (0)