|
4 | 4 | import jakarta.annotation.PreDestroy; |
5 | 5 | import lombok.RequiredArgsConstructor; |
6 | 6 | import lombok.extern.slf4j.Slf4j; |
| 7 | +import org.springframework.dao.QueryTimeoutException; |
7 | 8 | import org.springframework.data.redis.core.StringRedisTemplate; |
8 | 9 | import org.springframework.stereotype.Component; |
9 | 10 | import sopt.comfit.global.constants.Constants; |
|
18 | 19 | import sopt.comfit.report.service.AIReportQueryService; |
19 | 20 |
|
20 | 21 | import java.time.Duration; |
21 | | -import java.util.ArrayList; |
22 | 22 | import java.util.List; |
| 23 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 24 | +import java.util.concurrent.atomic.AtomicBoolean; |
23 | 25 |
|
24 | 26 | @Slf4j |
25 | 27 | @Component |
26 | 28 | @RequiredArgsConstructor |
27 | 29 | public class AIReportJobWorker { |
28 | 30 |
|
| 31 | + private static final int WORKER_COUNT = 2; |
| 32 | + |
29 | 33 | private final StringRedisTemplate redisTemplate; |
30 | 34 | private final AIReportJobService reportJobService; |
31 | 35 | private final AIReportQueryService aiReportQueryService; |
32 | 36 | private final AIReportCommandService aiReportCommandService; |
33 | 37 | private final RetryableAiCallerService aiCaller; |
34 | 38 |
|
35 | | - List<Thread> workers = new ArrayList<>(); |
| 39 | + private final AtomicBoolean running = new AtomicBoolean(false); |
| 40 | + List<Thread> workers = new CopyOnWriteArrayList<>(); |
36 | 41 |
|
37 | 42 | @PostConstruct |
38 | 43 | public void startWorker() { |
39 | | - int workerCount = 2; |
40 | | - workers = new ArrayList<>(); |
41 | | - for (int i = 0; i < workerCount; i++) { |
42 | | - Thread t = Thread.ofVirtual() |
43 | | - .name("report-job-worker-" + i) |
44 | | - .start(this::listen); |
45 | | - workers.add(t); |
| 44 | + running.set(true); |
| 45 | + workers = new CopyOnWriteArrayList<>(); |
| 46 | + for (int i = 0; i < WORKER_COUNT; i++) { |
| 47 | + spawnWorker(i); |
46 | 48 | } |
47 | | - log.info("ReportJobWorker {}개 시작", workerCount); |
| 49 | + log.info("ReportJobWorker {}개 시작", WORKER_COUNT); |
48 | 50 | } |
49 | 51 |
|
50 | 52 | @PreDestroy |
51 | 53 | public void stopWorker() { |
| 54 | + running.set(false); |
52 | 55 | workers.forEach(Thread::interrupt); |
53 | 56 | log.info("ReportJobWorker 종료"); |
54 | 57 | } |
55 | 58 |
|
| 59 | + private void spawnWorker(int index) { |
| 60 | + Thread t = Thread.ofVirtual() |
| 61 | + .name("report-job-worker-" + index) |
| 62 | + .start(() -> guardedListen(index)); |
| 63 | + workers.add(t); |
| 64 | + } |
| 65 | + |
| 66 | + /** |
| 67 | + * listen()을 감싸서 Error 포함 Throwable이 발생해 스레드가 죽어도 |
| 68 | + * running 상태면 자동으로 재시작한다. |
| 69 | + */ |
| 70 | + private void guardedListen(int index) { |
| 71 | + while (running.get()) { |
| 72 | + try { |
| 73 | + listen(); |
| 74 | + } catch (Throwable t) { |
| 75 | + if (!running.get()) break; |
| 76 | + log.error("Worker-{} 치명적 오류로 종료, 3초 후 재시작", index, t); |
| 77 | + sleep(3); |
| 78 | + } |
| 79 | + } |
| 80 | + log.info("Worker-{} 정상 종료", index); |
| 81 | + } |
| 82 | + |
56 | 83 | private void listen() { |
57 | | - while (!Thread.currentThread().isInterrupted()) { |
| 84 | + while (!Thread.currentThread().isInterrupted() && running.get()) { |
58 | 85 | try { |
59 | 86 | String jobId = redisTemplate.opsForList() |
60 | 87 | .rightPop(Constants.JOB_QUEUE_KEY, Duration.ofSeconds(30)); |
61 | | - |
62 | 88 | if (jobId == null) continue; |
63 | | - |
64 | 89 | processJob(Long.parseLong(jobId)); |
| 90 | + |
| 91 | + } catch (QueryTimeoutException e) { |
| 92 | + log.debug("BRPOP timeout, 재시도"); |
| 93 | + |
65 | 94 | } catch (Exception e) { |
66 | 95 | log.error("Worker 루프 에러", e); |
| 96 | + sleep(3); // Redis 장애 시 스핀 방지 |
67 | 97 | } |
68 | 98 | } |
69 | 99 | } |
70 | 100 |
|
| 101 | + private void sleep(int seconds) { |
| 102 | + try { |
| 103 | + Thread.sleep(Duration.ofSeconds(seconds)); |
| 104 | + } catch (InterruptedException e) { |
| 105 | + Thread.currentThread().interrupt(); |
| 106 | + } |
| 107 | + } |
| 108 | + |
71 | 109 | private void processJob(Long jobId) { |
72 | 110 |
|
73 | 111 | try { |
|
0 commit comments