Skip to content

Commit bcb81a9

Browse files
SNOW-2241408: finer grade telemetry for dbapi (#3745)
1 parent 0d4f089 commit bcb81a9

2 files changed

Lines changed: 21 additions & 1 deletion

File tree

src/snowflake/snowpark/_internal/data_source/utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ def _task_fetch_data_from_source_with_retry(
196196
parquet_queue: Union[mp.Queue, queue.Queue],
197197
stop_event: threading.Event = None,
198198
):
199+
start = time.perf_counter()
200+
logger.debug(f"Partition {partition_idx} fetch start")
199201
_retry_run(
200202
_task_fetch_data_from_source,
201203
worker,
@@ -204,6 +206,10 @@ def _task_fetch_data_from_source_with_retry(
204206
parquet_queue,
205207
stop_event,
206208
)
209+
end = time.perf_counter()
210+
logger.debug(
211+
f"Partition {partition_idx} fetch finished, used {end - start} seconds"
212+
)
207213

208214

209215
def _upload_and_copy_into_table(
@@ -253,6 +259,8 @@ def _upload_and_copy_into_table_with_retry(
253259
on_error: Optional[str] = "abort_statement",
254260
statements_params: Optional[Dict[str, str]] = None,
255261
):
262+
start = time.perf_counter()
263+
logger.debug(f"Parquet file {parquet_id} upload and copy into table start")
256264
try:
257265
_retry_run(
258266
_upload_and_copy_into_table,
@@ -269,6 +277,10 @@ def _upload_and_copy_into_table_with_retry(
269277
# proactively close the buffer to release memory
270278
parquet_buffer.close()
271279
backpressure_semaphore.release()
280+
end = time.perf_counter()
281+
logger.debug(
282+
f"Parquet file {parquet_id} upload and copy into table finished, used {end - start} seconds"
283+
)
272284

273285

274286
def _retry_run(func: Callable, *args, **kwargs) -> Any:

tests/integ/test_data_source_api.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ def test_partition_unsupported_type(session):
387387

388388

389389
@pytest.mark.parametrize("fetch_with_process", [True, False])
390-
def test_telemetry(session, fetch_with_process):
390+
def test_telemetry(session, fetch_with_process, caplog):
391391
with patch(
392392
"snowflake.snowpark._internal.telemetry.TelemetryClient.send_data_source_perf_telemetry"
393393
) as mock_telemetry:
@@ -406,6 +406,14 @@ def test_telemetry(session, fetch_with_process):
406406
assert "upload_and_copy_into_sf_table_duration" in telemetry_json
407407
assert "end_to_end_duration" in telemetry_json
408408

409+
assert "upload and copy into table start" in caplog.text
410+
if not fetch_with_process:
411+
assert "fetch start" in caplog.text
412+
413+
assert "upload and copy into table finished, used" in caplog.text
414+
if not fetch_with_process:
415+
assert "fetch finished, used" in caplog.text
416+
409417

410418
@pytest.mark.parametrize("fetch_with_process", [True, False])
411419
def test_telemetry_tracking(caplog, session, fetch_with_process):

0 commit comments

Comments
 (0)