Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public void onMessage(MapRecord<String, String, String> message) {
String lessonIdStr = message.getValue().get("lessonId");
if (lessonIdStr != null) {
String key = "lesson:busy:" + lessonIdStr;
String lastActiveKey = "lesson:busy:last_active:" + lessonIdStr;
coreRedisTemplate.opsForValue().increment(key);
coreRedisTemplate.expire(key, java.time.Duration.ofSeconds(60));
coreRedisTemplate.expire(key, java.time.Duration.ofMinutes(10));
coreRedisTemplate.opsForValue().set(lastActiveKey, String.valueOf(System.currentTimeMillis()), java.time.Duration.ofMinutes(10));
}

buffer.add(message);
Expand Down Expand Up @@ -132,11 +134,15 @@ public Object execute(org.springframework.data.redis.core.RedisOperations operat
}
} finally {
// 작업 완료 후 각 레슨별로 이번 배치에서 처리한 개수만큼 Busy 카운터 감소
// 작업 완료 후 각 레슨별로 마지막 처리 시점 기록
Map<Long, Long> countsPerLesson = messages.stream()
.collect(Collectors.groupingBy(ApplyMessage::lessonId, Collectors.counting()));

countsPerLesson.forEach((lessonId, count) -> {
coreRedisTemplate.opsForValue().decrement("lesson:busy:" + lessonId, count);
String busyKey = "lesson:busy:" + lessonId;
String lastActiveKey = "lesson:busy:last_active:" + lessonId;
coreRedisTemplate.opsForValue().decrement(busyKey, count);
coreRedisTemplate.opsForValue().set(lastActiveKey, String.valueOf(System.currentTimeMillis()), java.time.Duration.ofMinutes(10));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,40 @@ public void reconcileStock() {

// 해당 레슨이 현재 배치 처리 중인지 확인
String busyKey = "lesson:busy:" + lessonId;
String lastActiveKey = "lesson:busy:last_active:" + lessonId;
String busyCountStr = coreRedisTemplate.opsForValue().get(busyKey);
int busyCount = (busyCountStr == null) ? 0 : Integer.parseInt(busyCountStr);

if (busyCount > 0) {
log.info("Lesson [{}] is still being processed by batch consumers (Busy count: {}). Skipping reconciliation for now.",
lessonId, busyCount);
continue;
// 보정 로직 미실행 조건 체크
if (busyCountStr != null) {
// 마지막 활동 시간 로드
String lastActiveStr = coreRedisTemplate.opsForValue().get(lastActiveKey);
long now = System.currentTimeMillis();
int busyCount = Integer.parseInt(busyCountStr);

// 일정 시간 무응답시 교착상태 방지
boolean isStale = (lastActiveStr != null && (now - Long.parseLong(lastActiveStr) > 600000)); // 10분 이상 무응답
boolean isRecent = (lastActiveStr != null && (now - Long.parseLong(lastActiveStr) < 5000)); // 5초 이내 활동
boolean isNegative = busyCount < 0;

// busy 카운터 존재 & 응답 10분 미만
if (busyCount > 0 && !isStale) {
log.info("Lesson [{}] is still being processed (Busy count: {}). Skipping reconciliation.",
lessonId, busyCountStr);
continue;
}

// busy 카운터 = 0 & 5초 내 활동이 있었음 (DB트랜잭션 잠시 대기)
if (busyCount == 0 && isRecent) {
log.info("Lesson [{}] recently active. Waiting for safety margin.", lessonId);
continue;
}

// 응답 10분 초과 OR 음수 카운터 발생 (교착상태 방지 및 회복)
if (isStale || isNegative) {
log.warn("Detected stale or invalid busy state for lesson [{}] (Count: {}, Stale: {}). Forcing reset.",
lessonId, busyCountStr, isStale);
coreRedisTemplate.opsForValue().set(busyKey, "0", java.time.Duration.ofMinutes(10));
}
}

// 실제 참여자 수 계산 (DB)
Expand Down
Loading