Skip to content

Commit 7615991

Browse files
authored
Reuse legacy schema detail connection (DataDog#23544)
Keep the legacy schema detail connection open for the duration of each database schema collection instead of reconnecting once per table.
1 parent ab3a82b commit 7615991

3 files changed

Lines changed: 189 additions & 26 deletions

File tree

sqlserver/changelog.d/23544.fixed

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reuse the auxiliary SQL Server schema collection connection for legacy table detail queries.

sqlserver/datadog_checks/sqlserver/schemas.py

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def __init__(self, check: SQLServer):
8686
config.max_tables = check._config.schema_config.get('max_tables', 300)
8787
self._is_2016_or_earlier = None
8888
self._database_compatibility_levels: dict[str, int] = {}
89+
self._pre_2017_cursor = None
8990
super().__init__(check, config)
9091

9192
@property
@@ -106,14 +107,26 @@ def _get_databases(self) -> list[DatabaseInfo]:
106107

107108
@contextlib.contextmanager
108109
def _get_cursor(self, database_name):
109-
with self._check.connection.open_managed_default_connection(KEY_PREFIX):
110-
with self._check.connection.get_managed_cursor(KEY_PREFIX) as cursor:
110+
with contextlib.ExitStack() as stack:
111+
try:
112+
stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX))
113+
cursor = stack.enter_context(self._check.connection.get_managed_cursor(KEY_PREFIX))
111114
switch_db_statement = construct_use_statement(database_name)
112115
cursor.execute(switch_db_statement)
113116
self._is_2016_or_earlier = self._should_use_legacy_schema_query(database_name)
117+
118+
if self._is_2016_or_earlier:
119+
stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017))
120+
self._pre_2017_cursor = stack.enter_context(
121+
self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017)
122+
)
123+
self._pre_2017_cursor.execute(switch_db_statement)
124+
114125
query = self._get_tables_query()
115126
cursor.execute(query)
116127
yield cursor
128+
finally:
129+
self._pre_2017_cursor = None
117130

118131
def _record_database_compatibility_levels(self, databases: list[DatabaseInfo]) -> None:
119132
self._database_compatibility_levels = {}
@@ -205,25 +218,25 @@ def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject:
205218
object = super()._map_row(database, cursor_row)
206219
if self._is_2016_or_earlier:
207220
# We need to fetch the related data for each table
208-
# Use a key_prefix to get a separate connection to avoid conflicts with the main connection
209-
with self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017):
210-
with self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017) as cursor:
211-
switch_db_statement = construct_use_statement(database.get("name"))
212-
cursor.execute(switch_db_statement)
213-
table_id = str(cursor_row.get("table_id"))
214-
columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id)
215-
cursor.execute(columns_query)
216-
columns = cursor.fetchall_dict()
217-
indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
218-
cursor.execute(indexes_query)
219-
indexes = cursor.fetchall_dict()
220-
foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
221-
cursor.execute(foreign_keys_query)
222-
foreign_keys = cursor.fetchall_dict()
223-
partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id)
224-
cursor.execute(partitions_query)
225-
partition_row = cursor.fetchone_dict()
226-
partition_count = partition_row.get("partition_count") if partition_row else None
221+
# Use a separate connection to avoid conflicts with the main cursor while it streams table rows.
222+
cursor = self._pre_2017_cursor
223+
if cursor is None:
224+
raise RuntimeError("pre-2017 schema cursor is not initialized")
225+
226+
table_id = str(cursor_row.get("table_id"))
227+
columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id)
228+
cursor.execute(columns_query)
229+
columns = cursor.fetchall_dict()
230+
indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
231+
cursor.execute(indexes_query)
232+
indexes = cursor.fetchall_dict()
233+
foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
234+
cursor.execute(foreign_keys_query)
235+
foreign_keys = cursor.fetchall_dict()
236+
partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id)
237+
cursor.execute(partitions_query)
238+
partition_row = cursor.fetchone_dict()
239+
partition_count = partition_row.get("partition_count") if partition_row else None
227240
else:
228241
columns = json.loads(cursor_row.get("columns") or "[]")
229242
indexes = json.loads(cursor_row.get("indexes") or "[]")

sqlserver/tests/test_unit.py

