Commit 5cdb0c5

Authored by Quentin Ambard and Claude
Improve pipeline tools: wait for completion, concise errors, better defaults (#366)
* Add early validation for catalog/schema/volume existence
  - Add _validate_volume_path() to check schema and volume exist before LLM work
  - Fail fast with a clear ValueError instead of stalling on upload failures
  - Add timing logs for LLM calls to help debug performance
  - Reduces wasted time/cost when the destination doesn't exist

* feat: Make upload_folder follow cp -r semantics for folder name handling
  - Without trailing slash: preserves source folder name (pipeline → dest/pipeline/)
  - With trailing slash or /*: copies contents only (pipeline/ → dest/)
  - Supports both Unix (/) and Windows (\) path separators

* fix: Reduce get_pipeline_events default from 100 to 5
  Events contain verbose stack traces that can overflow output buffers. A default
  of 5 captures the recent update lifecycle (INITIALIZING → FAILED) with error
  details without overwhelming the response.

* Improve get_pipeline_events defaults and add filtering options
  - Reduce default max_results from 100 to 5 (events contain verbose stack traces)
  - Add filter parameter with default "level='ERROR'" to focus on errors
  - Add update_id parameter to filter events by a specific pipeline update
  - Update docstrings with filter examples and usage guidance

* Add wait option to start_update with default=True
  - start_update now waits for pipeline completion by default (wait=True)
  - Returns state, success, duration_seconds, and errors on completion
  - On failure, automatically fetches ERROR/WARN events for that update_id
  - Default timeout is 5 minutes (configurable)
  - Change get_pipeline_events default filter to include WARN level

* Improve get_update to auto-fetch errors and hide verbose config
  - get_update now returns a simplified dict with state, success, cause, errors
  - Auto-fetches ERROR/WARN events when an update fails (like start_update)
  - Add include_config=False (default) to hide verbose pipeline configuration
  - Fix get_pipeline_events update_id filter to use client-side filtering
    (the API doesn't support origin.update_id in filter expressions)

* Add error_summary for concise error messages in pipeline tools
  - Add _extract_error_summary() to get clean error messages without stack traces
  - start_update and get_update now return error_summary by default
  - Add full_error_details=False parameter to get full stack traces if needed
  - Falls back to event.message if no exception message is found

  Example output:
      {"state": "FAILED", "error_summary": ["SparkException: File '...' was not found..."]}

* Shorten MCP tool docstrings for pipelines
  Reduce verbosity in tool descriptions: the LLM will read the return values
  anyway, so detailed return schemas aren't needed in docstrings.

* Simplify pipeline MCP tool docs and use event_log_level
  - Remove verbose SDK option lists from extra_settings docs
  - Replace filter param with event_log_level (ERROR, WARN, INFO)
  - WARN includes ERROR, INFO includes all events

* Add SDP acronym to create_pipeline docstring

* fix: Remove unused imports

* fix: Line too long lint errors

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Quentin Ambard <quentin.ambard@databricks.com>
Co-authored-by: Claude <noreply@anthropic.com>
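The wait-for-completion behavior described above can be approximated with a generic polling loop. This is an illustrative sketch only: `wait_for_update`, the `get_state` callable, and the returned dict shape are stand-ins for the real `_start_update` internals, which are not shown in this commit.

```python
import time


def wait_for_update(get_state, timeout: int = 300, poll_interval: float = 1.0) -> dict:
    """Poll get_state() until a terminal state or timeout.

    get_state is a callable returning the current update state string
    (hypothetical stand-in for a Databricks pipelines get_update call).
    Mirrors the commit's defaults: 300 s timeout, success on COMPLETED.
    """
    terminal = {"COMPLETED", "FAILED", "CANCELED"}
    start = time.monotonic()
    while True:
        state = get_state()
        if state in terminal:
            return {
                "state": state,
                "success": state == "COMPLETED",
                "duration_seconds": round(time.monotonic() - start, 1),
            }
        if time.monotonic() - start > timeout:
            # Timed out before reaching a terminal state
            return {"state": state, "success": False, "timed_out": True}
        time.sleep(poll_interval)
```

On failure the real tool additionally fetches ERROR/WARN events for the update_id; that step is omitted here.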
1 parent 357d85f commit 5cdb0c5

File tree

8 files changed: +376 −115 lines changed

databricks-mcp-server/databricks_mcp_server/tools/file.py

Lines changed: 6 additions & 1 deletion

```diff
@@ -23,8 +23,13 @@ def upload_folder(
     Uses parallel uploads with ThreadPoolExecutor for performance.
     Automatically handles all file types.
 
+    Follows `cp -r` semantics:
+    - With trailing slash or /* (e.g., "pipeline/" or "pipeline/*"): copies contents into workspace_folder
+    - Without trailing slash (e.g., "pipeline"): creates workspace_folder/pipeline/
+
     Args:
-        local_folder: Path to local folder to upload
+        local_folder: Path to local folder to upload. Add trailing slash to copy
+            contents only, omit to preserve folder name.
         workspace_folder: Target path in Databricks workspace
             (e.g., "/Workspace/Users/user@example.com/my-project")
         max_workers: Maximum parallel upload threads (default: 10)
```
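A quick way to see where files land under these `cp -r` semantics. `effective_destination` is a hypothetical helper written for this example, not part of the codebase; it only computes the resulting workspace path.

```python
import os


def effective_destination(local_folder: str, workspace_folder: str) -> str:
    """Compute where uploaded files land under cp -r semantics (illustrative).

    With a trailing "/" or "/*" the contents go straight into
    workspace_folder; otherwise the source folder name is appended.
    """
    copy_contents_only = local_folder.endswith(("/", "/*"))
    # Strip any "*" then trailing "/" before taking the basename
    clean = local_folder.rstrip("*").rstrip("/")
    dest = workspace_folder.rstrip("/")
    if not copy_contents_only:
        dest = f"{dest}/{os.path.basename(clean)}"
    return dest


# "pipeline"  -> .../dest/pipeline ; "pipeline/" or "pipeline/*" -> .../dest
```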

databricks-mcp-server/databricks_mcp_server/tools/pdf.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -11,7 +11,7 @@
 from ..server import mcp
 
 
-@mcp.tool
+@mcp.tool(timeout=300.0)
 def generate_and_upload_pdfs(
     catalog: str,
     schema: str,
@@ -101,7 +101,7 @@ def generate_and_upload_pdfs(
     }
 
 
-@mcp.tool
+@mcp.tool(timeout=60.0)
 def generate_and_upload_pdf(
     title: str,
     description: str,
```

databricks-mcp-server/databricks_mcp_server/tools/pipelines.py

Lines changed: 71 additions & 89 deletions

```diff
@@ -37,20 +37,18 @@ def create_pipeline(
     extra_settings: Dict[str, Any] = None,
 ) -> Dict[str, Any]:
     """
-    Create a new Spark Declarative Pipeline (Unity Catalog, serverless by default).
+    Create a new Spark Declarative Pipeline (SDP). Unity Catalog, serverless by default.
 
     Args:
         name: Pipeline name
-        root_path: Root folder for source code (added to Python sys.path for imports)
+        root_path: Root folder for source code (added to sys.path)
         catalog: Unity Catalog name
         schema: Schema name for output tables
-        workspace_file_paths: List of workspace file paths (raw .sql or .py files)
-        extra_settings: Optional dict with additional pipeline settings (clusters,
-            continuous, development, photon, edition, channel, event_log, configuration,
-            notifications, tags, serverless, etc.). Explicit parameters take precedence.
+        workspace_file_paths: List of workspace .sql or .py file paths
+        extra_settings: Additional pipeline settings dict
 
     Returns:
-        Dictionary with pipeline_id of the created pipeline.
+        Dict with pipeline_id.
     """
     # Auto-inject default tags into extra_settings; user tags take precedence
     extra_settings = extra_settings or {}
@@ -116,13 +114,11 @@ def update_pipeline(
         root_path: New root folder for source code
         catalog: New catalog name
         schema: New schema name
-        workspace_file_paths: New list of file paths (raw .sql or .py files)
-        extra_settings: Optional dict with additional pipeline settings (clusters,
-            continuous, development, photon, edition, channel, event_log, configuration,
-            notifications, tags, serverless, etc.). Explicit parameters take precedence.
+        workspace_file_paths: New list of .sql or .py file paths
+        extra_settings: Additional pipeline settings dict
 
     Returns:
-        Dictionary with status message.
+        Dict with status.
     """
     _update_pipeline(
         pipeline_id=pipeline_id,
@@ -157,51 +153,70 @@ def delete_pipeline(pipeline_id: str) -> Dict[str, str]:
     return {"status": "deleted"}
 
 
-@mcp.tool
+@mcp.tool(timeout=300)
 def start_update(
     pipeline_id: str,
     refresh_selection: List[str] = None,
     full_refresh: bool = False,
     full_refresh_selection: List[str] = None,
     validate_only: bool = False,
-) -> Dict[str, str]:
+    wait: bool = True,
+    timeout: int = 300,
+    full_error_details: bool = False,
+) -> Dict[str, Any]:
     """
-    Start a pipeline update or dry-run validation.
+    Start a pipeline update. Waits for completion by default.
 
     Args:
         pipeline_id: Pipeline ID
-        refresh_selection: List of table names to refresh
-        full_refresh: If True, performs full refresh of all tables
-        full_refresh_selection: List of table names for full refresh
-        validate_only: If True, validates without updating data (dry run)
+        refresh_selection: Table names to refresh
+        full_refresh: Full refresh all tables
+        full_refresh_selection: Table names for full refresh
+        validate_only: Dry run without updating data
+        wait: Wait for completion (default: True)
+        timeout: Max wait time in seconds (default: 300)
+        full_error_details: Include full stack traces (default: False)
 
     Returns:
-        Dictionary with update_id for polling status.
+        Dict with update_id, state, success, error_summary if failed.
     """
-    update_id = _start_update(
+    return _start_update(
        pipeline_id=pipeline_id,
        refresh_selection=refresh_selection,
        full_refresh=full_refresh,
        full_refresh_selection=full_refresh_selection,
        validate_only=validate_only,
+        wait=wait,
+        timeout=timeout,
+        full_error_details=full_error_details,
     )
-    return {"update_id": update_id}
 
 
 @mcp.tool
-def get_update(pipeline_id: str, update_id: str) -> Dict[str, Any]:
+def get_update(
+    pipeline_id: str,
+    update_id: str,
+    include_config: bool = False,
+    full_error_details: bool = False,
+) -> Dict[str, Any]:
     """
-    Get pipeline update status and results.
+    Get pipeline update status. Auto-fetches errors if failed.
 
     Args:
         pipeline_id: Pipeline ID
         update_id: Update ID from start_update
+        include_config: Include full pipeline config (default: False)
+        full_error_details: Include full stack traces (default: False)
 
     Returns:
-        Dictionary with update status (QUEUED, RUNNING, COMPLETED, FAILED, etc.)
+        Dict with update_id, state, success, error_summary if failed.
     """
-    result = _get_update(pipeline_id=pipeline_id, update_id=update_id)
-    return result.as_dict() if hasattr(result, "as_dict") else vars(result)
+    return _get_update(
+        pipeline_id=pipeline_id,
+        update_id=update_id,
+        include_config=include_config,
+        full_error_details=full_error_details,
+    )
 
 
 @mcp.tool
@@ -222,21 +237,33 @@ def stop_pipeline(pipeline_id: str) -> Dict[str, str]:
 @mcp.tool
 def get_pipeline_events(
     pipeline_id: str,
-    max_results: int = 100,
+    max_results: int = 5,
+    event_log_level: str = "WARN",
+    update_id: str = None,
 ) -> List[Dict[str, Any]]:
     """
-    Get pipeline events, issues, and error messages.
-
-    Use this to debug pipeline failures.
+    Get pipeline events for debugging. Returns ERROR/WARN by default.
 
     Args:
         pipeline_id: Pipeline ID
-        max_results: Maximum number of events to return (default: 100)
+        max_results: Max events to return (default: 5)
+        event_log_level: ERROR, WARN (includes ERROR), or INFO (all events)
+        update_id: Filter to specific update
 
     Returns:
-        List of event dictionaries with error details.
+        List of event dicts with error details.
     """
-    events = _get_pipeline_events(pipeline_id=pipeline_id, max_results=max_results)
+    # Convert log level to filter expression
+    level_filters = {
+        "ERROR": "level='ERROR'",
+        "WARN": "level in ('ERROR', 'WARN')",
+        "INFO": "",  # No filter = all events
+    }
+    filter_expr = level_filters.get(event_log_level.upper(), level_filters["WARN"])
+
+    events = _get_pipeline_events(
+        pipeline_id=pipeline_id, max_results=max_results, filter=filter_expr, update_id=update_id
+    )
     return [e.as_dict() if hasattr(e, "as_dict") else vars(e) for e in events]
 
 
@@ -254,67 +281,22 @@ def create_or_update_pipeline(
     extra_settings: Dict[str, Any] = None,
 ) -> Dict[str, Any]:
     """
-    Create a new pipeline or update an existing one with the same name.
-
-    This is the main tool for pipeline management. It:
-    1. Searches for an existing pipeline with the same name (or uses 'id' from extra_settings)
-    2. Creates a new pipeline or updates the existing one
-    3. Optionally starts a pipeline run with full refresh
-    4. Optionally waits for the run to complete and returns detailed results
-
-    Uses Unity Catalog and serverless compute by default.
+    Create or update a pipeline by name, optionally run it. Uses Unity Catalog + serverless.
 
     Args:
         name: Pipeline name (used for lookup and creation)
-        root_path: Root folder for source code (added to Python sys.path for imports)
-        catalog: Unity Catalog name for output tables
+        root_path: Root folder for source code (added to sys.path)
+        catalog: Unity Catalog name
         schema: Schema name for output tables
-        workspace_file_paths: List of workspace file paths (raw .sql or .py files)
-        start_run: If True, start a pipeline update after create/update (default: False)
-        wait_for_completion: If True, wait for run to complete (default: False)
-        full_refresh: If True, perform full refresh when starting (default: True)
-        timeout: Maximum wait time in seconds (default: 1800 = 30 minutes)
-        extra_settings: Optional dict with additional pipeline settings. Supports all SDK
-            options: clusters, continuous, development, photon, edition, channel, event_log,
-            configuration, notifications, tags, serverless, etc.
-            If 'id' is provided, the pipeline will be updated instead of created.
-            Explicit parameters (name, root_path, catalog, schema) take precedence.
+        workspace_file_paths: List of workspace .sql or .py file paths
+        start_run: Start pipeline update after create/update
+        wait_for_completion: Wait for run to complete
+        full_refresh: Full refresh when starting (default: True)
+        timeout: Max wait time in seconds (default: 1800)
+        extra_settings: Additional pipeline settings dict
 
     Returns:
-        Dictionary with detailed status:
-        - pipeline_id: The pipeline ID
-        - pipeline_name: The pipeline name
-        - created: True if newly created, False if updated
-        - success: True if all operations succeeded
-        - state: Final state if run was started (COMPLETED, FAILED, etc.)
-        - duration_seconds: Time taken if waited
-        - error_message: Error message if failed
-        - errors: List of detailed errors if failed
-        - message: Human-readable status message
-
-    Example usage:
-        # Just create/update the pipeline
-        create_or_update_pipeline(name="my_pipeline", ...)
-
-        # Create/update and run immediately
-        create_or_update_pipeline(name="my_pipeline", ..., start_run=True)
-
-        # Create/update, run, and wait for completion
-        create_or_update_pipeline(
-            name="my_pipeline", ...,
-            start_run=True,
-            wait_for_completion=True
-        )
-
-        # Create with custom settings (non-serverless, development mode)
-        create_or_update_pipeline(
-            name="my_pipeline", ...,
-            extra_settings={
-                "serverless": False,
-                "development": True,
-                "clusters": [{"label": "default", "num_workers": 2}]
-            }
-        )
+        Dict with pipeline_id, created (bool), success, state, error_summary if failed.
     """
     # Auto-inject default tags into extra_settings; user tags take precedence
     extra_settings = extra_settings or {}
```
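The commit message mentions `_extract_error_summary()` but the diff excerpt above does not show its body. The following is a plausible sketch of the described behavior (prefer the exception message, fall back to `event.message`, skip stack traces); the event dict shape and field names are assumptions, not the actual implementation.

```python
def extract_error_summary(events: list) -> list:
    """Pull concise error messages out of pipeline event dicts.

    Assumed event shape: {"message": ..., "error": {"exceptions":
    [{"message": ...}, ...]}}. Stack traces are simply never copied.
    """
    summary = []
    for event in events:
        error = event.get("error") or {}
        exceptions = error.get("exceptions") or []
        if exceptions and exceptions[0].get("message"):
            summary.append(exceptions[0]["message"])
        elif event.get("message"):
            # Fall back to the event-level message when no exception message exists
            summary.append(event["message"])
    return summary
```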

databricks-mcp-server/databricks_mcp_server/tools/workspace.py

Lines changed: 0 additions & 1 deletion

```diff
@@ -6,7 +6,6 @@
 from typing import Any, Dict, List, Optional
 
 from databricks_tools_core.auth import (
-    clear_active_workspace,
     get_active_workspace,
     get_workspace_client,
     set_active_workspace,
```

databricks-tools-core/databricks_tools_core/file/workspace.py

Lines changed: 35 additions & 8 deletions

```diff
@@ -146,10 +146,15 @@ def upload_folder(
     Uses parallel uploads with ThreadPoolExecutor for performance.
     Automatically handles all file types using ImportFormat.AUTO.
 
+    Follows `cp -r` semantics:
+    - With trailing slash or /* (e.g., "pipeline/" or "pipeline/*"): copies contents into workspace_folder
+    - Without trailing slash (e.g., "pipeline"): creates workspace_folder/pipeline/
+
     Args:
-        local_folder: Path to local folder to upload
+        local_folder: Path to local folder to upload. Add trailing slash to copy
+            contents only, omit to preserve folder name.
         workspace_folder: Target path in Databricks workspace
-            (e.g., "/Users/user@example.com/my-project")
+            (e.g., "/Workspace/Users/user@example.com/my-project")
         max_workers: Maximum number of parallel upload threads (default: 10)
         overwrite: Whether to overwrite existing files (default: True)
@@ -161,25 +166,47 @@ def upload_folder(
        ValueError: If local folder is not a directory
 
     Example:
+        >>> # Copy folder preserving name: creates /Workspace/.../dest/my-project/
         >>> result = upload_folder(
         ...     local_folder="/path/to/my-project",
-        ...     workspace_folder="/Users/me@example.com/my-project"
+        ...     workspace_folder="/Workspace/Users/me@example.com/dest"
+        ... )
+        >>> # Copy contents only: files go directly into /Workspace/.../dest/
+        >>> result = upload_folder(
+        ...     local_folder="/path/to/my-project/",
+        ...     workspace_folder="/Workspace/Users/me@example.com/dest"
         ... )
         >>> print(f"Uploaded {result.successful}/{result.total_files} files")
         >>> if not result.success:
         ...     for failed in result.get_failed_uploads():
         ...         print(f"Failed: {failed.local_path} - {failed.error}")
     """
+    # Check if user wants to copy contents only (trailing slash or /*) or preserve folder name
+    # Supports: "folder/", "folder/*", "folder\\*" (Windows)
+    copy_contents_suffixes = ("/", os.sep, "/*", os.sep + "*")
+    copy_contents_only = local_folder.endswith(copy_contents_suffixes)
+
+    # Strip /* or * suffix before validation
+    clean_local_folder = local_folder.rstrip("*").rstrip("/").rstrip(os.sep)
+
     # Validate local folder
-    local_folder = os.path.abspath(local_folder)
-    if not os.path.exists(local_folder):
-        raise FileNotFoundError(f"Local folder not found: {local_folder}")
-    if not os.path.isdir(local_folder):
-        raise ValueError(f"Path is not a directory: {local_folder}")
+    local_folder_abs = os.path.abspath(clean_local_folder)
+    if not os.path.exists(local_folder_abs):
+        raise FileNotFoundError(f"Local folder not found: {local_folder_abs}")
+    if not os.path.isdir(local_folder_abs):
+        raise ValueError(f"Path is not a directory: {local_folder_abs}")
 
     # Normalize workspace path (remove trailing slash)
     workspace_folder = workspace_folder.rstrip("/")
 
+    # If not copying contents only, append the source folder name to destination
+    if not copy_contents_only:
+        folder_name = os.path.basename(local_folder_abs)
+        workspace_folder = f"{workspace_folder}/{folder_name}"
+
+    # Use absolute path for file collection
+    local_folder = local_folder_abs
+
     # Initialize client
     w = get_workspace_client()
```
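The suffix handling in this diff can be exercised in isolation. `split_copy_semantics` is a hypothetical distillation written for illustration; it uses a literal `"\\"` where the real code builds its suffixes from `os.sep`, so on Unix the real code would not treat backslashes specially.

```python
def split_copy_semantics(local_folder: str):
    """Return (clean_path, copy_contents_only) per the cp -r rules above.

    A trailing "/", "\\", "/*", or "\\*" means copy contents only;
    otherwise the source folder name is preserved at the destination.
    """
    copy_contents_only = local_folder.endswith(("/", "\\", "/*", "\\*"))
    # Strip "*" first, then either flavor of trailing separator
    clean = local_folder.rstrip("*").rstrip("/").rstrip("\\")
    return clean, copy_contents_only
```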

0 commit comments