Merged
Changes from 78 commits
Commits
90 commits
65a75f4
added functionality for export of failure logs
saishreeeee Jun 10, 2025
5305308
changed logger.error to logger.debug in exc.py
saishreeeee Jun 11, 2025
ba83c33
Fix telemetry loss during Python shutdown
saishreeeee Jun 11, 2025
131db92
unit tests for export_failure_log
saishreeeee Jun 12, 2025
3abc40d
try-catch blocks to make telemetry failures non-blocking for connecto…
saishreeeee Jun 12, 2025
ffa4787
removed redundant try/catch blocks, added try/catch block to initiali…
saishreeeee Jun 12, 2025
cc077f3
skip null fields in telemetry request
saishreeeee Jun 12, 2025
2c6fd44
removed dup import, renamed func, changed a filter_null_values to lambda
saishreeeee Jun 12, 2025
89540a1
removed unnecessary class variable and a redundant try/except block
saishreeeee Jun 12, 2025
52a1152
public functions defined at interface level
saishreeeee Jun 12, 2025
3dcdcfa
changed export_event and flush to private functions
saishreeeee Jun 13, 2025
b2714c9
formatting
saishreeeee Jun 13, 2025
377a87b
changed connection_uuid to thread local in thrift backend
saishreeeee Jun 13, 2025
c9376b8
made errors more specific
saishreeeee Jun 13, 2025
bbfadf2
revert change to connection_uuid
saishreeeee Jun 13, 2025
9bce26b
reverting change in close in telemetry client
saishreeeee Jun 13, 2025
ef4514d
JsonSerializableMixin
saishreeeee Jun 13, 2025
8924835
isdataclass check in JsonSerializableMixin
saishreeeee Jun 13, 2025
65361e7
convert TelemetryClientFactory to module-level functions, replace Noo…
saishreeeee Jun 16, 2025
1722a77
renamed connection_uuid as session_id_hex
saishreeeee Jun 16, 2025
e841434
added NotImplementedError to abstract class, added unit tests
saishreeeee Jun 16, 2025
2f89266
formatting
saishreeeee Jun 16, 2025
5564bbb
added PEP-249 link, changed NoopTelemetryClient implementation
saishreeeee Jun 17, 2025
1e4e8cf
removed unused import
saishreeeee Jun 17, 2025
55b29bc
made telemetry client close a module-level function
saishreeeee Jun 17, 2025
93bf170
unit tests verbose
saishreeeee Jun 17, 2025
45f5ccf
debug logs in unit tests
saishreeeee Jun 17, 2025
8ff1c1f
debug logs in unit tests
saishreeeee Jun 17, 2025
8bdd324
removed ABC from mixin, added try/catch block around executor shutdown
saishreeeee Jun 17, 2025
f99f7ea
checking stuff
saishreeeee Jun 17, 2025
b972c8a
finding out
saishreeeee Jun 17, 2025
7ca3636
finding out more
saishreeeee Jun 17, 2025
0ac8ed2
more more finding out more nice
saishreeeee Jun 17, 2025
c457a09
locks are useless anyways
saishreeeee Jun 17, 2025
5f07a84
haha
saishreeeee Jun 17, 2025
1115e25
normal
saishreeeee Jun 17, 2025
de1ed87
:= looks like walrus horizontally
saishreeeee Jun 17, 2025
554aeaf
one more
saishreeeee Jun 17, 2025
fffac5f
walrus again
saishreeeee Jun 17, 2025
b77208a
old stuff without walrus seems to fail
saishreeeee Jun 17, 2025
733c288
manually do the walrussing
saishreeeee Jun 17, 2025
ca8b958
change 3.13t, v2
saishreeeee Jun 17, 2025
3eabac9
formatting, added walrus
saishreeeee Jun 17, 2025
fb9ef43
formatting
saishreeeee Jun 17, 2025
1e795aa
removed walrus, removed test before stalling test
saishreeeee Jun 17, 2025
2c293a5
changed order of stalling test
saishreeeee Jun 18, 2025
d237255
removed debugging, added TelemetryClientFactory
saishreeeee Jun 18, 2025
f101b19
remove more debugging
saishreeeee Jun 18, 2025
a094659
latency logs functionality
saishreeeee Jun 19, 2025
695a07d
merge
saishreeeee Jun 19, 2025
fc918d6
fixed type of return value in get_session_id_hex() in thrift backend
saishreeeee Jun 19, 2025
d7c75d7
debug on TelemetryClientFactory lock
saishreeeee Jun 19, 2025
b6b0f89
formatting
saishreeeee Jun 19, 2025
50a1206
type notation for _waiters
saishreeeee Jun 19, 2025
39a0530
called connection.close() in test_arraysize_buffer_size_passthrough
saishreeeee Jun 19, 2025
413427f
run all unit tests
saishreeeee Jun 19, 2025
6b1d1b8
more debugging
saishreeeee Jun 19, 2025
8f5e5ba
removed the connection.close() from that test, put debug statement be…
saishreeeee Jun 19, 2025
2dc00ba
more debug
saishreeeee Jun 19, 2025
1ff03d4
more more more
saishreeeee Jun 19, 2025
6ff07c8
why
saishreeeee Jun 19, 2025
395049a
whywhy
saishreeeee Jun 19, 2025
4466821
thread name
saishreeeee Jun 19, 2025
34b63e4
added teardown to all tests except finalizer test (gc collect)
saishreeeee Jun 20, 2025
49082fb
added the get_attribute functions to the classes
saishreeeee Jun 20, 2025
ed1db9d
removed tearDown, added connection.close() to first test
saishreeeee Jun 20, 2025
9fa5a89
finally
saishreeeee Jun 21, 2025
14433c4
remove debugging
saishreeeee Jun 22, 2025
ef4ca13
added test for export_latency_log, made mock of thrift backend with r…
saishreeeee Jun 23, 2025
152e0da
Merge branch 'telemetry' into PECOBLR-554
saishreeeee Jun 23, 2025
b5bf165
added multi threaded tests
saishreeeee Jun 23, 2025
307a8cc
formatting
saishreeeee Jun 23, 2025
0fd46d4
added TelemetryExtractor, removed multithreaded tests
saishreeeee Jun 25, 2025
f6f50b2
formatting
saishreeeee Jun 25, 2025
1163ebe
fixes in test
saishreeeee Jun 25, 2025
4b6ace0
fix in telemetry extractor
saishreeeee Jun 25, 2025
7171718
Merge branch 'telemetry' into PECOBLR-554
saishreeeee Jun 25, 2025
a059a03
Merge branch 'telemetry' into PECOBLR-554
saishreeeee Jun 25, 2025
4d56141
added doc strings to latency_logger, abstracted export_telemetry_log
saishreeeee Jun 30, 2025
27295c2
statement type, unit test fix
saishreeeee Jun 30, 2025
b558bc8
unit test fix
saishreeeee Jun 30, 2025
01853bc
statement type changes
saishreeeee Jul 1, 2025
45f74d0
test_fetches fix
saishreeeee Jul 1, 2025
149d4a8
added mocks to resolve the errors caused by log_latency decorator in …
saishreeeee Jul 1, 2025
e031663
removed function in test_fetches cuz it is only used once
saishreeeee Jul 1, 2025
142b9a8
added _safe_call which returns None in case of errors in the get func…
saishreeeee Jul 2, 2025
2a26965
removed the changes in test_client and test_fetches
saishreeeee Jul 2, 2025
ae90dee
removed the changes in test_fetches
saishreeeee Jul 2, 2025
acc9904
test_telemetry
saishreeeee Jul 3, 2025
a847122
removed test
saishreeeee Jul 3, 2025
13 changes: 12 additions & 1 deletion src/databricks/sql/client.py
@@ -61,7 +61,7 @@
DriverConnectionParameters,
HostDetails,
)

from databricks.sql.telemetry.latency_logger import log_latency

logger = logging.getLogger(__name__)

@@ -745,6 +745,7 @@ def _handle_staging_operation(
session_id_hex=self.connection.get_session_id_hex(),
)

@log_latency()
def _handle_staging_put(
self, presigned_url: str, local_file: str, headers: Optional[dict] = None
):
@@ -784,6 +785,7 @@ def _handle_staging_put(
+ "but not yet applied on the server. It's possible this command may fail later."
)

@log_latency()
def _handle_staging_get(
self, local_file: str, presigned_url: str, headers: Optional[dict] = None
):
@@ -811,6 +813,7 @@ def _handle_staging_get(
with open(local_file, "wb") as fp:
fp.write(r.content)

@log_latency()
def _handle_staging_remove(
self, presigned_url: str, headers: Optional[dict] = None
):
@@ -824,6 +827,7 @@ def _handle_staging_remove(
session_id_hex=self.connection.get_session_id_hex(),
)

@log_latency()
def execute(
self,
operation: str,
@@ -914,6 +918,7 @@ def execute(

return self

@log_latency()
def execute_async(
self,
operation: str,
@@ -1039,6 +1044,7 @@ def executemany(self, operation, seq_of_parameters):
self.execute(operation, parameters)
return self

@log_latency()
def catalogs(self) -> "Cursor":
"""
Get all available catalogs.
@@ -1062,6 +1068,7 @@ def catalogs(self) -> "Cursor":
)
return self

@log_latency()
def schemas(
self, catalog_name: Optional[str] = None, schema_name: Optional[str] = None
) -> "Cursor":
@@ -1090,6 +1097,7 @@ def schemas(
)
return self

@log_latency()
def tables(
self,
catalog_name: Optional[str] = None,
@@ -1125,6 +1133,7 @@ def tables(
)
return self

@log_latency()
def columns(
self,
catalog_name: Optional[str] = None,
@@ -1379,6 +1388,7 @@ def _fill_results_buffer(self):
self.results = results
self.has_more_rows = has_more_rows

@log_latency()
def _convert_columnar_table(self, table):
column_names = [c[0] for c in self.description]
ResultRow = Row(*column_names)
@@ -1391,6 +1401,7 @@ def _convert_columnar_table(self, table):

return result

@log_latency()
def _convert_arrow_table(self, table):
column_names = [c[0] for c in self.description]
ResultRow = Row(*column_names)
152 changes: 152 additions & 0 deletions src/databricks/sql/telemetry/latency_logger.py
@@ -0,0 +1,152 @@
import time
import functools
from typing import Optional
from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
from databricks.sql.telemetry.models.event import (
SqlExecutionEvent,
)
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
from uuid import UUID


class TelemetryExtractor:
    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        return getattr(self._obj, name)

    def get_session_id_hex(self):
        pass

    def get_statement_id(self):
        pass

    def get_statement_type(self):
        pass

    def get_is_compressed(self):
        pass

    def get_execution_result(self):
        pass

    def get_retry_count(self):
        pass


class CursorExtractor(TelemetryExtractor):
    def get_statement_id(self) -> Optional[str]:
        return self.query_id

    def get_session_id_hex(self) -> Optional[str]:
        return self.connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        return self.connection.lz4_compression

    def get_execution_result(self) -> ExecutionResultFormat:
        if self.active_result_set is None:
            return ExecutionResultFormat.FORMAT_UNSPECIFIED

        if isinstance(self.active_result_set.results, ColumnQueue):
            return ExecutionResultFormat.COLUMNAR_INLINE
        elif isinstance(self.active_result_set.results, CloudFetchQueue):
            return ExecutionResultFormat.EXTERNAL_LINKS
        elif isinstance(self.active_result_set.results, ArrowQueue):
            return ExecutionResultFormat.INLINE_ARROW
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        if (
            hasattr(self.thrift_backend, "retry_policy")
            and self.thrift_backend.retry_policy
        ):
            return len(self.thrift_backend.retry_policy.history)
        return 0

    def get_statement_type(self) -> StatementType:
        # TODO: Implement this
        return StatementType.SQL


class ResultSetExtractor(TelemetryExtractor):
    def get_statement_id(self) -> Optional[str]:
        if self.command_id:
            return str(UUID(bytes=self.command_id.operationId.guid))
        return None

    def get_session_id_hex(self) -> Optional[str]:
        return self.connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        return self.lz4_compressed

    def get_execution_result(self) -> ExecutionResultFormat:
        if isinstance(self.results, ColumnQueue):
            return ExecutionResultFormat.COLUMNAR_INLINE
        elif isinstance(self.results, CloudFetchQueue):
            return ExecutionResultFormat.EXTERNAL_LINKS
        elif isinstance(self.results, ArrowQueue):
            return ExecutionResultFormat.INLINE_ARROW
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_statement_type(self) -> StatementType:
        # TODO: Implement this
        return StatementType.SQL

    def get_retry_count(self) -> int:
        if (
            hasattr(self.thrift_backend, "retry_policy")
            and self.thrift_backend.retry_policy
        ):
            return len(self.thrift_backend.retry_policy.history)
        return 0


def get_extractor(obj):
    if obj.__class__.__name__ == "Cursor":
        return CursorExtractor(obj)
    elif obj.__class__.__name__ == "ResultSet":
        return ResultSetExtractor(obj)
    else:
        return TelemetryExtractor(obj)
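Illustrative note on the extractor classes above: they rely on `__getattr__` to forward unknown attribute lookups to the wrapped object, and `get_extractor` picks a subclass by class name. The sketch below reproduces that pattern in isolation. `Extractor`, `CursorLikeExtractor`, `pick_extractor`, and the stand-in `Cursor` class are hypothetical names for illustration only, not the driver's classes.

```python
class Extractor:
    """Wraps an object and forwards unknown attribute lookups to it."""

    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        # Only called when normal lookup fails, so self._obj resolves normally.
        return getattr(self._obj, name)

    def get_statement_id(self):
        return None


class CursorLikeExtractor(Extractor):
    def get_statement_id(self):
        # query_id is not defined on the wrapper; __getattr__ fetches it
        # from the wrapped object.
        return self.query_id


class Cursor:
    """Hypothetical stand-in for illustration, not the driver's Cursor."""

    def __init__(self, query_id):
        self.query_id = query_id


class SubCursor(Cursor):
    """Subclass used to show the limits of name-based dispatch."""


def pick_extractor(obj):
    # Dispatch on the exact class name, as get_extractor does in the diff.
    if obj.__class__.__name__ == "Cursor":
        return CursorLikeExtractor(obj)
    return Extractor(obj)
```

One consequence of comparing `__class__.__name__` rather than using `isinstance` is that a subclass with a different name falls through to the base extractor. Whether that matters here depends on how the driver's classes are subclassed, which this diff does not show.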


def log_latency():
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            start_time = time.perf_counter()
            result = None
            try:
                result = func(self, *args, **kwargs)
                return result
            finally:
                end_time = time.perf_counter()
                duration_ms = int((end_time - start_time) * 1000)

                extractor = get_extractor(self)
                session_id_hex = extractor.get_session_id_hex()
                statement_id = extractor.get_statement_id()

                sql_exec_event = SqlExecutionEvent(
                    statement_type=extractor.get_statement_type(),
                    is_compressed=extractor.get_is_compressed(),
                    execution_result=extractor.get_execution_result(),
                    retry_count=extractor.get_retry_count(),
                )

                telemetry_client = TelemetryClientFactory.get_telemetry_client(
                    session_id_hex
                )
                telemetry_client.export_latency_log(
                    latency_ms=duration_ms,
                    sql_execution_event=sql_exec_event,
                    sql_statement_id=statement_id,
                )

        return wrapper

    return decorator
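Illustrative note on `log_latency`: the decorator times the wrapped call with `time.perf_counter()` and reports from a `finally` block, so a duration is recorded whether the call returns or raises. The sketch below is a minimal, self-contained version of that pattern, not the PR's implementation. `log_latency_sketch` and the `report` callback are hypothetical stand-ins for the telemetry client. It also wraps the reporting step in its own `try/except`, an added assumption about the desired behavior: in Python, an exception raised inside `finally` replaces whatever the wrapped call returned or raised.

```python
import functools
import time


def log_latency_sketch(report):
    """Decorator factory: time the wrapped call and pass the duration to report.

    report is a hypothetical callback standing in for the telemetry client.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                duration_ms = int((time.perf_counter() - start) * 1000)
                try:
                    report(func.__name__, duration_ms)
                except Exception:
                    # Reporting is best effort; never mask the wrapped
                    # call's return value or exception.
                    pass

        return wrapper

    return decorator


recorded = []


@log_latency_sketch(lambda name, ms: recorded.append((name, ms)))
def add(a, b):
    return a + b


@log_latency_sketch(lambda name, ms: recorded.append((name, ms)))
def fail():
    raise ValueError("boom")


def broken_report(name, ms):
    raise RuntimeError("telemetry down")


@log_latency_sketch(broken_report)
def still_works():
    return "ok"
```

Because `functools.wraps` is applied, `add.__name__` remains `"add"`, which keeps tracebacks and introspection readable for the decorated methods.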
40 changes: 38 additions & 2 deletions src/databricks/sql/telemetry/telemetry_client.py
@@ -4,7 +4,7 @@
import requests
import logging
from concurrent.futures import ThreadPoolExecutor
-from typing import Dict, Optional
+from typing import Dict, Optional, List
from databricks.sql.telemetry.models.event import (
TelemetryEvent,
DriverSystemConfiguration,
@@ -112,6 +112,10 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
def export_failure_log(self, error_name, error_message):
raise NotImplementedError("Subclasses must implement export_failure_log")

@abstractmethod
def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
raise NotImplementedError("Subclasses must implement export_latency_log")

@abstractmethod
def close(self):
raise NotImplementedError("Subclasses must implement close")
@@ -136,6 +140,9 @@ def export_initial_telemetry_log(self, driver_connection_params, user_agent):
def export_failure_log(self, error_name, error_message):
pass

def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
pass

def close(self):
pass

@@ -299,6 +306,32 @@ def export_failure_log(self, error_name, error_message):
except Exception as e:
logger.debug("Failed to export failure log: %s", e)

def export_latency_log(self, latency_ms, sql_execution_event, sql_statement_id):
logger.debug("Exporting latency log for connection %s", self._session_id_hex)
try:
telemetry_frontend_log = TelemetryFrontendLog(
frontend_log_event_id=str(uuid.uuid4()),
context=FrontendLogContext(
client_context=TelemetryClientContext(
timestamp_millis=int(time.time() * 1000),
user_agent=self._user_agent,
)
),
entry=FrontendLogEntry(
sql_driver_log=TelemetryEvent(
session_id=self._session_id_hex,
system_configuration=TelemetryHelper.get_driver_system_configuration(),
driver_connection_params=self._driver_connection_params,
sql_statement_id=sql_statement_id,
sql_operation=sql_execution_event,
operation_latency_ms=latency_ms,
)
),
)
self._export_event(telemetry_frontend_log)
except Exception as e:
logger.debug("Failed to export latency log: %s", e)

def close(self):
"""Flush remaining events before closing"""
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
@@ -431,6 +464,9 @@ def close(session_id_hex):
logger.debug(
"No more telemetry clients, shutting down thread pool executor"
)
-TelemetryClientFactory._executor.shutdown(wait=True)
+try:
+    TelemetryClientFactory._executor.shutdown(wait=True)
+except Exception as e:
+    logger.debug("Failed to shutdown thread pool executor: %s", e)
TelemetryClientFactory._executor = None
TelemetryClientFactory._initialized = False
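Illustrative note on the last hunk: assuming the intent is that a failing executor shutdown should be logged rather than propagate out of `close`, the change can be sketched in isolation as below. `shutdown_quietly` is a hypothetical helper name, not a function in this PR.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)


def shutdown_quietly(executor):
    """Shut down executor; log at debug level instead of raising.

    Returns True if shutdown completed, False if it raised.
    """
    try:
        executor.shutdown(wait=True)
        return True
    except Exception as e:
        logger.debug("Failed to shutdown thread pool executor: %s", e)
        return False


executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(sum, [1, 2, 3])
ok = shutdown_quietly(executor)
```

With `wait=True`, `shutdown` blocks until pending futures finish, so work submitted before the call still completes.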
22 changes: 18 additions & 4 deletions tests/unit/test_client.py
@@ -38,6 +38,11 @@ def new(cls):
cls.apply_property_to_mock(ThriftBackendMock, staging_allowed_local_path=None)
MockTExecuteStatementResp = MagicMock(spec=TExecuteStatementResp())

# Mock retry_policy with history attribute
mock_retry_policy = Mock()
mock_retry_policy.history = []
cls.apply_property_to_mock(ThriftBackendMock, retry_policy=mock_retry_policy)

cls.apply_property_to_mock(
MockTExecuteStatementResp,
description=None,
@@ -69,6 +74,15 @@ def apply_property_to_mock(self, mock_obj, **kwargs):
prop = PropertyMock(**kwargs)
setattr(type(mock_obj), key, prop)

@classmethod
def mock_thrift_backend_with_retry_policy(cls): # Required for log_latency() decorator
"""Create a simple thrift_backend mock with retry_policy for basic tests."""
mock_thrift_backend = Mock()
mock_retry_policy = Mock()
mock_retry_policy.history = []
mock_thrift_backend.retry_policy = mock_retry_policy
return mock_thrift_backend


class ClientTestSuite(unittest.TestCase):
"""
@@ -318,7 +332,7 @@ def test_executing_multiple_commands_uses_the_most_recent_command(
mock_result_sets[1].fetchall.assert_called_once_with()

def test_closed_cursor_doesnt_allow_operations(self):
-cursor = client.Cursor(Mock(), Mock())
+cursor = client.Cursor(Mock(), ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy())
cursor.close()

with self.assertRaises(Error) as e:
@@ -380,7 +394,7 @@ def test_get_schemas_parameters_passed_to_thrift_backend(self, mock_thrift_backe
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
-mock_thrift_backend = Mock()
+mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.schemas(**req_args)
@@ -403,7 +417,7 @@ def test_get_tables_parameters_passed_to_thrift_backend(self, mock_thrift_backen
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
-mock_thrift_backend = Mock()
+mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.tables(**req_args)
@@ -426,7 +440,7 @@ def test_get_columns_parameters_passed_to_thrift_backend(self, mock_thrift_backe
for req_args in req_args_combinations:
req_args = {k: v for k, v in req_args.items() if v != "NOT_SET"}
with self.subTest(req_args=req_args):
-mock_thrift_backend = Mock()
+mock_thrift_backend = ThriftBackendMockFactory.mock_thrift_backend_with_retry_policy()

cursor = client.Cursor(Mock(), mock_thrift_backend)
cursor.columns(**req_args)
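Illustrative note on the test changes: a plain `Mock()` backend passes the `hasattr(..., "retry_policy")` and truthiness checks in `get_retry_count`, but `len()` on the auto-created `history` attribute raises `TypeError`, because `Mock` (unlike `MagicMock`) does not support magic methods such as `__len__`. That is likely why these tests now build a backend whose `retry_policy.history` is a real list. The sketch below shows this with a standalone copy of the check; `retry_count` is a hypothetical free function mirroring the method in the diff.

```python
from unittest.mock import Mock


def retry_count(backend):
    # Mirrors the shape of get_retry_count in latency_logger.py.
    if hasattr(backend, "retry_policy") and backend.retry_policy:
        return len(backend.retry_policy.history)
    return 0


# A bare Mock auto-creates retry_policy.history as another Mock,
# which has no __len__.
bare = Mock()
try:
    retry_count(bare)
    bare_error = None
except TypeError as e:
    bare_error = e

# Configured the same way as mock_thrift_backend_with_retry_policy().
configured = Mock()
configured.retry_policy = Mock()
configured.retry_policy.history = []
```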