Lines changed: 154 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
STATIC_INFO_VERSION,
2828
)
2929
from datadog_checks.sqlserver.metrics import SqlFractionMetric
30-
from datadog_checks.sqlserver.schemas import SQLServerSchemaCollector
30+
from datadog_checks.sqlserver.schemas import KEY_PREFIX, KEY_PREFIX_PRE_2017, SQLServerSchemaCollector
3131
from datadog_checks.sqlserver.sqlserver import SQLConnectionError
3232
from datadog_checks.sqlserver.utils import (
3333
Database,
@@ -219,16 +219,165 @@ def test_schema_collector_uses_database_compatibility_level_for_schema_query(
219219
}
220220
)
221221
collector._database_compatibility_levels = {"datadog_test": compatibility_level}
222-
cursor = mock.Mock()
223-
collector._check.connection.open_managed_default_connection.return_value = contextlib.nullcontext()
224-
collector._check.connection.get_managed_cursor.return_value = contextlib.nullcontext(cursor)
222+
main_cursor = mock.Mock()
223+
detail_cursor = mock.Mock()
224+
collector._check.connection.open_managed_default_connection.side_effect = [
225+
contextlib.nullcontext(),
226+
contextlib.nullcontext(),
227+
]
228+
collector._check.connection.get_managed_cursor.side_effect = [
229+
contextlib.nullcontext(main_cursor),
230+
contextlib.nullcontext(detail_cursor),
231+
]
225232

226233
with collector._get_cursor("datadog_test"):
227234
pass
228235

