Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import com.threestar.trainus.global.annotation.LoginUser;
import com.threestar.trainus.global.unit.BaseResponse;

import io.swagger.v3.oas.annotations.tags.Tag;

@Tag(name = "파일 API", description = "S3 Presigned URL 발급 API")
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/s3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,63 +73,92 @@ private void processAdmissionForLesson(Long lessonId) {
log.warn(
"Admission diagnostics. lessonId={}, dequeuedCount={}, statusKeyCount={}, statusInfos=null. Admission skipped after dequeue.",
lessonId, requestIds.size(), statusKeys.size());
requeueAfterAdmissionFailure(lessonId, requestIds, statusKeys, null);
return;
}

AtomicInteger statusNullCount = new AtomicInteger();
AtomicInteger invalidStatusCount = new AtomicInteger();
AtomicInteger xaddAttemptCount = new AtomicInteger();

// Pipelining 방식으로 요청 상태 변경 / Stream ADD 로직 일괄 처리
mqRedisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
for (int i = 0; i < statusKeys.size(); i++) {
String info = statusInfos.get(i);
if (info == null) {
statusNullCount.incrementAndGet();
continue;
try {
// Pipelining 방식으로 요청 상태 변경 / Stream ADD 로직 일괄 처리
mqRedisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
for (int i = 0; i < statusKeys.size(); i++) {
String info = statusInfos.get(i);
if (info == null) {
statusNullCount.incrementAndGet();
continue;
}

String[] parts = info.split(":");
if (parts.length < 3) {
invalidStatusCount.incrementAndGet();
continue;
}

String requestId = requestIds.get(i);
Long userId;
Long originalTimestamp;
try {
userId = Long.parseLong(parts[2]);
originalTimestamp = parts.length >= 4 ? Long.parseLong(parts[3]) : System.currentTimeMillis();
} catch (NumberFormatException e) {
invalidStatusCount.incrementAndGet();
continue;
}

// 상태 변경 (SET)
String statusKey = statusKeys.get(i);
String processingInfo = String.format("%s:%d:%d:%d", LessonApplyStreamConstant.STATUS_PROCESSING, lessonId, userId, originalTimestamp);
operations.opsForValue().set(statusKey, processingInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));

// 스트림 추가 (XADD)
Map<String, String> content = new HashMap<>();
content.put("lessonId", String.valueOf(lessonId));
content.put("userId", String.valueOf(userId));
content.put("requestId", requestId);
content.put("timestamp", String.valueOf(originalTimestamp));
operations.opsForStream().add(LessonApplyStreamConstant.STREAM_KEY, content);
xaddAttemptCount.incrementAndGet();
}

String[] parts = info.split(":");
if (parts.length < 3) {
invalidStatusCount.incrementAndGet();
continue;
}

String requestId = requestIds.get(i);
Long userId;
Long originalTimestamp;
try {
userId = Long.parseLong(parts[2]);
originalTimestamp = parts.length >= 4 ? Long.parseLong(parts[3]) : System.currentTimeMillis();
} catch (NumberFormatException e) {
invalidStatusCount.incrementAndGet();
continue;
}

// 상태 변경 (SET)
String statusKey = statusKeys.get(i);
String processingInfo = String.format("%s:%d:%d:%d", LessonApplyStreamConstant.STATUS_PROCESSING, lessonId, userId, originalTimestamp);
operations.opsForValue().set(statusKey, processingInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));

// 스트림 추가 (XADD)
Map<String, String> content = new HashMap<>();
content.put("lessonId", String.valueOf(lessonId));
content.put("userId", String.valueOf(userId));
content.put("requestId", requestId);
content.put("timestamp", String.valueOf(originalTimestamp));
operations.opsForStream().add(LessonApplyStreamConstant.STREAM_KEY, content);
xaddAttemptCount.incrementAndGet();
return null;
}
return null;
}
});
});
} catch (RuntimeException e) {
requeueAfterAdmissionFailure(lessonId, requestIds, statusKeys, statusInfos);
log.error("Admission pipeline failed. Requeued requests for lessonId={}", lessonId, e);
return;
}

log.info(
"Admission diagnostics. lessonId={}, dequeuedCount={}, statusKeyCount={}, statusNullCount={}, invalidStatusCount={}, xaddAttemptCount={}",
lessonId, requestIds.size(), statusKeys.size(), statusNullCount.get(), invalidStatusCount.get(),
xaddAttemptCount.get());
log.debug("Admitted {} users to MQ via pipeline for lesson: {}", requestIds.size(), lessonId);
}

