diff --git a/src/snowflake/snowpark/_internal/data_source/utils.py b/src/snowflake/snowpark/_internal/data_source/utils.py index f3fbebbd71..626529abfa 100644 --- a/src/snowflake/snowpark/_internal/data_source/utils.py +++ b/src/snowflake/snowpark/_internal/data_source/utils.py @@ -196,6 +196,8 @@ def _task_fetch_data_from_source_with_retry( parquet_queue: Union[mp.Queue, queue.Queue], stop_event: threading.Event = None, ): + start = time.perf_counter() + logger.debug(f"Partition {partition_idx} fetch start") _retry_run( _task_fetch_data_from_source, worker, @@ -204,6 +206,10 @@ def _task_fetch_data_from_source_with_retry( parquet_queue, stop_event, ) + end = time.perf_counter() + logger.debug( + f"Partition {partition_idx} fetch finished, used {end - start} seconds" + ) def _upload_and_copy_into_table( @@ -253,6 +259,8 @@ def _upload_and_copy_into_table_with_retry( on_error: Optional[str] = "abort_statement", statements_params: Optional[Dict[str, str]] = None, ): + start = time.perf_counter() + logger.debug(f"Parquet file {parquet_id} upload and copy into table start") try: _retry_run( _upload_and_copy_into_table, @@ -269,6 +277,10 @@ def _upload_and_copy_into_table_with_retry( # proactively close the buffer to release memory parquet_buffer.close() backpressure_semaphore.release() + end = time.perf_counter() + logger.debug( + f"Parquet file {parquet_id} upload and copy into table finished, used {end - start} seconds" + ) def _retry_run(func: Callable, *args, **kwargs) -> Any: diff --git a/tests/integ/test_data_source_api.py b/tests/integ/test_data_source_api.py index 69403dabed..ad736ab2aa 100644 --- a/tests/integ/test_data_source_api.py +++ b/tests/integ/test_data_source_api.py @@ -387,7 +387,7 @@ def test_partition_unsupported_type(session): @pytest.mark.parametrize("fetch_with_process", [True, False]) -def test_telemetry(session, fetch_with_process): +def test_telemetry(session, fetch_with_process, caplog): with patch( "snowflake.snowpark._internal.telemetry.TelemetryClient.send_data_source_perf_telemetry" ) as mock_telemetry: @@ -406,6 +406,14 @@ def test_telemetry(session, fetch_with_process): assert "upload_and_copy_into_sf_table_duration" in telemetry_json assert "end_to_end_duration" in telemetry_json + assert "upload and copy into table start" in caplog.text + if not fetch_with_process: + assert "fetch start" in caplog.text + + assert "upload and copy into table finished, used" in caplog.text + if not fetch_with_process: + assert "fetch finished, used" in caplog.text + @pytest.mark.parametrize("fetch_with_process", [True, False]) def test_telemetry_tracking(caplog, session, fetch_with_process):