1+ package sopt .comfit .report .job ;
2+
3+ import jakarta .annotation .PostConstruct ;
4+ import jakarta .annotation .PreDestroy ;
5+ import lombok .RequiredArgsConstructor ;
6+ import lombok .extern .slf4j .Slf4j ;
7+ import org .springframework .data .redis .core .StringRedisTemplate ;
8+ import org .springframework .stereotype .Component ;
9+ import sopt .comfit .global .constants .Constants ;
10+ import sopt .comfit .report .domain .AIReportJob ;
11+ import sopt .comfit .report .dto .command .MatchExperienceCommandDto ;
12+ import sopt .comfit .report .infra .dto .PreparedDataDto ;
13+ import sopt .comfit .report .infra .prompt .AIReportParallelPromptBuilder ;
14+ import sopt .comfit .report .infra .service .RetryableAiCallerService ;
15+ import sopt .comfit .report .service .AIReportCommandService ;
16+ import sopt .comfit .report .service .AIReportQueryService ;
17+
18+ import java .time .Duration ;
19+ import java .util .ArrayList ;
20+ import java .util .List ;
21+
22+ @ Slf4j
23+ @ Component
24+ @ RequiredArgsConstructor
25+ public class AIReportJobWorker {
26+
27+ private final StringRedisTemplate redisTemplate ;
28+ private final AIReportJobService reportJobService ;
29+ private final AIReportQueryService aiReportQueryService ;
30+ private final AIReportCommandService aiReportCommandService ;
31+ private final RetryableAiCallerService aiCaller ;
32+
33+ List <Thread > workers = new ArrayList <>();
34+
35+ @ PostConstruct
36+ public void startWorker () {
37+ int workerCount = 3 ;
38+ workers = new ArrayList <>();
39+ for (int i = 0 ; i < workerCount ; i ++) {
40+ Thread t = Thread .ofVirtual ()
41+ .name ("report-job-worker-" + i )
42+ .start (this ::listen );
43+ workers .add (t );
44+ }
45+ log .info ("ReportJobWorker {}개 시작" , workerCount );
46+ }
47+
48+ @ PreDestroy
49+ public void stopWorker () {
50+ workers .forEach (Thread ::interrupt );
51+ log .info ("ReportJobWorker 종료" );
52+ }
53+
54+ private void listen () {
55+ while (!Thread .currentThread ().isInterrupted ()) {
56+ try {
57+ String jobId = redisTemplate .opsForList ()
58+ .rightPop (Constants .JOB_QUEUE_KEY , Duration .ofSeconds (30 ));
59+
60+ if (jobId == null ) continue ;
61+
62+ processJob (Long .parseLong (jobId ));
63+ } catch (Exception e ) {
64+ log .error ("Worker 루프 에러" , e );
65+ }
66+ }
67+ }
68+
69+ private void processJob (Long jobId ) {
70+ reportJobService .startProcessing (jobId );
71+
72+ try {
73+ MatchExperienceCommandDto command = buildCommand (jobId );
74+ PreparedDataDto data = aiReportQueryService .prepareData (command );
75+
76+ String perspectivesJson = aiCaller .callSyncWithField (
77+ AIReportParallelPromptBuilder .buildPerspective (data ),
78+ "Perspectives" , "perspectives" );
79+
80+ String mergedJson = aiCaller .callParallelWithVirtualThread (data , perspectivesJson );
81+
82+ aiReportCommandService .parseAndSave (mergedJson , data .experience (),
83+ data .company (), command .jobDescription ());
84+
85+ reportJobService .complete (jobId );
86+ log .info ("Job 처리 완료 - jobId: {}" , jobId );
87+ } catch (Exception e ) {
88+ log .error ("Job 처리 실패 - jobId: {}" , jobId , e );
89+ reportJobService .fail (jobId );
90+ }
91+ }
92+
93+ private MatchExperienceCommandDto buildCommand (Long jobId ) {
94+ AIReportJob job = reportJobService .findJob (jobId );
95+ return new MatchExperienceCommandDto (
96+ job .getUserId (),
97+ job .getCompanyId (),
98+ job .getExperienceId (),
99+ job .getDescription ());
100+ }
101+ }
0 commit comments