Skip to content

Commit b100e8a

Browse files
beniwohlibasepi
andcommitted
align elasticsearch body capturing with other agents (elastic#1013)
* align elasticsearch body capturing with other agents See elastic/apm#55 This also allows us to get rid of a hack we used to pass the API method and an unserialized version of the body to the instrumentation. Neither is used anymore, so we can skip that part of the instrumentation. This does also mean that we stop capturing the body for some methods, specifically `update` and `delete_by_query`. closes elastic#419 * fix test code using params=None never made sense here * update path matching regex to align with Node.js agent * Add CHANGELOG Co-authored-by: Colton Myers <colton.myers@gmail.com>
1 parent d8b413d commit b100e8a

6 files changed

Lines changed: 11 additions & 155 deletions

File tree

CHANGELOG.asciidoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ endif::[]
3737
[float]
3838
===== Breaking changes
3939
40-
* Python 2.7 and 3.5 support has been deprecated. The Python agent now requires Python 3.6+
40+
* Python 2.7 and 3.5 support has been deprecated. The Python agent now requires Python 3.6+ {pull}1021[#1021]
41+
* No longer collecting body for `elasticsearch-py` `update` and `delete_by_query` {pull}1013[#1013]
4142
4243
4344
[float]

elasticapm/instrumentation/packages/asyncio/elasticsearch.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
import elasticapm
3232
from elasticapm.instrumentation.packages.asyncio.base import AsyncAbstractInstrumentedModule
33-
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin, ElasticsearchInstrumentation
33+
from elasticapm.instrumentation.packages.elasticsearch import ElasticSearchConnectionMixin
3434

3535

3636
class ElasticSearchAsyncConnection(ElasticSearchConnectionMixin, AsyncAbstractInstrumentedModule):
@@ -55,18 +55,3 @@ async def call(self, module, method, wrapped, instance, args, kwargs):
5555
leaf=True,
5656
):
5757
return await wrapped(*args, **kwargs)
58-
59-
60-
class AsyncElasticsearchInstrumentation(ElasticsearchInstrumentation, AsyncAbstractInstrumentedModule):
61-
name = "elasticsearch"
62-
63-
instrument_list = [
64-
("elasticsearch._async.client", "AsyncElasticsearch.delete_by_query"),
65-
("elasticsearch._async.client", "AsyncElasticsearch.search"),
66-
("elasticsearch._async.client", "AsyncElasticsearch.count"),
67-
("elasticsearch._async.client", "AsyncElasticsearch.update"),
68-
]
69-
70-
async def call(self, module, method, wrapped, instance, args, kwargs):
71-
kwargs = self.inject_apm_params(method, kwargs)
72-
return await wrapped(*args, **kwargs)

elasticapm/instrumentation/packages/elasticsearch.py

Lines changed: 6 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,15 @@
3030

3131
from __future__ import absolute_import
3232

33-
import json
33+
import re
3434

3535
import elasticapm
3636
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
37-
from elasticapm.utils import compat
3837
from elasticapm.utils.logging import get_logger
3938

4039
logger = get_logger("elasticapm.instrument")
4140

42-
43-
API_METHOD_KEY_NAME = "__elastic_apm_api_method_name"
44-
BODY_REF_NAME = "__elastic_apm_body_ref"
41+
should_capture_body_re = re.compile("/(_search|_msearch|_count|_async_search|_sql|_eql)(/|$)")
4542

4643

4744
class ElasticSearchConnectionMixin(object):
@@ -56,13 +53,14 @@ def get_signature(self, args, kwargs):
5653

5754
def get_context(self, instance, args, kwargs):
5855
args_len = len(args)
56+
url = args[1] if args_len > 1 else kwargs.get("url")
5957
params = args[2] if args_len > 2 else kwargs.get("params")
60-
body = params.pop(BODY_REF_NAME, None) if params else None
6158
body_serialized = args[3] if args_len > 3 else kwargs.get("body")
6259

63-
api_method = params.pop(API_METHOD_KEY_NAME, None) if params else None
60+
should_capture_body = bool(should_capture_body_re.search(url))
61+
6462
context = {"db": {"type": "elasticsearch"}}
65-
if api_method in self.query_methods:
63+
if should_capture_body:
6664
query = []
6765
# using both q AND body is allowed in some API endpoints / ES versions,
6866
# but not in others. We simply capture both if they are there so the
@@ -76,17 +74,8 @@ def get_context(self, instance, args, kwargs):
7674
query.append(body_serialized.decode("utf-8", errors="replace"))
7775
else:
7876
query.append(body_serialized)
79-
elif body and isinstance(body, dict):
80-
try:
81-
query.append(json.dumps(body, default=compat.text_type))
82-
except TypeError:
83-
pass
8477
if query:
8578
context["db"]["statement"] = "\n\n".join(query)
86-
elif api_method == "update":
87-
if isinstance(body, dict) and "script" in body:
88-
# only get the `script` field from the body
89-
context["db"]["statement"] = json.dumps({"script": body["script"]})
9079
context["destination"] = {
9180
"address": instance.host,
9281
"service": {"name": "elasticsearch", "resource": "elasticsearch", "type": "db"},
@@ -116,49 +105,3 @@ def call(self, module, method, wrapped, instance, args, kwargs):
116105
leaf=True,
117106
):
118107
return wrapped(*args, **kwargs)
119-
120-
121-
class ElasticsearchInstrumentation(AbstractInstrumentedModule):
122-
name = "elasticsearch"
123-
124-
instrument_list = [
125-
("elasticsearch.client", "Elasticsearch.delete_by_query"),
126-
("elasticsearch.client", "Elasticsearch.search"),
127-
("elasticsearch.client", "Elasticsearch.count"),
128-
("elasticsearch.client", "Elasticsearch.update"),
129-
]
130-
131-
def __init__(self):
132-
super(ElasticsearchInstrumentation, self).__init__()
133-
try:
134-
from elasticsearch import VERSION
135-
136-
self.version = VERSION[0]
137-
except ImportError:
138-
self.version = None
139-
140-
def instrument(self):
141-
if self.version and not 2 <= self.version < 8:
142-
logger.debug("Instrumenting version %s of Elasticsearch is not supported by Elastic APM", self.version)
143-
return
144-
super(ElasticsearchInstrumentation, self).instrument()
145-
146-
def call(self, module, method, wrapped, instance, args, kwargs):
147-
kwargs = self.inject_apm_params(method, kwargs)
148-
return wrapped(*args, **kwargs)
149-
150-
def inject_apm_params(self, method, kwargs):
151-
params = kwargs.pop("params", {})
152-
153-
# make a copy of params in case the caller reuses them for some reason
154-
params = params.copy() if params is not None else {}
155-
156-
method_name = method.partition(".")[-1]
157-
158-
# store a reference to the non-serialized body so we can use it in the connection layer
159-
body = kwargs.get("body")
160-
params[BODY_REF_NAME] = body
161-
params[API_METHOD_KEY_NAME] = method_name
162-
163-
kwargs["params"] = params
164-
return kwargs

elasticapm/instrumentation/register.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
"elasticapm.instrumentation.packages.sqlite.SQLiteInstrumentation",
5555
"elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation",
5656
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation",
57-
"elasticapm.instrumentation.packages.elasticsearch.ElasticsearchInstrumentation",
5857
"elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation",
5958
"elasticapm.instrumentation.packages.pymssql.PyMSSQLInstrumentation",
6059
"elasticapm.instrumentation.packages.pyodbc.PyODBCInstrumentation",
@@ -73,7 +72,6 @@
7372
"elasticapm.instrumentation.packages.asyncio.aiohttp_client.AioHttpClientInstrumentation",
7473
"elasticapm.instrumentation.packages.asyncio.httpx.HttpxAsyncClientInstrumentation",
7574
"elasticapm.instrumentation.packages.asyncio.elasticsearch.ElasticSearchAsyncConnection",
76-
"elasticapm.instrumentation.packages.asyncio.elasticsearch.AsyncElasticsearchInstrumentation",
7775
"elasticapm.instrumentation.packages.asyncio.aiopg.AioPGInstrumentation",
7876
"elasticapm.instrumentation.packages.asyncio.asyncpg.AsyncPGInstrumentation",
7977
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",

tests/instrumentation/asyncio_tests/async_elasticsearch_client_tests.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -175,24 +175,3 @@ async def test_count_body(instrument, elasticapm_client, async_elasticsearch):
175175
assert span["context"]["db"]["type"] == "elasticsearch"
176176
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query": {"term": {"user": "kimchy"}}}')
177177
assert span["sync"] is False
178-
179-
180-
async def test_delete_by_query_body(instrument, elasticapm_client, async_elasticsearch):
181-
await async_elasticsearch.create(
182-
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
183-
)
184-
elasticapm_client.begin_transaction("test")
185-
result = await async_elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
186-
elasticapm_client.end_transaction("test", "OK")
187-
188-
transaction = elasticapm_client.events[TRANSACTION][0]
189-
spans = elasticapm_client.spans_for_transaction(transaction)
190-
191-
span = spans[0]
192-
assert span["name"] == "ES POST /tweets/_delete_by_query"
193-
assert span["type"] == "db"
194-
assert span["subtype"] == "elasticsearch"
195-
assert span["action"] == "query"
196-
assert span["context"]["db"]["type"] == "elasticsearch"
197-
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')
198-
assert span["sync"] is False

tests/instrumentation/elasticsearch_tests.py

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -295,34 +295,6 @@ def test_get_source(instrument, elasticapm_client, elasticsearch):
295295
assert "statement" not in span["context"]["db"]
296296

297297

298-
@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method")
299-
@pytest.mark.integrationtest
300-
def test_update_script(instrument, elasticapm_client, elasticsearch):
301-
elasticsearch.create(
302-
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
303-
)
304-
elasticapm_client.begin_transaction("test")
305-
r1 = elasticsearch.update(
306-
index="tweets", id=1, doc_type=document_type, body={"script": "ctx._source.text = 'adios'"}, refresh=True
307-
)
308-
elasticapm_client.end_transaction("test", "OK")
309-
310-
transaction = elasticapm_client.events[TRANSACTION][0]
311-
r2 = elasticsearch.get(index="tweets", doc_type=document_type, id=1)
312-
assert r1["result"] == "updated"
313-
assert r2["_source"] == {"user": "kimchy", "text": "adios"}
314-
spans = elasticapm_client.spans_for_transaction(transaction)
315-
assert len(spans) == 1
316-
317-
span = spans[0]
318-
assert span["name"] == "ES POST /tweets/%s/1/_update" % document_type
319-
assert span["type"] == "db"
320-
assert span["subtype"] == "elasticsearch"
321-
assert span["action"] == "query"
322-
assert span["context"]["db"]["type"] == "elasticsearch"
323-
assert span["context"]["db"]["statement"] == '{"script": "ctx._source.text = \'adios\'"}'
324-
325-
326298
@pytest.mark.integrationtest
327299
def test_update_document(instrument, elasticapm_client, elasticsearch):
328300
elasticsearch.create(
@@ -356,7 +328,7 @@ def test_search_body(instrument, elasticapm_client, elasticsearch):
356328
)
357329
elasticapm_client.begin_transaction("test")
358330
search_query = {"query": {"term": {"user": "kimchy"}}, "sort": ["userid"]}
359-
result = elasticsearch.search(body=search_query, params=None)
331+
result = elasticsearch.search(body=search_query)
360332
elasticapm_client.end_transaction("test", "OK")
361333

362334
transaction = elasticapm_client.events[TRANSACTION][0]
@@ -500,28 +472,6 @@ def test_delete(instrument, elasticapm_client, elasticsearch):
500472
assert span["context"]["db"]["type"] == "elasticsearch"
501473

502474

503-
@pytest.mark.skipif(ES_VERSION[0] < 5, reason="unsupported method")
504-
@pytest.mark.integrationtest
505-
def test_delete_by_query_body(instrument, elasticapm_client, elasticsearch):
506-
elasticsearch.create(
507-
index="tweets", doc_type=document_type, id=1, body={"user": "kimchy", "text": "hola"}, refresh=True
508-
)
509-
elasticapm_client.begin_transaction("test")
510-
result = elasticsearch.delete_by_query(index="tweets", body={"query": {"term": {"user": "kimchy"}}})
511-
elasticapm_client.end_transaction("test", "OK")
512-
513-
transaction = elasticapm_client.events[TRANSACTION][0]
514-
spans = elasticapm_client.spans_for_transaction(transaction)
515-
516-
span = spans[0]
517-
assert span["name"] == "ES POST /tweets/_delete_by_query"
518-
assert span["type"] == "db"
519-
assert span["subtype"] == "elasticsearch"
520-
assert span["action"] == "query"
521-
assert span["context"]["db"]["type"] == "elasticsearch"
522-
assert json.loads(span["context"]["db"]["statement"]) == json.loads('{"query":{"term":{"user":"kimchy"}}}')
523-
524-
525475
@pytest.mark.integrationtest
526476
def test_multiple_indexes(instrument, elasticapm_client, elasticsearch):
527477
elasticsearch.create(index="tweets", doc_type="users", id=1, body={"user": "kimchy", "text": "hola"}, refresh=True)
@@ -571,7 +521,7 @@ def test_custom_serializer(instrument, elasticapm_client, elasticsearch):
571521
elasticsearch.index(index="test-index", body={"2": 1})
572522
elasticapm_client.begin_transaction("test")
573523
search_query = {"query": {"term": {NumberObj(2): {"value": 1}}}}
574-
result = elasticsearch.search(index="test-index", body=search_query, params=None)
524+
result = elasticsearch.search(index="test-index", body=search_query)
575525
elasticapm_client.end_transaction("test", "OK")
576526

577527
transaction = elasticapm_client.events[TRANSACTION][0]

0 commit comments

Comments
 (0)