Skip to content

Commit cda82b7

Browse files
Add aiokafka support (AIOKafkaInstrumentation)
1 parent 7bbd56f commit cda82b7

6 files changed

Lines changed: 810 additions & 0 deletions

File tree

Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import time
32+
from collections.abc import Awaitable, Callable, Container, Iterable, MutableSequence
33+
from enum import IntEnum
34+
from typing import TYPE_CHECKING, Dict, List, Optional, TypeVar, cast
35+
36+
from elasticapm import Client, get_client
37+
from elasticapm.conf.constants import OUTCOME, TRACEPARENT_BINARY_HEADER_NAME
38+
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
39+
from elasticapm.traces import DroppedSpan, Span, Transaction, capture_span, execution_context
40+
from elasticapm.utils.disttracing import TraceParent
41+
42+
if TYPE_CHECKING:
43+
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, ConsumerRecord, TopicPartition # pragma: no cover
44+
45+
46+
class _KafkaTimestampType(IntEnum):
    # Mirrors the Kafka record timestamp-type values as they appear on
    # ConsumerRecord.timestamp_type; only CREATE_TIME is used to compute
    # message age in _enrich_transaction_context.
    NO_TIMESTAMP_TYPE = -1  # record carries no timestamp
    CREATE_TIME = 0  # timestamp assigned by the producer
    LOG_APPEND_TIME = 1  # timestamp assigned by the broker on append
