Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/snowflake/snowpark/_internal/data_source/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ def _task_fetch_data_from_source_with_retry(
parquet_queue: Union[mp.Queue, queue.Queue],
stop_event: threading.Event = None,
):
start = time.perf_counter()
logger.debug(f"Partition {partition_idx} fetch start")
_retry_run(
_task_fetch_data_from_source,
worker,
Expand All @@ -204,6 +206,10 @@ def _task_fetch_data_from_source_with_retry(
parquet_queue,
stop_event,
)
end = time.perf_counter()
logger.debug(
f"Partition {partition_idx} fetch finished, used {end - start} seconds"
)


def _upload_and_copy_into_table(
Expand Down Expand Up @@ -253,6 +259,8 @@ def _upload_and_copy_into_table_with_retry(
on_error: Optional[str] = "abort_statement",
statements_params: Optional[Dict[str, str]] = None,
):
start = time.perf_counter()
logger.debug(f"Parquet file {parquet_id} upload and copy into table start")
try:
_retry_run(
_upload_and_copy_into_table,
Expand All @@ -269,6 +277,10 @@ def _upload_and_copy_into_table_with_retry(
# proactively close the buffer to release memory
parquet_buffer.close()
backpressure_semaphore.release()
end = time.perf_counter()
logger.debug(
f"Parquet file {parquet_id} upload and copy into table finished, used {end - start} seconds"
)


def _retry_run(func: Callable, *args, **kwargs) -> Any:
Expand Down
10 changes: 9 additions & 1 deletion tests/integ/test_data_source_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def test_partition_unsupported_type(session):


@pytest.mark.parametrize("fetch_with_process", [True, False])
def test_telemetry(session, fetch_with_process):
def test_telemetry(session, fetch_with_process, caplog):
with patch(
"snowflake.snowpark._internal.telemetry.TelemetryClient.send_data_source_perf_telemetry"
) as mock_telemetry:
Expand All @@ -406,6 +406,14 @@ def test_telemetry(session, fetch_with_process):
assert "upload_and_copy_into_sf_table_duration" in telemetry_json
assert "end_to_end_duration" in telemetry_json

assert "upload and copy into table start" in caplog.text
if not fetch_with_process:
assert "fetch start" in caplog.text

assert "upload and copy into table finished, used" in caplog.text
if not fetch_with_process:
assert "fetch finished, used" in caplog.text


@pytest.mark.parametrize("fetch_with_process", [True, False])
def test_telemetry_tracking(caplog, session, fetch_with_process):
Expand Down