private void requeueAfterAdmissionFailure(
Long lessonId,
List<String> requestIds,
List<String> statusKeys,
List<String> statusInfos
) {
int requeueCount = 0;
for (int i = 0; i < requestIds.size(); i++) {
String requestId = requestIds.get(i);
String statusInfo = statusInfos == null ? null : statusInfos.get(i);

waitingRoomService.requeueAfterAdmissionFailure(lessonId, requestId);
if (statusInfo != null) {
mqRedisTemplate.opsForValue()
.set(statusKeys.get(i), statusInfo, java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));
}
requeueCount++;
}

log.warn("Admission failed after dequeue. Requeued {} requests. lessonId={}", requeueCount, lessonId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class LessonApplyConsumer implements StreamListener<String, MapRecord<Str

private final ConcurrentLinkedQueue<MapRecord<String, String, String>> buffer = new ConcurrentLinkedQueue<>();
private static final int BATCH_SIZE = 1000;
private static final int CHUNK_SIZE = 100;
private static final String STREAM_KEY = LessonApplyStreamConstant.STREAM_KEY;
private static final String GROUP = LessonApplyStreamConstant.GROUP;
private long lastProcessedTime = System.currentTimeMillis();
Expand Down Expand Up @@ -99,39 +100,23 @@ private void processBuffer() {
value.get("requestId"), Long.parseLong(value.get("timestamp")));
}).collect(Collectors.toList());

processBatch(records, messages);
}

