22
33import io .micrometer .observation .Observation ;
44import io .micrometer .observation .ObservationRegistry ;
5+ import io .opentelemetry .api .trace .propagation .W3CTraceContextPropagator ;
6+ import io .opentelemetry .context .Context ;
7+ import io .opentelemetry .context .propagation .TextMapGetter ;
58import jakarta .annotation .PostConstruct ;
69import jakarta .annotation .PreDestroy ;
710import lombok .RequiredArgsConstructor ;
@@ -86,10 +89,16 @@ private void guardedListen(int index) {
8689 private void listen () {
8790 while (!Thread .currentThread ().isInterrupted () && running .get ()) {
8891 try {
89- String jobId = redisTemplate .opsForList ()
92+ String raw = redisTemplate .opsForList ()
9093 .rightPop (Constants .JOB_QUEUE_KEY , Duration .ofSeconds (30 ));
91- if (jobId == null ) continue ;
92- processJob (Long .parseLong (jobId ));
94+ if (raw == null ) continue ;
95+
96+ // "jobId|traceparent" 또는 "jobId" 형태 파싱
97+ String [] parts = raw .split ("\\ |" , 2 );
98+ Long jobId = Long .parseLong (parts [0 ]);
99+ String traceparent = parts .length > 1 ? parts [1 ] : null ;
100+
101+ processJob (jobId , traceparent );
93102
94103 } catch (QueryTimeoutException e ) {
95104 log .debug ("BRPOP timeout, 재시도" );
@@ -109,14 +118,29 @@ private void sleep(int seconds) {
109118 }
110119 }
111120
112- private void processJob (Long jobId ) {
113- // 루트 Span 생성 — Worker는 HTTP 요청이 아니라 Micrometer가 자동 생성 안 함
114- // Observation을 시작하면 traceId/spanId가 MDC에 자동 주입되고
115- // 내부의 Feign 호출(perspectives, density 등)이 child Span으로 자동 연결됨
116- Observation observation = Observation .createNotStarted ("job.process" , observationRegistry )
117- .lowCardinalityKeyValue ("jobId" , String .valueOf (jobId ));
118-
119- observation .observe (() -> {
121+ private void processJob (Long jobId , String traceparent ) {
122+ // traceparent가 있으면 HTTP 요청 trace의 child span으로 연결
123+ // 없으면 새 root trace 시작 (이전 버전 Redis 값 대비 방어)
124+ Context parentContext = traceparent != null
125+ ? W3CTraceContextPropagator .getInstance ().extract (
126+ Context .root (), traceparent ,
127+ new TextMapGetter <String >() {
128+ @ Override
129+ public Iterable <String > keys (String carrier ) {
130+ return java .util .List .of ("traceparent" );
131+ }
132+ @ Override
133+ public String get (String carrier , String key ) {
134+ return "traceparent" .equals (key ) ? carrier : null ;
135+ }
136+ })
137+ : Context .current ();
138+
139+ try (io .opentelemetry .context .Scope ignored = parentContext .makeCurrent ()) {
140+ Observation observation = Observation .createNotStarted ("job.process" , observationRegistry )
141+ .lowCardinalityKeyValue ("jobId" , String .valueOf (jobId ));
142+
143+ observation .observe (() -> {
120144 try {
121145 // jobId는 Micrometer가 자동으로 안 넣어주므로 직접 설정
122146 MdcUtils .setJobId (jobId );
@@ -151,6 +175,7 @@ private void processJob(Long jobId) {
151175 MdcUtils .clear ();
152176 }
153177 });
178+ } // try (parentContext.makeCurrent())
154179 }
155180
156181 private MatchExperienceCommandDto buildCommand (Long jobId ) {
0 commit comments