Skip to content

Commit e548ee5

Browse files
sfc-gh-qdingCortex Code
andcommitted
SNOW-3392625: Add artifact_repository support to dbapi udtf_configs
Forward the artifact_repository parameter from dbapi() udtf_configs through to session.udtf.register(), enabling users to specify a custom artifact repository (e.g. PyPI) for packages used by the internal UDTF created during distributed dbapi ingestion. Changes: - dataframe_reader.py: extract artifact_repository from udtf_configs - datasource_partitioner.py: forward through _udtf_ingestion wrapper - base_driver.py: accept and forward to session.udtf.register() - CHANGELOG.md: add new feature entry - test_data_source_api.py: add test_dbapi_udtf_artifact_repository .... Generated with [Cortex Code](https://docs.snowflake.com/en/user-guide/cortex-code/cortex-code) Co-Authored-By: Cortex Code <noreply@snowflake.com>
1 parent 7c853e1 commit e548ee5

5 files changed

Lines changed: 53 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
### Snowpark Python API Updates
66

7+
#### New Features
8+
9+
- Added `artifact_repository` support to `udtf_configs` in `session.read.dbapi()`, enabling users to specify a custom artifact repository (e.g. PyPI) for packages used by the internal UDTF during distributed ingestion.
10+
711
#### Bug Fixes
812

913
- Fixed a bug where `TRY_CAST` reader option is ignored when calling `DataFrameReader.schema().csv()`.

src/snowflake/snowpark/_internal/data_source/datasource_partitioner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def _udtf_ingestion(
184184
fetch_size: int = 1000,
185185
imports: Optional[List[str]] = None,
186186
packages: Optional[List[str]] = None,
187+
artifact_repository: Optional[str] = None,
187188
session_init_statement: Optional[List[str]] = None,
188189
query_timeout: Optional[int] = 0,
189190
statement_params: Optional[Dict[str, str]] = None,
@@ -197,6 +198,7 @@ def _udtf_ingestion(
197198
fetch_size,
198199
imports,
199200
packages,
201+
artifact_repository,
200202
session_init_statement,
201203
query_timeout,
202204
statement_params,

src/snowflake/snowpark/_internal/data_source/drivers/base_driver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def udtf_ingestion(
161161
fetch_size: int = 1000,
162162
imports: Optional[List[str]] = None,
163163
packages: Optional[List[str]] = None,
164+
artifact_repository: Optional[str] = None,
164165
session_init_statement: Optional[List[str]] = None,
165166
query_timeout: Optional[int] = 0,
166167
statement_params: Optional[Dict[str, str]] = None,
@@ -187,6 +188,7 @@ def udtf_ingestion(
187188
external_access_integrations=[external_access_integrations],
188189
packages=packages or UDTF_PACKAGE_MAP.get(self.dbms_type),
189190
imports=imports,
191+
artifact_repository=artifact_repository,
190192
statement_params=statement_params,
191193
_emit_ast=_emit_ast, # internal function call, _emit_ast will be set to False by the caller
192194
)

src/snowflake/snowpark/dataframe_reader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,7 @@ def create_oracledb_connection():
23052305
fetch_size=fetch_size,
23062306
imports=udtf_configs.get("imports", None),
23072307
packages=udtf_configs.get("packages", None),
2308+
artifact_repository=udtf_configs.get("artifact_repository", None),
23082309
session_init_statement=session_init_statement,
23092310
query_timeout=query_timeout,
23102311
statement_params=statements_params_for_telemetry,

tests/integ/datasource/test_oracledb.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,3 +416,47 @@ def create_connection_with_params(user=None, password=None, dsn=None, **kwargs):
416416
udtf_configs=udtf_configs,
417417
)
418418
Utils.check_answer(df, oracledb_real_data)
419+
420+
421+
@pytest.mark.udf
422+
def test_udtf_ingestion_oracledb_with_artifact_repository(session):
423+
"""Verify artifact_repository in udtf_configs is forwarded to the UDTF creation DDL."""
424+
from tests.parameters import ORACLEDB_CONNECTION_PARAMETERS
425+
426+
his = session.query_history()
427+
428+
def create_connection_oracledb():
429+
import oracledb
430+
431+
host = ORACLEDB_CONNECTION_PARAMETERS["host"]
432+
port = ORACLEDB_CONNECTION_PARAMETERS["port"]
433+
service_name = ORACLEDB_CONNECTION_PARAMETERS["service_name"]
434+
username = ORACLEDB_CONNECTION_PARAMETERS["username"]
435+
password = ORACLEDB_CONNECTION_PARAMETERS["password"]
436+
dsn = f"{host}:{port}/{service_name}"
437+
connection = oracledb.connect(user=username, password=password, dsn=dsn)
438+
return connection
439+
440+
df = session.read.dbapi(
441+
create_connection_oracledb,
442+
table="ALL_TYPE_TABLE",
443+
udtf_configs={
444+
"external_access_integration": ORACLEDB_TEST_EXTERNAL_ACCESS_INTEGRATION,
445+
"artifact_repository": "SNOWPARK_PYTHON_TEST_REPOSITORY",
446+
"packages": ["oracledb", "snowflake-snowpark-python", "cloudpickle"],
447+
},
448+
).order_by("ID")
449+
450+
Utils.check_answer(df, oracledb_real_data)
451+
assert df.schema == oracledb_real_schema
452+
453+
# check that the UDTF creation DDL includes the artifact repository
454+
found = False
455+
for q in his.queries:
456+
if (
457+
"CREATE" in q.sql_text.upper()
458+
and "FUNCTION" in q.sql_text.upper()
459+
and "SNOWPARK_PYTHON_TEST_REPOSITORY" in q.sql_text
460+
):
461+
found = True
462+
assert found, "artifact_repository not found in UDTF creation DDL"

0 commit comments

Comments
 (0)