private void processBatch(
List<MapRecord<String, String, String>> records,
List<ApplyMessage> messages
) {
try {
// DB 배치 처리
lessonApplyService.applyBatch(messages);

// Pipelining 으로 상태 ACK 업데이트 / 스트림에서 DELETE 일괄 처리
RecordId[] ids = records.stream().map(MapRecord::getId).toArray(RecordId[]::new);
mqRedisTemplate.executePipelined(new org.springframework.data.redis.core.SessionCallback<Object>() {
@Override
public Object execute(org.springframework.data.redis.core.RedisOperations operations) {
operations.opsForStream().acknowledge(STREAM_KEY, GROUP, ids);
operations.opsForStream().delete(STREAM_KEY, ids);
return null;
}
});
ackAndDeleteAll(records);
log.info("Batch processed {} messages successfully", messages.size());
} catch (Exception e) {
// 실패 시 개별 메세지 실행
log.error("Batch failed: {}. Falling back to individual processing.", e.getMessage());

for (int i = 0; i < records.size(); i++) {
MapRecord<String, String, String> record = records.get(i);
ApplyMessage msg = messages.get(i);

try {
lessonApplyService.apply(msg.lessonId(), msg.userId(), msg.requestId(), msg.timestamp());
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
} catch (Exception ex) {
log.error("Individual message failed: {}, Error={}", msg.requestId(), ex.getMessage());
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
}
}
// 배치 실패 시 chunk 단위 재시도
log.error("Batch failed: {}. Retrying by chunks.", e.getMessage());
processChunks(records, messages);
} finally {
// 작업 완료 후 각 레슨별로 이번 배치에서 처리한 개수만큼 Busy 카운터 감소, 마지막 처리 시점 기록
Map<Long, Long> countsPerLesson = messages.stream()
Expand All @@ -146,9 +131,71 @@ public Object execute(org.springframework.data.redis.core.RedisOperations operat
}
}

private void processChunks(
List<MapRecord<String, String, String>> records,
List<ApplyMessage> messages
) {
for (int from = 0; from < messages.size(); from += CHUNK_SIZE) {
// 전체 메세지 chunk로 나누어 처리
int to = Math.min(from + CHUNK_SIZE, messages.size());
List<MapRecord<String, String, String>> chunkRecords = records.subList(from, to);
List<ApplyMessage> chunkMessages = messages.subList(from, to);

try {
lessonApplyService.applyBatch(chunkMessages);
ackAndDeleteAll(chunkRecords);
log.info("Chunk processed {} messages successfully", chunkMessages.size());
} catch (Exception e) {
// chunk 단위에서도 실패한 경우에만 개별 메세지 처리로 전환
log.error("Chunk failed: {}. Falling back to individual processing.", e.getMessage());
processIndividually(chunkRecords, chunkMessages);
}
}
}

private void processIndividually(
List<MapRecord<String, String, String>> records,
List<ApplyMessage> messages
) {
for (int i = 0; i < records.size(); i++) {
MapRecord<String, String, String> record = records.get(i);
ApplyMessage msg = messages.get(i);

try {
boolean completed = lessonApplyService.apply(msg.lessonId(), msg.userId(), msg.requestId(), msg.timestamp());
ackAndDelete(record);
if (!completed) {
// 비즈니스 실패
log.warn("Individual message completed as business failure. requestId={}", msg.requestId());
}
} catch (Exception e) {
// 시스템 실패 가능성 : pending 상태 유지
log.error("Individual message failed. requestId={}, message remains pending", msg.requestId(), e);
}
}
}

public void clearBuffer() {
buffer.clear();
lastProcessedTime = System.currentTimeMillis();
log.info("Consumer buffer cleared.");
}

private void ackAndDeleteAll(List<MapRecord<String, String, String>> records) {
// Pipelining 으로 여러 메세지를 ACK 처리 후 Stream에서 일괄 삭제
RecordId[] ids = records.stream().map(MapRecord::getId).toArray(RecordId[]::new);
mqRedisTemplate.executePipelined(new org.springframework.data.redis.core.SessionCallback<Object>() {
@Override
public Object execute(org.springframework.data.redis.core.RedisOperations operations) {
operations.opsForStream().acknowledge(STREAM_KEY, GROUP, ids);
operations.opsForStream().delete(STREAM_KEY, ids);
return null;
}
});
}

private void ackAndDelete(MapRecord<String, String, String> record) {
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, record.getId());
mqRedisTemplate.opsForStream().delete(STREAM_KEY, record.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public void applyBatch(List<ApplyMessage> messages) {
.stream()
.collect(Collectors.toMap(Lesson::getId, l -> l));

String sql = "INSERT INTO lesson_participants (lesson_id, user_id, join_at, status) VALUES (?, ?, NOW(), ?)";
String sql = """
INSERT INTO lesson_participants (lesson_id, user_id, join_at, status)
VALUES (?, ?, NOW(), ?)
ON CONFLICT (user_id, lesson_id) DO NOTHING
""";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Expand Down Expand Up @@ -152,10 +156,13 @@ public boolean apply(Long lessonId, Long userId, String requestId, Long produceT
updateStatus(statusKey, "SUCCESS");

return true;
} catch (Exception e) {
log.error("Failed to apply lesson in consumer: {}", e.getMessage());
updateStatus(statusKey, "FAIL:ERROR");
} catch (BusinessException e) {
log.warn("Lesson apply business failed. requestId={}, errorCode={}", requestId, e.getErrorCode());
updateStatus(statusKey, "FAIL:" + e.getErrorCode().name());
return false;
} catch (Exception e) {
log.error("Failed to apply lesson in consumer. requestId={}", requestId, e);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.threestar.trainus.domain.lesson.issue;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Profile("consumer")
@Slf4j
@Component
@RequiredArgsConstructor
public class LessonPendingMessageRecoveryScheduler {

private static final String STREAM_KEY = LessonApplyStreamConstant.STREAM_KEY;
private static final String GROUP = LessonApplyStreamConstant.GROUP;
private static final String RECOVERY_CONSUMER = "lesson-recovery-consumer";
private static final Duration MIN_IDLE_TIME = Duration.ofSeconds(10);
private static final int RECOVERY_BATCH_SIZE = 100;

@Qualifier("mqRedisTemplate")
private final StringRedisTemplate mqRedisTemplate;

private final LessonApplyConsumer lessonApplyConsumer;

@Scheduled(fixedDelay = 5000)
@SchedulerLock(name = "LessonPendingMessageRecovery", lockAtMostFor = "4s", lockAtLeastFor = "1s")
public void recoverPendingMessages() {
// Pending 메시지 목록 조회
PendingMessages pendingMessages = mqRedisTemplate.opsForStream()
.pending(STREAM_KEY, GROUP, Range.unbounded(), RECOVERY_BATCH_SIZE);

if (pendingMessages == null || pendingMessages.isEmpty()) {
return;
}

// 일정 시간 이상 응답이 없던 메시지만 선별
RecordId[] staleRecordIds = pendingMessages.stream()
.filter(message -> message.getElapsedTimeSinceLastDelivery().compareTo(MIN_IDLE_TIME) >= 0)
.map(PendingMessage::getId)
.toArray(RecordId[]::new);

if (staleRecordIds.length == 0) {
return;
}

List<MapRecord<String, Object, Object>> claimedRecords = mqRedisTemplate.opsForStream()
.claim(STREAM_KEY, GROUP, RECOVERY_CONSUMER, MIN_IDLE_TIME, staleRecordIds);

if (claimedRecords == null || claimedRecords.isEmpty()) {
return;
}

// 회수한 메시지를 기존 Consumer 흐름으로 재전달
claimedRecords.stream()
.map(this::toStringRecord)
.forEach(lessonApplyConsumer::onMessage);
log.warn("Recovered {} pending lesson apply messages.", claimedRecords.size());
}

private MapRecord<String, String, String> toStringRecord(MapRecord<String, Object, Object> record) {
// Claim 결과를 String 기반 Record로 변환
Map<String, String> value = new HashMap<>();
record.getValue().forEach((key, val) -> value.put(String.valueOf(key), String.valueOf(val)));
return MapRecord.create(STREAM_KEY, value).withId(record.getId());
}
}
Loading
Loading