99from dbt .adapters .databricks .api_client import CommandExecution , DatabricksApiClient , WorkflowJobApi
1010from dbt .adapters .databricks .credentials import DatabricksCredentials
1111from dbt .adapters .databricks .logging import logger
12- from dbt .adapters .databricks .python_models .python_config import ParsedPythonModel
12+ from dbt .adapters .databricks .python_models .python_config import (
13+ ParsedPythonModel ,
14+ PythonPackagesConfig ,
15+ )
1316from dbt .adapters .databricks .python_models .run_tracking import PythonRunTracker
1417
# Maximum time (seconds) to wait for a submitted Python model run: 24 hours.
DEFAULT_TIMEOUT = 60 * 60 * 24
# Databricks notebook cell separator.  Used to splice %pip-install and
# restartPython() cells ahead of the compiled model code so each runs as
# its own notebook command.
NOTEBOOK_SEPARATOR = "\n\n# COMMAND ----------\n\n"
1620
1721
class PythonSubmitter(ABC):
    """Interface for submitting Python models to run on Databricks."""

    def __init__(self, packages_config: PythonPackagesConfig) -> None:
        # Shared package configuration; drives the optional notebook-scoped
        # %pip installation performed by the helper below.
        self.packages_config = packages_config

    @abstractmethod
    def submit(self, compiled_code: str) -> None:
        """Submit the compiled code to Databricks."""
        pass

    def _prepare_code_with_notebook_scoped_packages(
        self, compiled_code: str, separator: str = NOTEBOOK_SEPARATOR
    ) -> str:
        """Prepend notebook-scoped package installation commands to the compiled code.

        Returns ``compiled_code`` unchanged when no packages are configured or
        notebook-scoped installation is disabled.  Otherwise prepends a
        ``%pip install`` cell and a ``dbutils.library.restartPython()`` cell
        (separated by ``separator``) so the freshly installed packages are
        importable by the model code that follows.
        """
        if not self.packages_config.packages or not self.packages_config.notebook_scoped:
            return compiled_code

        # Assemble the %pip command from its parts so no stray double space is
        # emitted when index_url is not configured (the previous f-string
        # produced "%pip install  -q ..." in that case).
        parts = ["%pip install"]
        if self.packages_config.index_url:
            parts.append(f"--index-url {self.packages_config.index_url}")
        parts.append("-q")
        parts.extend(self.packages_config.packages)
        pip_install_cmd = " ".join(parts)
        logger.debug(f"Adding notebook-scoped package installation: {pip_install_cmd}")

        # Restart the Python process so the newly installed packages become
        # visible to subsequent imports (required on DBR 13.0 and above).
        restart_cmd = "dbutils.library.restartPython()"

        # Prepend the pip install and restart cells to the compiled code.
        return f"{pip_install_cmd}{separator}{restart_cmd}{separator}{compiled_code}"
2656
2757class BaseDatabricksHelper (PythonJobHelper ):
2858 """Base helper for python models on Databricks."""
@@ -63,16 +93,24 @@ class PythonCommandSubmitter(PythonSubmitter):
6393 """Submitter for Python models using the Command API."""
6494
    def __init__(
        self,
        api_client: DatabricksApiClient,
        tracker: PythonRunTracker,
        cluster_id: str,
        parsed_model: ParsedPythonModel,
    ) -> None:
        """Store submission dependencies and register the package config.

        :param api_client: client used to talk to the Databricks Command API
        :param tracker: run tracker used for execution bookkeeping
        :param cluster_id: id of the existing cluster to execute commands on
        :param parsed_model: parsed model whose python_packages_config is
            handed to the base class for notebook-scoped installs
        """
        self.api_client = api_client
        self.tracker = tracker
        self.cluster_id = cluster_id
        # Base class keeps the packages config consumed by
        # _prepare_code_with_notebook_scoped_packages() during submit().
        super().__init__(parsed_model.config.python_packages_config)
