@@ -39,6 +39,12 @@ public class LessonStockReconciliationScheduler {
3939 @ Scheduled (fixedRate = 30000 )
4040 @ SchedulerLock (name = "LessonStockReconciliation" , lockAtMostFor = "25s" , lockAtLeastFor = "20s" )
4141 public void reconcileStock () {
42+ // 대기열 잔여 메세지 존재 시 연기 (Core Redis)
43+ if (hasWaitingRoomMessages ()) {
44+ log .info ("Waiting room still has messages. Postponing reconciliation." );
45+ return ;
46+ }
47+
4248 // 미처리 메세지 존재 시 연기 (MQ Redis)
4349 if (hasStreamLag ()) {
4450 log .info ("Stream still has pending messages or backlog in MQ. Postponing reconciliation." );
@@ -134,6 +140,37 @@ public void reconcileStock() {
134140 log .info ("Finished Smart Stock Reconciliation for {} lessons." , processedCount );
135141 }
136142
143+ // 미처리 메세지 확인 메서드 (Core Redis)
144+ private boolean hasWaitingRoomMessages () {
145+ String dirtySetKey = LessonApplyStreamConstant .DIRTY_SET_KEY ;
146+
147+ // Dirty Set 확인 (Core Redis)
148+ java .util .Set <String > lessonIds = coreRedisTemplate .opsForSet ().members (dirtySetKey );
149+ if (lessonIds == null || lessonIds .isEmpty ()) {
150+ return false ;
151+ }
152+
153+ for (String lessonIdStr : lessonIds ) {
154+ try {
155+ Long lessonId = Long .valueOf (lessonIdStr );
156+
157+ // 레슨별 대기열 잔여 메세지 확인 (Core Redis)
158+ String waitingRoomKey = String .format (LessonApplyStreamConstant .WAITING_ROOM_KEY , lessonId );
159+ Long size = coreRedisTemplate .opsForZSet ().size (waitingRoomKey );
160+ if (size != null && size > 0 ) {
161+ log .info ("Waiting room still has messages. lessonId={}, size={}" , lessonId , size );
162+ return true ;
163+ }
164+ } catch (Exception e ) {
165+ // 대기열 확인 실패 시 보수적으로 보정 연기
166+ log .warn ("Failed to check waiting room for lesson [{}]: {}" , lessonIdStr , e .getMessage ());
167+ return true ;
168+ }
169+ }
170+
171+ return false ;
172+ }
173+
137174 // 미처리 메세지 확인 메서드 (MQ Redis)
138175 private boolean hasStreamLag () {
139176 try {
@@ -151,8 +188,9 @@ private boolean hasStreamLag() {
151188 return size != null && size > 0 ;
152189
153190 } catch (Exception e ) {
154- // 스트림이 없거나 초기 상태일 경우
155- return false ;
191+ // 스트림 상태 확인 실패 시 보수적으로 보정 연기
192+ log .warn ("Failed to check stream lag. Postponing reconciliation: {}" , e .getMessage ());
193+ return true ;
156194 }
157195 }
158196}
0 commit comments