Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ if [ -f "$PROJECT_ROOT/build/libs/$JAR_NAME" ]; then
cp $PROJECT_ROOT/build/libs/$JAR_NAME $PROJECT_ROOT/$JAR_NAME
fi

echo "> SERVER_ROLE: $SERVER_ROLE 프로파일로 실행합니다." >> $DEPLOY_LOG
# 5. 프로파일 설정 결정 (환경변수 우선순위 적용)
if [ -n "$SPRING_PROFILES_ACTIVE" ]; then
ACTIVE_PROFILE=$SPRING_PROFILES_ACTIVE
echo "> 시스템 환경변수 SPRING_PROFILES_ACTIVE($ACTIVE_PROFILE)를 사용합니다." >> $DEPLOY_LOG
else
ACTIVE_PROFILE=$SERVER_ROLE
echo "> SERVER_ROLE($ACTIVE_PROFILE)을 기본 프로파일로 사용합니다." >> $DEPLOY_LOG
fi

nohup java -jar \
-Dspring.profiles.active=$SERVER_ROLE \
-Dspring.profiles.active=$ACTIVE_PROFILE \
$PROJECT_ROOT/$JAR_NAME > $APP_LOG 2>&1 &

echo "> 애플리케이션 실행 완료" >> $DEPLOY_LOG
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.threestar.trainus.domain.lesson.issue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Profile("consumer & legacy")
@Slf4j
@Component
@RequiredArgsConstructor
public class LegacyLessonAdmissionScheduler {

@Qualifier("coreRedisTemplate")
private final StringRedisTemplate coreRedisTemplate;

@Qualifier("mqRedisTemplate")
private final StringRedisTemplate mqRedisTemplate;

private final LessonWaitingRoomService waitingRoomService;

private static final long ADMIT_BATCH_SIZE = 1000L;

@Scheduled(fixedDelay = 500)
public void admitUsers() {
Set<String> activeLessonIds = coreRedisTemplate.opsForSet().members(LessonApplyStreamConstant.DIRTY_SET_KEY);

if (activeLessonIds == null || activeLessonIds.isEmpty()) {
return;
}

for (String lessonIdStr : activeLessonIds) {
Long lessonId = Long.parseLong(lessonIdStr);
processAdmissionForLesson(lessonId);
}
}

private void processAdmissionForLesson(Long lessonId) {
// 해당 레슨 대기열에서 인원 추출
Set<String> requestIds = waitingRoomService.dequeue(lessonId, ADMIT_BATCH_SIZE);

if (requestIds.isEmpty()) {
return;
}

// MGET 방식으로 벌크 GET
List<String> statusKeys = requestIds.stream().map(id -> LessonApplyStreamConstant.STATUS_PREFIX + id).toList();
List<String> statusInfos = mqRedisTemplate.opsForValue().multiGet(statusKeys);

if (statusInfos == null)
return;

// 상태 변경 및 스트림 추가
for (int i = 0; i < statusKeys.size(); i++) {
String info = statusInfos.get(i);
if (info == null)
continue;

String[] parts = info.split(":");
if (parts.length < 3)
continue;

String requestId = requestIds.toArray(new String[0])[i];
Long userId = Long.parseLong(parts[2]);
Long originalTimestamp = parts.length >= 4 ? Long.parseLong(parts[3]) : System.currentTimeMillis();

// 상태 변경
String statusKey = statusKeys.get(i);
String processingInfo = String.format("%s:%d:%d:%d", LessonApplyStreamConstant.STATUS_PROCESSING, lessonId,
userId, originalTimestamp);
mqRedisTemplate.opsForValue()
.set(statusKey, processingInfo,
java.time.Duration.ofMinutes(LessonApplyStreamConstant.STATUS_TTL_MINUTE));

// 스트림 추가
Map<String, String> content = new HashMap<>();
content.put("lessonId", String.valueOf(lessonId));
content.put("userId", String.valueOf(userId));
content.put("requestId", requestId);
content.put("timestamp", String.valueOf(originalTimestamp));
mqRedisTemplate.opsForStream().add(LessonApplyStreamConstant.STREAM_KEY, content);
}

log.info("[Legacy] Admitted {} users to MQ via iteration for lesson: {}", requestIds.size(), lessonId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.threestar.trainus.domain.lesson.issue;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

@Profile("consumer & legacy")
@Slf4j
@Component
@RequiredArgsConstructor
public class LegacyLessonApplyConsumer implements StreamListener<String, MapRecord<String, String, String>> {

@Qualifier("mqRedisTemplate")
private final StringRedisTemplate mqRedisTemplate;

private final LessonApplyService lessonApplyService;

private static final String STREAM_KEY = LessonApplyStreamConstant.STREAM_KEY;
private static final String GROUP = LessonApplyStreamConstant.GROUP;

@Override
public void onMessage(MapRecord<String, String, String> message) {
Map<String, String> value = message.getValue();

Long lessonId = Long.parseLong(value.get("lessonId"));
Long userId = Long.parseLong(value.get("userId"));
String requestId = value.get("requestId");
Long timestamp = Long.parseLong(value.get("timestamp"));

try {
// DB 저장
lessonApplyService.apply(lessonId, userId, requestId, timestamp);

// Redis ACK 및 DELETE
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, message.getId());
mqRedisTemplate.opsForStream().delete(STREAM_KEY, message.getId());

log.info("[Legacy] Consumer processed single message: {}", requestId);
} catch (Exception e) {
log.error("[Legacy] Consumer failed: {}. Error: {}", requestId, e.getMessage());
mqRedisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP, message.getId());
mqRedisTemplate.opsForStream().delete(STREAM_KEY, message.getId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Profile("consumer")
@Profile("consumer & !legacy")
@Slf4j
@Component
@RequiredArgsConstructor
Expand Down Expand Up @@ -59,12 +59,6 @@ private void processAdmissionForLesson(Long lessonId) {
Set<String> requestIds = waitingRoomService.dequeue(lessonId, ADMIT_BATCH_SIZE);

if (requestIds.isEmpty()) {
// 실제 대기열이 비었는지 다시 확인
Long remainingInWaitingRoom = coreRedisTemplate.opsForZSet().size(String.format(LessonApplyStreamConstant.WAITING_ROOM_KEY, lessonId));
if (remainingInWaitingRoom == null || remainingInWaitingRoom == 0) {
coreRedisTemplate.opsForSet().remove(LessonApplyStreamConstant.DIRTY_SET_KEY, String.valueOf(lessonId));
log.info("Lesson {} admission completed. Removed from active set.", lessonId);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

@Profile("consumer")
@Profile("consumer & !legacy")
@Slf4j
@Component
@RequiredArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import jakarta.annotation.PostConstruct;
Expand All @@ -27,7 +28,7 @@ public class LessonApplyStreamConfig {
private int concurrency;

private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
private final LessonApplyConsumer lessonApplyConsumer;
private final StreamListener<String, MapRecord<String, String, String>> lessonApplyConsumer;

@Qualifier("mqRedisTemplate")
private final StringRedisTemplate mqRedisTemplate;
Expand Down
Loading