71106
72107 @override
73108 def submit (self , compiled_code : str ) -> None :
74109 logger .debug ("Submitting Python model using the Command API." )
75110
111+ # Prepare code with notebook-scoped package installation if needed
112+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
113+
76114 context_id = self .api_client .command_contexts .create (self .cluster_id )
77115 command_exec : Optional [CommandExecution ] = None
78116 try :
@@ -252,16 +290,24 @@ def get_library_config(
252290 packages : list [str ],
253291 index_url : Optional [str ],
254292 additional_libraries : list [dict [str , Any ]],
293+ notebook_scoped_libraries : bool = False ,
255294) -> dict [str , Any ]:
256- """Update the job configuration with the required libraries."""
295+ """
296+ Update the job configuration with the required libraries.
297+
298+ If notebook_scoped_libraries is True, packages are not included in the library config
299+ as they will be installed via %pip install in the notebook itself.
300+ """
257301
258302 libraries = []
259303
260- for package in packages :
261- if index_url :
262- libraries .append ({"pypi" : {"package" : package , "repo" : index_url }})
263- else :
264- libraries .append ({"pypi" : {"package" : package }})
304+ # Only add packages to cluster-level libraries if not using notebook-scoped
305+ if not notebook_scoped_libraries :
306+ for package in packages :
307+ if index_url :
308+ libraries .append ({"pypi" : {"package" : package , "repo" : index_url }})
309+ else :
310+ libraries .append ({"pypi" : {"package" : package }})
265311
266312 for library in additional_libraries :
267313 libraries .append (library )
@@ -286,7 +332,10 @@ def __init__(
286332 packages = parsed_model .config .packages
287333 index_url = parsed_model .config .index_url
288334 additional_libraries = parsed_model .config .additional_libs
289- library_config = get_library_config (packages , index_url , additional_libraries )
335+ notebook_scoped_libraries = parsed_model .config .notebook_scoped_libraries
336+ library_config = get_library_config (
337+ packages , index_url , additional_libraries , notebook_scoped_libraries
338+ )
290339 self .cluster_spec = {** cluster_spec , ** library_config }
291340 self .job_grants = parsed_model .config .python_job_config .grants
292341 self .additional_job_settings = parsed_model .config .python_job_config .dict ()
@@ -335,11 +384,14 @@ def __init__(
335384 tracker : PythonRunTracker ,
336385 uploader : PythonNotebookUploader ,
337386 config_compiler : PythonJobConfigCompiler ,
387+ parsed_model : ParsedPythonModel ,
338388 ) -> None :
339389 self .api_client = api_client
340390 self .tracker = tracker
341391 self .uploader = uploader
342392 self .config_compiler = config_compiler
393+ self .parsed_model = parsed_model
394+ super ().__init__ (parsed_model .config .python_packages_config )
343395
344396 @staticmethod
345397 def create (
@@ -356,12 +408,17 @@ def create(
356408 parsed_model ,
357409 cluster_spec ,
358410 )
359- return PythonNotebookSubmitter (api_client , tracker , notebook_uploader , config_compiler )
411+ return PythonNotebookSubmitter (
412+ api_client , tracker , notebook_uploader , config_compiler , parsed_model
413+ )
360414
361415 @override
362416 def submit (self , compiled_code : str ) -> None :
363417 logger .debug ("Submitting Python model using the Job Run API." )
364418
419+ # Prepare code with notebook-scoped package installation if needed
420+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
421+
365422 file_path = self .uploader .upload (compiled_code )
366423 job_config = self .config_compiler .compile (file_path )
367424
@@ -444,7 +501,12 @@ def build_submitter(self) -> PythonSubmitter:
444501 {"existing_cluster_id" : self .cluster_id },
445502 )
446503 else :
447- return PythonCommandSubmitter (self .api_client , self .tracker , self .cluster_id or "" )
504+ return PythonCommandSubmitter (
505+ self .api_client ,
506+ self .tracker ,
507+ self .cluster_id or "" ,
508+ self .parsed_model ,
509+ )
448510
449511 @override
450512 def validate_config (self ) -> None :
@@ -572,6 +634,7 @@ def __init__(
572634 workflow_creater : PythonWorkflowCreator ,
573635 job_grants : dict [str , list [dict [str , str ]]],
574636 acls : list [dict [str , str ]],
637+ parsed_model : ParsedPythonModel ,
575638 ) -> None :
576639 self .api_client = api_client
577640 self .tracker = tracker
@@ -581,6 +644,7 @@ def __init__(
581644 self .workflow_creater = workflow_creater
582645 self .job_grants = job_grants
583646 self .acls = acls
647+ super ().__init__ (parsed_model .config .python_packages_config )
584648
585649 @staticmethod
586650 def create (
@@ -599,6 +663,7 @@ def create(
599663 workflow_creater ,
600664 parsed_model .config .python_job_config .grants ,
601665 parsed_model .config .access_control_list ,
666+ parsed_model ,
602667 )
603668
604669 @override
@@ -611,6 +676,7 @@ def submit(self, compiled_code: str) -> None:
611676 logger .debug (
612677 f"[Workflow Debug] Compiled code preview: { compiled_code [:preview_len ]} ..."
613678 )
679+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
614680
615681 file_path = self .uploader .upload (compiled_code )
616682 logger .debug (f"[Workflow Debug] Uploaded notebook to: { file_path } " )
0 commit comments