Skip to content

Commit dff5042

Browse files
authored
Merge pull request #22 from thisisqubika/DC-320-migration-report
DC 320: Migration Report
2 parents ec9c580 + f1211e5 commit dff5042

8 files changed

Lines changed: 519 additions & 15 deletions

File tree

.github/workflows/databricks-asset-bundle-deploy.yml

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,4 @@ jobs:
5757
databricks bundle deploy -t dev
5858
databricks bundle run databricks_job_executor_app -t dev
5959
databricks bundle summary
60-
databricks bundle resources
61-
62-
# - name: Sync app source to workspace
63-
# working-directory: databricks_job_executor
64-
# run: |
65-
# WORKSPACE_PATH="/Workspace/Shared/databricks_job_executor"
66-
# databricks workspace mkdirs "$WORKSPACE_PATH" || true
67-
# databricks workspace import-dir . "$WORKSPACE_PATH" --overwrite
68-
69-
# - name: Deploy app
70-
# working-directory: databricks_job_executor
71-
# run: |
72-
# databricks apps deploy dbx-job-executor-app-2 --source-code-path /Workspace/Shared/databricks_job_executor
73-
# databricks apps update dbx-job-executor-app-2 --default-source-code-path /Workspace/Shared/databricks_job_executor || true
60+
databricks bundle resources

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ description = "Reusable python logic for Snowflake - Databricks Migration."
55
authors = ["Samuel Solarte samuel.solarte@qubika.com"]
66
packages = [
77
{ include = "migration_accelerator_package", from = "src" },
8-
{ include = "artifact_translation_package", from = "src" }
8+
{ include = "artifact_translation_package", from = "src" },
9+
{ include = "migration_report_package", from = "src" }
910
]
1011

1112

@@ -39,6 +40,7 @@ build-backend = "poetry.core.masonry.api"
3940

4041
[tool.poetry.scripts]
4142
migration-accelerator = "migration_accelerator_package.main:main"
43+
migration-report = "migration_report_package.main:main"
4244
snowpark-reader = "migration_accelerator_package.snowpark:main"
4345
snowflake-validator = "migration_accelerator_package.ingestion_validation:main"
4446
grant-transformer = "migration_accelerator_package.grant_transformation:main"

resources/jobs.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ resources:
4545
python_wheel_task:
4646
entry_point: translation-module
4747
package_name: migration_accelerator_package
48+
49+
- task_key: migration_report_task
50+
depends_on:
51+
- task_key: artifact_translation_task
52+
existing_cluster_id: 1214-215558-eghymads
53+
libraries:
54+
- whl: "../dist/*.whl"
55+
python_wheel_task:
56+
entry_point: migration-report
57+
package_name: migration_accelerator_package
4858

