1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ *
17+ */
18+
19+ package org .apache .skywalking .apm .plugin .spring .kafka ;
20+
21+ import java .lang .reflect .Method ;
22+ import java .nio .charset .StandardCharsets ;
23+ import java .util .Iterator ;
24+ import java .util .Set ;
25+ import java .util .stream .Collectors ;
26+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
27+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
28+ import org .apache .kafka .common .TopicPartition ;
29+ import org .apache .kafka .common .header .Header ;
30+ import org .apache .skywalking .apm .agent .core .context .CarrierItem ;
31+ import org .apache .skywalking .apm .agent .core .context .ContextCarrier ;
32+ import org .apache .skywalking .apm .agent .core .context .ContextManager ;
33+ import org .apache .skywalking .apm .agent .core .context .tag .Tags ;
34+ import org .apache .skywalking .apm .agent .core .context .trace .AbstractSpan ;
35+ import org .apache .skywalking .apm .agent .core .context .trace .SpanLayer ;
36+ import org .apache .skywalking .apm .agent .core .plugin .interceptor .enhance .EnhancedInstance ;
37+ import org .apache .skywalking .apm .agent .core .plugin .interceptor .enhance .InstanceMethodsAroundInterceptor ;
38+ import org .apache .skywalking .apm .agent .core .plugin .interceptor .enhance .MethodInterceptResult ;
39+ import org .apache .skywalking .apm .network .trace .component .ComponentsDefine ;
40+ import org .apache .skywalking .apm .plugin .kafka .define .Constants ;
41+ import org .apache .skywalking .apm .plugin .kafka .define .KafkaContext ;
42+
43+ public class ExtendedKafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {
44+
45+ private static final String OPERATE_NAME_PREFIX = "Kafka/" ;
46+ private static final String CONSUMER_OPERATE_NAME = "/Consumer/" ;
47+ private static final String UNKNOWN = "Unknown" ;
48+
49+ @ Override
50+ public void beforeMethod (EnhancedInstance objInst , Method method , Object [] allArguments , Class <?>[] argumentsTypes ,
51+ MethodInterceptResult result ) throws Throwable {
52+ ExtendedConsumerEnhanceRequiredInfo requiredInfo = (ExtendedConsumerEnhanceRequiredInfo ) objInst .getSkyWalkingDynamicField ();
53+ if (requiredInfo != null ) {
54+ requiredInfo .setStartTime (System .currentTimeMillis ());
55+ }
56+ }
57+
58+ @ Override
59+ public Object afterMethod (EnhancedInstance objInst , Method method , Object [] allArguments , Class <?>[] argumentsTypes ,
60+ Object ret ) throws Throwable {
61+ if (ret == null ) {
62+ return ret ;
63+ }
64+
65+ ConsumerRecords <?, ?> records = (ConsumerRecords <?, ?>) ret ;
66+
67+ // Only create entry span when consumer received at least one message
68+ if (records .count () > 0 ) {
69+ createEntrySpan (objInst , records );
70+ }
71+ return ret ;
72+ }
73+
74+ @ Override
75+ public void handleMethodException (EnhancedInstance objInst , Method method , Object [] allArguments ,
76+ Class <?>[] argumentsTypes , Throwable t ) {
77+ if (ContextManager .isActive ()) {
78+ ContextManager .activeSpan ().log (t );
79+ }
80+ }
81+
82+ private void createEntrySpan (EnhancedInstance objInst , ConsumerRecords <?, ?> records ) {
83+ KafkaContext context = (KafkaContext ) ContextManager .getRuntimeContext ().get (Constants .KAFKA_FLAG );
84+ if (context != null ) {
85+ ContextManager .createEntrySpan (context .getOperationName (), null );
86+ context .setNeedStop (true );
87+ }
88+
89+ ExtendedConsumerEnhanceRequiredInfo requiredInfo = (ExtendedConsumerEnhanceRequiredInfo ) objInst .getSkyWalkingDynamicField ();
90+
91+ SpanInfo spanInfo = buildSpanInfo (requiredInfo , records );
92+
93+ String operationName = OPERATE_NAME_PREFIX + spanInfo .topic + CONSUMER_OPERATE_NAME + spanInfo .groupId ;
94+ AbstractSpan activeSpan = ContextManager .createEntrySpan (operationName , null );
95+
96+ if (requiredInfo != null ) {
97+ activeSpan .start (requiredInfo .getStartTime ());
98+ }
99+
100+ activeSpan .setComponent (ComponentsDefine .KAFKA_CONSUMER );
101+ SpanLayer .asMQ (activeSpan );
102+ Tags .MQ_BROKER .set (activeSpan , spanInfo .brokerServers );
103+ Tags .MQ_TOPIC .set (activeSpan , spanInfo .topic );
104+ activeSpan .setPeer (spanInfo .brokerServers );
105+
106+ extractContextCarrier (records );
107+ ContextManager .stopSpan ();
108+ }
109+
110+ private SpanInfo buildSpanInfo (ExtendedConsumerEnhanceRequiredInfo requiredInfo , ConsumerRecords <?, ?> records ) {
111+ String topic = UNKNOWN ;
112+ String groupId = UNKNOWN ;
113+ String brokerServers = UNKNOWN ;
114+
115+ if (requiredInfo != null ) {
116+ groupId = requiredInfo .getGroupId ();
117+ brokerServers = requiredInfo .getBrokerServers ();
118+ }
119+
120+ Set <TopicPartition > partitions = records .partitions ();
121+ if (!partitions .isEmpty ()) {
122+ topic = partitions .stream ()
123+ .map (TopicPartition ::topic ).distinct ()
124+ .collect (Collectors .joining (";" ));
125+ }
126+
127+ return new SpanInfo (topic , groupId , brokerServers );
128+ }
129+
130+ private void extractContextCarrier (ConsumerRecords <?, ?> records ) {
131+ for (ConsumerRecord <?, ?> record : records ) {
132+ ContextCarrier contextCarrier = new ContextCarrier ();
133+ CarrierItem next = contextCarrier .items ();
134+
135+ while (next .hasNext ()) {
136+ next = next .next ();
137+ Iterator <Header > iterator = record .headers ().headers (next .getHeadKey ()).iterator ();
138+ if (iterator .hasNext ()) {
139+ next .setHeadValue (new String (iterator .next ().value (), StandardCharsets .UTF_8 ));
140+ }
141+ }
142+ ContextManager .extract (contextCarrier );
143+ }
144+ }
145+
146+ private static class SpanInfo {
147+ final String topic ;
148+ final String groupId ;
149+ final String brokerServers ;
150+
151+ SpanInfo (String topic , String groupId , String brokerServers ) {
152+ this .topic = topic ;
153+ this .groupId = groupId ;
154+ this .brokerServers = brokerServers ;
155+ }
156+ }
157+ }
0 commit comments