229-
tables_query = cursor.execute.call_args_list[1][0][0]
236+
tables_query = main_cursor.execute.call_args_list[1][0][0]
230237
assert collector._is_2016_or_earlier is expected_legacy
231238
assert ("STRING_AGG" in tables_query) is not expected_legacy
239+
assert (detail_cursor.execute.call_count == 1) is expected_legacy
240+
241+
242+
def test_schema_collector_reuses_pre_2017_connection_for_table_detail_queries() -> None:
243+
collector = create_schema_collector(
244+
{
245+
STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD,
246+
STATIC_INFO_MAJOR_VERSION: 13,
247+
}
248+
)
249+
main_cursor = mock.Mock()
250+
detail_cursor = mock.Mock()
251+
detail_cursor.fetchall_dict.side_effect = [
252+
[{"name": "id"}],
253+
[{"name": "pk_table_1"}],
254+
[{"foreign_key_name": "fk_table_1"}],
255+
[{"name": "id"}],
256+
[{"name": "pk_table_2"}],
257+
[{"foreign_key_name": "fk_table_2"}],
258+
]
259+
detail_cursor.fetchone_dict.side_effect = [{"partition_count": 1}, {"partition_count": 2}]
260+
collector._check.connection.open_managed_default_connection.side_effect = [
261+
contextlib.nullcontext(),
262+
contextlib.nullcontext(),
263+
]
264+
collector._check.connection.get_managed_cursor.side_effect = [
265+
contextlib.nullcontext(main_cursor),
266+
contextlib.nullcontext(detail_cursor),
267+
]
268+
database = {"name": "datadog_test", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"}
269+
rows = [
270+
{"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table_1", "table_id": 101},
271+
{"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table_2", "table_id": 102},
272+
]
273+
274+
with collector._get_cursor("datadog_test"):
275+
mapped_rows = [collector._map_row(database, row) for row in rows]
276+
277+
assert collector._check.connection.open_managed_default_connection.call_args_list == [
278+
mock.call(KEY_PREFIX),
279+
mock.call(KEY_PREFIX_PRE_2017),
280+
]
281+
assert collector._check.connection.get_managed_cursor.call_args_list == [
282+
mock.call(KEY_PREFIX),
283+
mock.call(KEY_PREFIX_PRE_2017),
284+
]
285+
assert detail_cursor.execute.call_args_list[0] == mock.call("USE [datadog_test];")
286+
assert detail_cursor.execute.call_args_list.count(mock.call("USE [datadog_test];")) == 1
287+
assert detail_cursor.execute.call_count == 9
288+
assert collector._pre_2017_cursor is None
289+
assert [row["schemas"][0]["tables"][0]["partitions"]["partition_count"] for row in mapped_rows] == [1, 2]
290+
291+
292+
def configure_pre_2017_detail_cursor(cursor: mock.Mock, partition_count: int = 1) -> None:
293+
cursor.fetchall_dict.side_effect = [
294+
[{"name": "id"}],
295+
[{"name": "pk_table"}],
296+
[{"foreign_key_name": "fk_table"}],
297+
]
298+
cursor.fetchone_dict.return_value = {"partition_count": partition_count}
299+
300+
301+
def test_schema_collector_uses_new_pre_2017_connection_for_each_database() -> None:
302+
collector = create_schema_collector(
303+
{
304+
STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD,
305+
STATIC_INFO_MAJOR_VERSION: 13,
306+
}
307+
)
308+
main_cursor_1 = mock.Mock()
309+
detail_cursor_1 = mock.Mock()
310+
configure_pre_2017_detail_cursor(detail_cursor_1, partition_count=1)
311+
main_cursor_2 = mock.Mock()
312+
detail_cursor_2 = mock.Mock()
313+
configure_pre_2017_detail_cursor(detail_cursor_2, partition_count=2)
314+
collector._check.connection.open_managed_default_connection.side_effect = [
315+
contextlib.nullcontext(),
316+
contextlib.nullcontext(),
317+
contextlib.nullcontext(),
318+
contextlib.nullcontext(),
319+
]
320+
collector._check.connection.get_managed_cursor.side_effect = [
321+
contextlib.nullcontext(main_cursor_1),
322+
contextlib.nullcontext(detail_cursor_1),
323+
contextlib.nullcontext(main_cursor_2),
324+
contextlib.nullcontext(detail_cursor_2),
325+
]
326+
row = {"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table", "table_id": 101}
327+
328+
with collector._get_cursor("datadog_test_1"):
329+
collector._map_row(
330+
{"name": "datadog_test_1", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"},
331+
row,
332+
)
333+
with collector._get_cursor("datadog_test_2"):
334+
collector._map_row(
335+
{"name": "datadog_test_2", "id": "2", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"},
336+
row,
337+
)
338+
339+
assert collector._check.connection.get_managed_cursor.call_args_list == [
340+
mock.call(KEY_PREFIX),
341+
mock.call(KEY_PREFIX_PRE_2017),
342+
mock.call(KEY_PREFIX),
343+
mock.call(KEY_PREFIX_PRE_2017),
344+
]
345+
assert detail_cursor_1.execute.call_args_list[0] == mock.call("USE [datadog_test_1];")
346+
assert detail_cursor_2.execute.call_args_list[0] == mock.call("USE [datadog_test_2];")
347+
assert collector._pre_2017_cursor is None
348+
349+
350+
def test_schema_collector_does_not_open_pre_2017_connection_for_modern_query() -> None:
351+
collector = create_schema_collector(
352+
{
353+
STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD,
354+
STATIC_INFO_MAJOR_VERSION: 14,
355+
}
356+
)
357+
collector._database_compatibility_levels = {"datadog_test": 130}
358+
cursor = mock.Mock()
359+
collector._check.connection.open_managed_default_connection.return_value = contextlib.nullcontext()
360+
collector._check.connection.get_managed_cursor.return_value = contextlib.nullcontext(cursor)
361+
database = {"name": "datadog_test", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"}
362+
row = {
363+
"schema_name": "test_schema",
364+
"schema_id": 1,
365+
"owner_name": "dbo",
366+
"table_name": "table",
367+
"table_id": 101,
368+
"columns": '[{"name": "id"}]',
369+
"indexes": '[{"name": "pk_table"}]',
370+
"foreign_keys": '[{"foreign_key_name": "fk_table"}]',
371+
"partition_count": 1,
372+
}
373+
374+
with collector._get_cursor("datadog_test"):
375+
mapped_row = collector._map_row(database, row)
376+
377+
assert collector._check.connection.open_managed_default_connection.call_args_list == [mock.call(KEY_PREFIX)]
378+
assert collector._check.connection.get_managed_cursor.call_args_list == [mock.call(KEY_PREFIX)]
379+
assert collector._pre_2017_cursor is None
380+
assert mapped_row["schemas"][0]["tables"][0]["columns"] == [{"name": "id"}]
232381

233382

234383
def test_get_cursor(instance_docker):

0 commit comments

Comments
 (0)