50+
51+
52+
class AIOKafkaInstrumentation(AsyncAbstractInstrumentedModule):
    """Instrument the aiokafka's consumer and producer

    Features:
    - Like KafkaInstrumentation, it begins a new transaction on asynchronous
      iteration over the consumer if no transaction is active.
    - Unlike KafkaInstrumentation, when an active transaction exists, it also
      records the last span that awaits a message before StopAsyncIteration
      arises.
    - Automatic trace context propagation is not supported for messages being
      sent via send_batch().
    """

    # (module, attribute) pairs that the instrumentation framework wraps
    # with call() below.
    instrument_list = [
        ("aiokafka", "AIOKafkaConsumer.getone"),
        ("aiokafka", "AIOKafkaConsumer.getmany"),
        ("aiokafka", "AIOKafkaProducer.send"),
        ("aiokafka", "AIOKafkaProducer.send_batch"),
        ("aiokafka", "AIOKafkaConsumer.__anext__"),
    ]
    name = "aiokafka"
    # __anext__ can begin/end transactions, not only capture spans.
    creates_transactions = True

    SPAN_TYPE = SERVICE_TYPE = TRANSACTION_TYPE = "messaging"
    SPAN_SUBTYPE = SERVICE_NAME = "kafka"

    T_Result = TypeVar("T_Result")

    async def call(
        self,
        module: str,
        method: str,
        wrapped: Callable[..., Awaitable[T_Result]],
        instance: Optional[object],
        args: tuple,
        kwargs: dict,
    ) -> T_Result:
        """Route an instrumented aiokafka call to the matching handler.

        - ``AIOKafkaConsumer.__anext__``: manage one transaction per
          consumed message (end the previous one, begin the next).
        - ``AIOKafkaConsumer.getone``/``getmany``: capture a receive span,
          but only when a transaction is already active.
        - ``AIOKafkaProducer.send``/``send_batch``: capture a send span and
          propagate trace context, but only when a transaction is active.
        """
        client = get_client()
        if not client:
            # Agent not initialized: run the original call untouched.
            return await wrapped(*args, **kwargs)

        transaction = execution_context.get_transaction()

        if method == "AIOKafkaConsumer.__anext__":
            # If no transaction exists, we create and start new ones implicitly
            # like we do in KafkaInstrumentation.

            if transaction and transaction.transaction_type != self.TRANSACTION_TYPE:
                # Somebody started a transaction outside of the consumer,
                # so we will only capture subsequent getone() as a span.
                return await wrapped(*args, **kwargs)

            # No transaction running, or this is a transaction started by us,
            # so let's end it and start the next,
            # unless a StopAsyncIteration is raised, at which point we do nothing.
            if transaction:
                client.end_transaction(result=OUTCOME.SUCCESS)

            # May raise StopAsyncIteration
            result = await wrapped(*args, **kwargs)
            message = cast("ConsumerRecord", result)

            if client.should_ignore_topic(message.topic):
                # Ignored topic: hand the record through without a transaction.
                return result

            trace_parent = _extract_trace_parent_from_message_headers(message.headers)
            transaction = client.begin_transaction(self.TRANSACTION_TYPE, trace_parent=trace_parent)

            if not transaction:
                # begin_transaction may decline (e.g. sampling/agent state);
                # still deliver the record.
                return result

            transaction.name = f"Kafka RECEIVE from {message.topic}"
            self._enrich_transaction_context(
                transaction, message.topic, timestamp_type=message.timestamp_type, timestamp=message.timestamp
            )

            return result

        elif not transaction:
            # For getone/getmany/send/send_batch, only trace inside an
            # existing transaction.
            return await wrapped(*args, **kwargs)

        elif method.startswith("AIOKafkaConsumer.get"):
            return await self._trace_get(
                wrapped,
                cast(Optional["AIOKafkaConsumer"], instance),
                args,
                kwargs,
                client=client,
            )

        else:
            return await self._trace_send(
                wrapped,
                cast(Optional["AIOKafkaProducer"], instance),
                args,
                kwargs,
                client=client,
                trace_parent=transaction.trace_parent,
            )

    @classmethod
    async def _trace_get(
        cls,
        wrapped: Callable[..., Awaitable[T_Result]],
        instance: Optional["AIOKafkaConsumer"],
        args: tuple,
        kwargs: dict,
        *,
        client: Client,
    ) -> T_Result:
        """Trace the consumer's get() and getmany() by capturing a span"""
        with capture_span(
            name="Kafka RECEIVE",
            leaf=True,
            span_type=cls.SPAN_TYPE,
            span_subtype=cls.SPAN_SUBTYPE,
            span_action="receive",
        ) as span:

            result = await wrapped(*args, **kwargs)

            if not span or isinstance(span, DroppedSpan):
                return result

            # Keep only topics the user has not configured to be ignored.
            trace_topics = [
                topic for topic in _extract_topics_from_get_result(result) if not client.should_ignore_topic(topic)
            ]

            if not trace_topics:
                # Everything fetched is ignored: drop the span entirely.
                span.cancel()
                return result

            span.name += f" from {', '.join(trace_topics)}"
            cls._enrich_span_context(span, *trace_topics, consumer=instance)

            # Link the receive span to the trace of each fetched message
            # that carries a traceparent header.
            for message in _extract_messages_from_get_result(result, include_topics=trace_topics):
                trace_parent = _extract_trace_parent_from_message_headers(message.headers)
                if trace_parent:
                    span.add_link(trace_parent)

            return result

    @classmethod
    async def _trace_send(
        cls,
        wrapped: Callable[..., Awaitable[T_Result]],
        instance: Optional["AIOKafkaProducer"],
        args: tuple,
        kwargs: dict,
        *,
        client: Client,
        trace_parent: TraceParent,
    ) -> T_Result:
        """Trace the producer's send() and send_batch() by capturing a span"""
        topic = _extract_topic_from_send_arguments(args, kwargs)
        if client.should_ignore_topic(topic):
            return await wrapped(*args, **kwargs)

        with capture_span(
            name=f"Kafka SEND to {topic}",
            leaf=True,
            span_type=cls.SPAN_TYPE,
            span_subtype=cls.SPAN_SUBTYPE,
            span_action="send",
        ) as span:

            if span and not isinstance(span, DroppedSpan):
                # Point the propagated context at this send span rather than
                # the transaction itself.
                trace_parent = trace_parent.copy_from(span_id=span.id)

            # args may be a tuple; injection mutates the headers argument
            # in place, so work on a list copy.
            mutable_args = list(args)
            _inject_trace_parent_into_send_arguments(mutable_args, kwargs, trace_parent)

            result = await wrapped(*mutable_args, **kwargs)

            if span and not isinstance(span, DroppedSpan):
                cls._enrich_span_context(span, topic, producer=instance)

            return result

    @classmethod
    def _enrich_span_context(
        cls,
        span: Span,
        topic: str,
        *topics: str,
        producer: Optional["AIOKafkaProducer"] = None,
        consumer: Optional["AIOKafkaConsumer"] = None,
    ):
        """Attach destination/service metadata to a send or receive span.

        When more than one topic is given (getmany across topics), the
        per-topic resource and queue name are omitted.

        NOTE(review): ``consumer`` is accepted but not read here —
        presumably kept for signature symmetry with ``producer``; confirm
        whether consumer-side address enrichment was intended.
        """
        destination_service = {"type": cls.SERVICE_TYPE, "name": cls.SERVICE_NAME}
        service_framework = {"name": "Kafka"}

        span.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service)
        span.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework)

        if not topics:
            # Exactly one topic: record it as the resource / queue name.
            span.context["destination"]["service"]["resource"] = f"{cls.SERVICE_NAME}/{topic}"
            span.context.setdefault("message", {}).setdefault("queue", {}).update({"name": topic})

        if producer and producer.client.cluster.controller:
            # Use the cluster controller's address as the destination
            # endpoint when the producer metadata exposes it.
            span.context["destination"]["address"] = producer.client.cluster.controller.host
            span.context["destination"]["port"] = producer.client.cluster.controller.port

    @classmethod
    def _enrich_transaction_context(
        cls,
        transaction: Transaction,
        topic: str,
        *,
        timestamp_type: int,
        timestamp: int,
    ):
        """Attach destination/queue metadata and message age to a transaction.

        ``timestamp`` is interpreted as epoch milliseconds (Kafka record
        timestamp); age is only computed for producer-assigned CREATE_TIME
        timestamps, since LOG_APPEND_TIME reflects broker time.
        """
        destination_service = {
            "type": cls.SERVICE_TYPE,
            "name": cls.SERVICE_NAME,
            "resource": f"{cls.SERVICE_NAME}/{topic}",
        }
        message_queue = {"name": topic}
        service_framework = {"name": "Kafka"}

        transaction.context.setdefault("destination", {}).setdefault("service", {}).update(destination_service)
        transaction.context.setdefault("message", {}).setdefault("queue", {}).update(message_queue)
        transaction.context.setdefault("service", {}).setdefault("framework", {}).update(service_framework)

        if timestamp_type == _KafkaTimestampType.CREATE_TIME:
            current_time_millis = int(round(time.time() * 1000))
            age = current_time_millis - timestamp
            transaction.context["message"].setdefault("age", {}).update({"ms": age})