4959
# Using existing cluster "data-migration" (ID: 1214-215558-eghymads); no job_clusters or spark_version needed.
5060

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# Migration Report Graph
2+
3+
A LangGraph-based system for creating a migration report based on the translation and evaluation results obtained from the DDL Translation Graph.
4+
5+
## Project Structure
6+
7+
```
8+
migration_report_package/
9+
├── graph_builder.py # LangGraph construction
10+
├── main.py # Main entry point for the migration report
11+
├── report_llm.py # LLM configuration for report generation
12+
```
13+
14+
## Usage
15+
16+
Process JSON files where each file contains a specific artifact type. The artifact type is determined from the filename (e.g., `tables.json`, `views.json`).
17+
18+
```bash
19+
# Search for the "output" folder on the Default location and save the report there
20+
python main.py
21+
22+
# Search for the "output" folder on the Default location and save the report on a custom location
23+
python main.py --md_output ./custom_location
24+
25+
# Search for the "output" folder save the report on a custom location
26+
python main.py --output_dir ./custom_location --md_output ./custom_location
27+
```
28+
29+
### Programmatic Usage
30+
31+
```python
32+
from graph_builder import MigrationReportGraph
33+
34+
# Create report from "output" folder in input_dir and save it in output_dir
35+
graph = MigrationReportGraph()
36+
md_report, json_report = graph.run(input_dir)
37+
38+
with open(os.path.join(output_dir, "migration_report.md"), "w", encoding="utf-8") as f:
39+
f.write(md_report)
40+
41+
print("JSON Report: ",json_report)
42+
```
43+
44+
## Migration Report Example
45+
46+
```markdown
47+
# Migration Report
48+
## Overview
49+
The migration process has completed with the following summary:
50+
- Total artifacts migrated: 8
51+
- Total errors: 0
52+
- Total warnings: 0
53+
- Total successes: 8
54+
- Validation errors: 2
55+
56+
## Detailed Results per Artifact Type
57+
58+
### Schemas
59+
| Artifact Name | Type | Status |
60+
| --- | --- | --- |
61+
| BRONZE_LAYER | schemas | success |
62+
| SILVER_LAYER | schemas | success |
63+
| GOLD_LAYER | schemas | success |
64+
65+
### Tables
66+
| Artifact Name | Type | Status |
67+
| --- | --- | --- |
68+
| EXAMPLE_TABLE_1 | tables | error |
69+
| EXAMPLE_TABLE_2 | tables | error |
70+
71+
### Views
72+
| Artifact Name | Type | Status |
73+
| --- | --- | --- |
74+
| ACTIVE_USERS_VIEW | views | success |
75+
| SALES_SUMMARY_VIEW | views | success |
76+
| INVENTORY_STATUS_VIEW | views | success |
77+
78+
## Error and Warning Sections
79+
80+
### Errors
81+
No errors were reported during the migration.
82+
83+
### Warnings
84+
No warnings were reported during the migration.
85+
86+
## Objects Requiring Manual Review
87+
The following objects require manual review due to validation errors or other issues:
88+
- EXAMPLE_TABLE_1 (tables): Validation failed with syntax error.
89+
- EXAMPLE_TABLE_2 (tables): Validation failed with syntax error.
90+
91+
## Summary of AI-assisted vs Rule-based Outputs
92+
The migration utilized a combination of AI-assisted and rule-based approaches. The exact distribution is not available in the provided data.
93+
94+
## Performance Metrics
95+
- Total duration: 31.17 seconds
96+
- Stage durations:
97+
- translate_tables: 22.95 seconds
98+
- translate_views: 5.57 seconds
99+
- translate_schemas: 2.44 seconds
100+
101+
## Analysis
102+
103+
### Common Translation Errors
104+
- The translation of tables resulted in Python code that failed validation due to syntax errors.
105+
106+
### Patterns in Warnings or Inconsistencies
107+
- No warnings were reported, but the validation errors for tables indicate a potential inconsistency in the translation process.
108+
109+
### Success Rate per Artifact Type
110+
- Schemas: 100% success (3/3)
111+
- Tables: 0% success (0/2) due to validation errors
112+
- Views: 100% success (3/3)
113+
114+
### Unsupported or Partially Supported Features
115+
- The translation process generated Python code for tables, which is not directly executable in Databricks. This indicates a potential gap in the translation rules for tables.
116+
117+
### Dependencies that Failed or Were Skipped
118+
- The tables (EXAMPLE_TABLE_1 and EXAMPLE_TABLE_2) failed due to validation errors, indicating potential issues with dependencies or the translation process.
119+
120+
### Recommendations for Improving Translation Rules
121+
1. Review and adjust the translation rules for tables to directly generate valid Databricks DDL instead of Python code.
122+
2. Enhance the validation step to catch syntax errors early in the translation process.
123+
124+
### Suggested Workaround for Unsupported Features
125+
For tables, manually review and convert the generated Python code into valid Databricks DDL statements. Ensure that the data types and constraints are correctly translated.
126+
```
127+
128+
## Requirements
129+
130+
- Python 3.7+
131+
- LangChain ecosystem

src/migration_report_package/__init__.py

