Skip to content

Commit b54cb3a

Browse files
authored
refactor: 배치 처리 최대 허용 시간 정의 및 Busy Counter 상태 유지 주기 동기화 (#224)
* refactor: 배치 처리 최대 허용 시간 정의 및 Busy Counter 상태 유지 주기 동기화
1 parent e6795ea commit b54cb3a

2 files changed

Lines changed: 40 additions & 7 deletions

File tree

src/main/java/com/threestar/trainus/domain/lesson/issue/LessonApplyConsumer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@ public void onMessage(MapRecord<String, String, String> message) {
4545
String lessonIdStr = message.getValue().get("lessonId");
4646
if (lessonIdStr != null) {
4747
String key = "lesson:busy:" + lessonIdStr;
48+
String lastActiveKey = "lesson:busy:last_active:" + lessonIdStr;
4849
coreRedisTemplate.opsForValue().increment(key);
49-
coreRedisTemplate.expire(key, java.time.Duration.ofSeconds(60));
50+
coreRedisTemplate.expire(key, java.time.Duration.ofMinutes(10));
51+
coreRedisTemplate.opsForValue().set(lastActiveKey, String.valueOf(System.currentTimeMillis()), java.time.Duration.ofMinutes(10));
5052
}
5153

5254
buffer.add(message);
@@ -132,11 +134,15 @@ public Object execute(org.springframework.data.redis.core.RedisOperations operat
132134
}
133135
} finally {
134136
// 작업 완료 후 각 레슨별로 이번 배치에서 처리한 개수만큼 Busy 카운터 감소
137+
// 작업 완료 후 각 레슨별로 마지막 처리 시점 기록
135138
Map<Long, Long> countsPerLesson = messages.stream()
136139
.collect(Collectors.groupingBy(ApplyMessage::lessonId, Collectors.counting()));
137140

138141
countsPerLesson.forEach((lessonId, count) -> {
139-
coreRedisTemplate.opsForValue().decrement("lesson:busy:" + lessonId, count);
142+
String busyKey = "lesson:busy:" + lessonId;
143+
String lastActiveKey = "lesson:busy:last_active:" + lessonId;
144+
coreRedisTemplate.opsForValue().decrement(busyKey, count);
145+
coreRedisTemplate.opsForValue().set(lastActiveKey, String.valueOf(System.currentTimeMillis()), java.time.Duration.ofMinutes(10));
140146
});
141147
}
142148
}

src/main/java/com/threestar/trainus/domain/lesson/issue/LessonStockReconciliationScheduler.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,40 @@ public void reconcileStock() {
6363

6464
// 해당 레슨이 현재 배치 처리 중인지 확인
6565
String busyKey = "lesson:busy:" + lessonId;
66+
String lastActiveKey = "lesson:busy:last_active:" + lessonId;
6667
String busyCountStr = coreRedisTemplate.opsForValue().get(busyKey);
67-
int busyCount = (busyCountStr == null) ? 0 : Integer.parseInt(busyCountStr);
6868

69-
if (busyCount > 0) {
70-
log.info("Lesson [{}] is still being processed by batch consumers (Busy count: {}). Skipping reconciliation for now.",
71-
lessonId, busyCount);
72-
continue;
69+
// 보정 로직 미실행 조건 체크
70+
if (busyCountStr != null) {
71+
// 마지막 활동 시간 로드
72+
String lastActiveStr = coreRedisTemplate.opsForValue().get(lastActiveKey);
73+
long now = System.currentTimeMillis();
74+
int busyCount = Integer.parseInt(busyCountStr);
75+
76+
// 일정 시간 무응답시 교착상태 방지
77+
boolean isStale = (lastActiveStr != null && (now - Long.parseLong(lastActiveStr) > 600000)); // 10분 이상 무응답
78+
boolean isRecent = (lastActiveStr != null && (now - Long.parseLong(lastActiveStr) < 5000)); // 5초 이내 활동
79+
boolean isNegative = busyCount < 0;
80+
81+
// busy 카운터 존재 & 응답 10분 미만
82+
if (busyCount > 0 && !isStale) {
83+
log.info("Lesson [{}] is still being processed (Busy count: {}). Skipping reconciliation.",
84+
lessonId, busyCountStr);
85+
continue;
86+
}
87+
88+
// busy 카운터 = 0 & 5초 내 활동이 있었음 (DB트랜잭션 잠시 대기)
89+
if (busyCount == 0 && isRecent) {
90+
log.info("Lesson [{}] recently active. Waiting for safety margin.", lessonId);
91+
continue;
92+
}
93+
94+
// 응답 10분 초과 OR 음수 카운터 발생 (교착상태 방지 및 회복)
95+
if (isStale || isNegative) {
96+
log.warn("Detected stale or invalid busy state for lesson [{}] (Count: {}, Stale: {}). Forcing reset.",
97+
lessonId, busyCountStr, isStale);
98+
coreRedisTemplate.opsForValue().set(busyKey, "0", java.time.Duration.ofMinutes(10));
99+
}
73100
}
74101

75102
// 실제 참여자 수 계산 (DB)

0 commit comments

Comments
 (0)