284+
285+
286+
def _extract_trace_parent_from_message_headers(headers: Optional[Iterable]) -> Optional[TraceParent]:
    """Decode the distributed-tracing parent carried in Kafka record headers.

    Scans the (key, value) header pairs for the binary traceparent header
    and decodes the first match; returns None when ``headers`` is falsy or
    no such header exists.
    """
    _missing = object()  # sentinel so a literal None header value is still decoded
    raw = next(
        (value for key, value in headers or [] if key == TRACEPARENT_BINARY_HEADER_NAME),
        _missing,
    )
    if raw is _missing:
        return None
    return TraceParent.from_binary(raw)
293+
294+
295+
def _extract_topics_from_get_result(result) -> Iterable[str]:
    """Yield the topic name(s) present in a consumer fetch result.

    Handles both result shapes: a single ConsumerRecord (from getone(),
    recognized by its ``topic`` attribute) and a mapping of TopicPartition
    to record lists (from getmany()).
    """
    if hasattr(result, "topic"):
        # Single record returned by getone()
        record = cast("ConsumerRecord", result)
        yield record.topic
        return

    # Mapping TopicPartition -> [ConsumerRecord] returned by getmany()
    partitions = cast(Dict["TopicPartition", List["ConsumerRecord"]], result)
    yield from (partition.topic for partition in partitions)