Whitespace-only changes.
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
from typing import Any, Dict, List, Optional, Annotated, TypedDict
2+
from typing_extensions import TypedDict
3+
from langgraph.graph import StateGraph, END, START
4+
from langchain_core.runnables import RunnableConfig
5+
from report_llm import generate_report
6+
from datetime import datetime
7+
import os
8+
9+
import json
10+
11+
12+
class MigrationState(TypedDict):
13+
"""State for the migration graph execution."""
14+
input_dir: str
15+
latest_dir: str
16+
raw: Dict[str, Any]
17+
cleaned_raw: Dict[str, Any]
18+
count: Dict[str, Any]
19+
json_report: Dict[str, Any]
20+
md_report: str
21+
22+
def input_node(state: MigrationState) -> MigrationState:
23+
"""Input node for the migration graph."""
24+
input_path = state["input_dir"]
25+
if not os.path.exists(input_path):
26+
raise FileNotFoundError(f"Output folder not found: {input_path}")
27+
## Get output with most recent timestamp
28+
output_dirs = []
29+
for name in os.listdir(input_path):
30+
res = os.path.join(input_path, name)
31+
if not os.path.isdir(res):
32+
continue
33+
run_dt = datetime.strptime(name, "%Y-%m-%dT%H-%M-%SZ")
34+
output_dirs.append((run_dt, res))
35+
36+
_ , latest = max(output_dirs, key=lambda x: x[0])
37+
state["latest_dir"] = latest
38+
## Get translation results and evaluation notes
39+
raw = {"translation_results": [], "evaluation": []}
40+
for name in os.listdir(latest):
41+
out = os.path.join(latest, name)
42+
if os.path.isdir(out):
43+
for file_path in os.listdir(out):
44+
file = os.path.join(out, file_path)
45+
if "evaluation" in os.path.basename(file).lower():
46+
with open(file, "r", encoding="utf-8") as f:
47+
raw["evaluation"].append(json.load(f))
48+
else:
49+
if "translation_results.json" in os.path.basename(out).lower():
50+
with open(out, "r", encoding="utf-8") as f:
51+
raw["translation_results"].append(json.load(f))
52+
state["raw"] = raw
53+
return state
54+
55+
def clean_raw(obj: Any) -> Any:
56+
if obj is None:
57+
return None
58+
59+
# Handle dicts
60+
if isinstance(obj, dict):
61+
cleaned = {}
62+
for k, v in obj.items():
63+
pruned = clean_raw(v)
64+
if pruned not in (None, {}, [], ""):
65+
cleaned[k] = pruned
66+
return cleaned or None
67+
68+
# Handle lists
69+
if isinstance(obj, list):
70+
cleaned = []
71+
for item in obj:
72+
pruned = clean_raw(item)
73+
if pruned not in (None, {}, [], ""):
74+
cleaned.append(pruned)
75+
return cleaned or None
76+
77+
# Handle strings (remove empty and prune to MAX_LEN length)
78+
MAX_LEN = 150
79+
if isinstance(obj, str):
80+
s = obj.strip()
81+
if not s:
82+
return None
83+
return s if len(s) <= MAX_LEN else s[:MAX_LEN] + "…"
84+
85+
return obj
86+
87+
88+
def clean_raw_node(state: MigrationState) -> MigrationState:
89+
"""clean raw data by removing empty values and pruning long strings"""
90+
state["cleaned_raw"] = clean_raw(state["raw"])
91+
return state
92+
93+
def count_node(state: MigrationState) -> MigrationState:
94+
"""Count trnslated artifacts, errors, warnings and validation errors for the report."""
95+
count = {"artifact_type": {}, "migration_errors": 0, "migration_warnings": 0, "successes": 0, "validation_errors": 0}
96+
for trans in state.get("cleaned_raw", {}).get("translation_results", []):
97+
for type, value in trans.get("observability", {}).get("artifact_counts", {}).items():
98+
if count["artifact_type"].get(type) is None:
99+
count["artifact_type"][type] = value
100+
count["successes"] += value
101+
else:
102+
count["artifact_type"][type] += value
103+
count["successes"] += value
104+
count["migration_errors"] += trans["observability"]["total_errors"]
105+
count["migration_warnings"] += trans["observability"]["total_warnings"]
106+
for eval in state.get("cleaned_raw", {}).get("evaluation", []):
107+
for res in eval.get("validation", {}).get("results", []):
108+
count["validation_errors"] += (1 if not res.get("syntax_valid", True) else 0)
109+
state["count"] = count
110+
return state
111+
112+
def report_node(state: MigrationState) -> MigrationState:
113+
"""Create report with LLM."""
114+
result = generate_report(state["cleaned_raw"], state["count"])
115+
return {**state, "md_report": result, "json_report": state["count"]}
116+
117+
class MigrationReportGraph:
118+
def __init__(self, run_id: Optional[str] = None):
119+
"""
120+
Initialize migration report graph.
121+
122+
Args:
123+
run_id: Unique identifier for this run
124+
"""
125+
# Create the StateGraph
126+
self.graph = StateGraph(MigrationState)
127+
128+
# Add nodes
129+
self.graph.add_node("input", input_node)
130+
self.graph.add_node("clean", clean_raw_node)
131+
self.graph.add_node("count", count_node)
132+
self.graph.add_node("report", report_node)
133+
134+
self.graph.add_edge(START, "input")
135+
self.graph.add_edge("input", "clean")
136+
self.graph.add_edge("clean", "count")
137+
self.graph.add_edge("count", "report")
138+
self.graph.add_edge("report", END)
139+
140+
# Compile the graph
141+
self.compiled_graph = self.graph.compile()
142+
143+
def run(self, input_path: str) -> Dict[str, Any]:
144+
try:
145+
initial_state: MigrationState = {
146+
"input_dir": input_path,
147+
"latest_dir": None,
148+
"raw": [],
149+
"count": None,
150+
"json_report": None,
151+
"md_report": None
152+
}
153+
154+
final_state = self.compiled_graph.invoke(initial_state)
155+
report = final_state["md_report"] or {}
156+
json_report = final_state["json_report"] or {}
157+
latest_dir = final_state["latest_dir"] or {}
158+
return report[0], json_report, latest_dir
159+
160+
except Exception as e:
161+
raise
162+

0 commit comments

Comments
 (0)