99from dbt .adapters .databricks .api_client import CommandExecution , DatabricksApiClient , WorkflowJobApi
1010from dbt .adapters .databricks .credentials import DatabricksCredentials
1111from dbt .adapters .databricks .logging import logger
12- from dbt .adapters .databricks .python_models .python_config import ParsedPythonModel
12+ from dbt .adapters .databricks .python_models .python_config import (
13+ ParsedPythonModel ,
14+ PythonPackagesConfig ,
15+ )
1316from dbt .adapters .databricks .python_models .run_tracking import PythonRunTracker
1417
1518DEFAULT_TIMEOUT = 60 * 60 * 24
19+ NOTEBOOK_SEPARATOR = "\n \n # COMMAND ----------\n \n "
1620
1721
1822class PythonSubmitter (ABC ):
1923 """Interface for submitting Python models to run on Databricks."""
2024
25+ def __init__ (self , packages_config : PythonPackagesConfig ) -> None :
26+ self .packages_config = packages_config
27+
2128 @abstractmethod
2229 def submit (self , compiled_code : str ) -> None :
2330 """Submit the compiled code to Databricks."""
2431 pass
2532
33+ def _prepare_code_with_notebook_scoped_packages (
34+ self , compiled_code : str , separator : str = NOTEBOOK_SEPARATOR
35+ ) -> str :
36+ """
37+ Prepend notebook-scoped package installation commands to the compiled code.
38+
39+ If notebook-scoped flag is not set, or if there are no packages to install,
40+ returns the original compiled code.
41+ """
42+ if not self .packages_config .packages or not self .packages_config .notebook_scoped :
43+ return compiled_code
44+
45+ index_url = (
46+ f"--index-url { self .packages_config .index_url } "
47+ if self .packages_config .index_url
48+ else ""
49+ )
50+ # Build the %pip install command for notebook-scoped packages
51+ packages = " " .join (self .packages_config .packages )
52+ pip_install_cmd = f"%pip install { index_url } -q { packages } "
53+ logger .debug (f"Adding notebook-scoped package installation: { pip_install_cmd } " )
54+
55+ # Add extra restart python command for Databricks runtimes 13.0 and above
56+ restart_cmd = "dbutils.library.restartPython()"
57+
58+ # Prepend the pip install command to the compiled code
59+ return f"{ pip_install_cmd } { separator } { restart_cmd } { separator } { compiled_code } "
60+
2661
2762class BaseDatabricksHelper (PythonJobHelper ):
2863 """Base helper for python models on Databricks."""
@@ -63,16 +98,24 @@ class PythonCommandSubmitter(PythonSubmitter):
6398 """Submitter for Python models using the Command API."""
6499
65100 def __init__ (
66- self , api_client : DatabricksApiClient , tracker : PythonRunTracker , cluster_id : str
101+ self ,
102+ api_client : DatabricksApiClient ,
103+ tracker : PythonRunTracker ,
104+ cluster_id : str ,
105+ parsed_model : ParsedPythonModel ,
67106 ) -> None :
68107 self .api_client = api_client
69108 self .tracker = tracker
70109 self .cluster_id = cluster_id
110+ super ().__init__ (parsed_model .config .python_packages_config )
71111
72112 @override
73113 def submit (self , compiled_code : str ) -> None :
74114 logger .debug ("Submitting Python model using the Command API." )
75115
116+ # Prepare code with notebook-scoped package installation if needed
117+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
118+
76119 context_id = self .api_client .command_contexts .create (self .cluster_id )
77120 command_exec : Optional [CommandExecution ] = None
78121 try :
@@ -252,16 +295,24 @@ def get_library_config(
252295 packages : list [str ],
253296 index_url : Optional [str ],
254297 additional_libraries : list [dict [str , Any ]],
298+ notebook_scoped_libraries : bool = False ,
255299) -> dict [str , Any ]:
256- """Update the job configuration with the required libraries."""
300+ """
301+ Update the job configuration with the required libraries.
302+
303+ If notebook_scoped_libraries is True, packages are not included in the library config
304+ as they will be installed via %pip install in the notebook itself.
305+ """
257306
258307 libraries = []
259308
260- for package in packages :
261- if index_url :
262- libraries .append ({"pypi" : {"package" : package , "repo" : index_url }})
263- else :
264- libraries .append ({"pypi" : {"package" : package }})
309+ # Only add packages to cluster-level libraries if not using notebook-scoped
310+ if not notebook_scoped_libraries :
311+ for package in packages :
312+ if index_url :
313+ libraries .append ({"pypi" : {"package" : package , "repo" : index_url }})
314+ else :
315+ libraries .append ({"pypi" : {"package" : package }})
265316
266317 for library in additional_libraries :
267318 libraries .append (library )
@@ -286,7 +337,10 @@ def __init__(
286337 packages = parsed_model .config .packages
287338 index_url = parsed_model .config .index_url
288339 additional_libraries = parsed_model .config .additional_libs
289- library_config = get_library_config (packages , index_url , additional_libraries )
340+ notebook_scoped_libraries = parsed_model .config .notebook_scoped_libraries
341+ library_config = get_library_config (
342+ packages , index_url , additional_libraries , notebook_scoped_libraries
343+ )
290344 self .cluster_spec = {** cluster_spec , ** library_config }
291345 self .job_grants = parsed_model .config .python_job_config .grants
292346 self .additional_job_settings = parsed_model .config .python_job_config .dict ()
@@ -335,11 +389,14 @@ def __init__(
335389 tracker : PythonRunTracker ,
336390 uploader : PythonNotebookUploader ,
337391 config_compiler : PythonJobConfigCompiler ,
392+ parsed_model : ParsedPythonModel ,
338393 ) -> None :
339394 self .api_client = api_client
340395 self .tracker = tracker
341396 self .uploader = uploader
342397 self .config_compiler = config_compiler
398+ self .parsed_model = parsed_model
399+ super ().__init__ (parsed_model .config .python_packages_config )
343400
344401 @staticmethod
345402 def create (
@@ -356,12 +413,17 @@ def create(
356413 parsed_model ,
357414 cluster_spec ,
358415 )
359- return PythonNotebookSubmitter (api_client , tracker , notebook_uploader , config_compiler )
416+ return PythonNotebookSubmitter (
417+ api_client , tracker , notebook_uploader , config_compiler , parsed_model
418+ )
360419
361420 @override
362421 def submit (self , compiled_code : str ) -> None :
363422 logger .debug ("Submitting Python model using the Job Run API." )
364423
424+ # Prepare code with notebook-scoped package installation if needed
425+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
426+
365427 file_path = self .uploader .upload (compiled_code )
366428 job_config = self .config_compiler .compile (file_path )
367429
@@ -444,7 +506,12 @@ def build_submitter(self) -> PythonSubmitter:
444506 {"existing_cluster_id" : self .cluster_id },
445507 )
446508 else :
447- return PythonCommandSubmitter (self .api_client , self .tracker , self .cluster_id or "" )
509+ return PythonCommandSubmitter (
510+ self .api_client ,
511+ self .tracker ,
512+ self .cluster_id or "" ,
513+ self .parsed_model ,
514+ )
448515
449516 @override
450517 def validate_config (self ) -> None :
@@ -572,6 +639,7 @@ def __init__(
572639 workflow_creater : PythonWorkflowCreator ,
573640 job_grants : dict [str , list [dict [str , str ]]],
574641 acls : list [dict [str , str ]],
642+ parsed_model : ParsedPythonModel ,
575643 ) -> None :
576644 self .api_client = api_client
577645 self .tracker = tracker
@@ -581,6 +649,7 @@ def __init__(
581649 self .workflow_creater = workflow_creater
582650 self .job_grants = job_grants
583651 self .acls = acls
652+ super ().__init__ (parsed_model .config .python_packages_config )
584653
585654 @staticmethod
586655 def create (
@@ -599,6 +668,7 @@ def create(
599668 workflow_creater ,
600669 parsed_model .config .python_job_config .grants ,
601670 parsed_model .config .access_control_list ,
671+ parsed_model ,
602672 )
603673
604674 @override
@@ -611,6 +681,7 @@ def submit(self, compiled_code: str) -> None:
611681 logger .debug (
612682 f"[Workflow Debug] Compiled code preview: { compiled_code [:preview_len ]} ..."
613683 )
684+ compiled_code = self ._prepare_code_with_notebook_scoped_packages (compiled_code )
614685
615686 file_path = self .uploader .upload (compiled_code )
616687 logger .debug (f"[Workflow Debug] Uploaded notebook to: { file_path } " )
0 commit comments