305+
306+
307+
def _extract_messages_from_get_result(result, *, include_topics: Container[str] = ()) -> Iterable["ConsumerRecord"]:
    """Yield the ConsumerRecords in a fetch result whose topic is allowed.

    ``include_topics`` acts as a whitelist; records from other topics are
    skipped. Handles both the getone() single-record shape and the
    getmany() TopicPartition -> records mapping.
    """
    if hasattr(result, "topic"):
        # Single record returned by getone()
        record = cast("ConsumerRecord", result)
        if record.topic in include_topics:
            yield record
        return

    # Mapping TopicPartition -> [ConsumerRecord] returned by getmany()
    partitions = cast(Dict["TopicPartition", List["ConsumerRecord"]], result)
    for partition, records in partitions.items():
        if partition.topic in include_topics:
            yield from records
320+
321+
322+
def _has_append_method(obj: object) -> bool:
    """Return True when *obj* exposes a callable ``append`` attribute.

    Used to distinguish a send_batch() BatchBuilder (which has append())
    from a plain topic string in positional arguments.
    """
    append_attr = getattr(obj, "append", None)
    return callable(append_attr)
325+
326+
327+
def _extract_topic_from_send_arguments(args: tuple, kwargs: dict) -> str:
    """Determine the destination topic from send()/send_batch() arguments.

    send(topic, ...) takes the topic first (or as the ``topic`` keyword);
    send_batch(batch, topic, ...) takes a BatchBuilder first — recognized
    by its ``append`` method — and the topic second.
    """
    try:
        return kwargs["topic"]
    except KeyError:
        pass

    first = args[0]
    if callable(getattr(first, "append", None)):
        # First positional is a BatchBuilder (send_batch): topic is second.
        return args[1]
    return first
338+
339+
340+
def _inject_trace_parent_into_send_arguments(args: list, kwargs: dict, trace_parent: TraceParent):
    """Insert the binary traceparent header into send() call arguments.

    Locates (or creates) the ``headers`` argument — whether passed by
    keyword or positionally — replaces any existing traceparent header, and
    appends the current one. send_batch() calls are left untouched, as the
    batch already contains encoded messages.

    Raises TypeError when the caller supplied an immutable headers
    collection (e.g. a tuple).
    """
    sending_batch = "batch" in kwargs or (args and _has_append_method(args[0]))
    if sending_batch:
        # Injection is not practical: messages are already encoded in the batch.
        return

    if "headers" in kwargs:
        if kwargs["headers"] is None:
            kwargs["headers"] = []
        headers = kwargs["headers"]
    else:
        # Positional index of headers in send(topic, value, key, partition,
        # timestamp_ms, headers): the 6th parameter, shifted left once for
        # every preceding parameter that was passed by keyword instead.
        position = 5 - sum(
            1 for name in ("topic", "value", "key", "partition", "timestamp_ms") if name in kwargs
        )
        if position >= len(args):
            # headers was not passed at all: introduce it as a keyword.
            headers = kwargs["headers"] = []
        else:
            if args[position] is None:
                args[position] = []
            headers = args[position]

    if not isinstance(headers, MutableSequence):
        # headers may also be a tuple, for example
        raise TypeError(f"'headers' is not a MutableSequence, got {type(headers).__name__}")

    # Replace any pre-existing traceparent header with ours; empty header
    # entries are retained as we are not in a position to remove them.
    headers[:] = [entry for entry in headers if not entry or entry[0] != TRACEPARENT_BINARY_HEADER_NAME]
    headers.append((TRACEPARENT_BINARY_HEADER_NAME, trace_parent.to_binary()))

elasticapm/instrumentation/register.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
"elasticapm.instrumentation.packages.asyncio.redis_asyncio.RedisPipelineInstrumentation",
9797
"elasticapm.instrumentation.packages.asyncio.psycopg_async.AsyncPsycopgInstrumentation",
9898
"elasticapm.instrumentation.packages.grpc.GRPCAsyncServerInstrumentation",
99+
"elasticapm.instrumentation.packages.asyncio.aiokafka.AIOKafkaInstrumentation",
99100
]
100101
)
101102

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ markers =
113113
kafka
114114
grpc
115115
azurestorage
116+
aiokafka
116117
addopts=--random-order
117118

118119
[isort]

0 commit comments

Comments
 (0)