diff --git a/.gitignore b/.gitignore index b7faf40..a03afab 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,8 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +translation_graph/tests/integration/example_data + # PyInstaller # Usually these files are written by a python script from a template diff --git a/MAPPING_EXAMPLES.md b/MAPPING_EXAMPLES.md new file mode 100644 index 0000000..b227930 --- /dev/null +++ b/MAPPING_EXAMPLES.md @@ -0,0 +1,812 @@ +# Snowflake to Databricks Migration Mapping Examples + +This document provides concrete examples of Snowflake objects and their corresponding Databricks equivalents, based on actual object structures extracted from Snowflake. + +## Table of Contents + +1. [Data Structures](#data-structures) +2. [Data Transportation & Streaming](#data-transportation--streaming) +3. [Governance & Security](#governance--security) +4. [Metadata & Object Properties](#metadata--object-properties) +5. [Programmatic & Logical Objects](#programmatic--logical-objects) + +--- + +## Data Structures + +### Database → Catalog + +**Snowflake Input:** +```json +{ + "database": "DATA_MIGRATION_DB" +} +``` + +**Databricks Output:** +```sql +CREATE CATALOG IF NOT EXISTS data_migration_db +COMMENT 'Migrated from Snowflake database DATA_MIGRATION_DB'; +``` + +**Mapping Rule:** Direct equivalent - Database maps directly to Catalog in Unity Catalog. + +--- + +### Schema → Schema + +**Snowflake Input:** +```json +{ + "schema": "DATA_MIGRATION_SCHEMA", + "database": "DATA_MIGRATION_DB" +} +``` + +**Databricks Output:** +```sql +CREATE SCHEMA IF NOT EXISTS data_migration_db.data_migration_schema +COMMENT 'Migrated from Snowflake schema DATA_MIGRATION_SCHEMA'; +``` + +**Mapping Rule:** Direct equivalent - Schema structure is directly compatible. + +--- + +### Tables (Permanent) → Managed Tables + +**Snowflake Input:** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "table_name": "DATA_MIGRATION_SOURCE", + "table_type": "BASE TABLE", + "row_count": 0, + "bytes": 0, + "created": "2025-11-20 12:53:20.787000-08:00", + "last_altered": "2025-11-20 13:05:49.773000-08:00", + "comment": "Data migration source table with various data types", + "columns": [ + { + "column_name": "SOURCE_ID", + "data_type": "NUMBER", + "character_maximum_length": null, + "numeric_precision": 38, + "numeric_scale": 0, + "is_nullable": "NO", + "column_default": null, + "comment": null + }, + { + "column_name": "SOURCE_NAME", + "data_type": "TEXT", + "character_maximum_length": 100, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "SOURCE_TYPE", + "data_type": "TEXT", + "character_maximum_length": 100, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "SOURCE_CONNECTION", + "data_type": "TEXT", + "character_maximum_length": 255, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "SOURCE_STATUS", + "data_type": "TEXT", + "character_maximum_length": 50, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "MIGRATION_TIMESTAMP", + "data_type": "TIMESTAMP_NTZ", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "IS_ACTIVE", + "data_type": "BOOLEAN", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "RECORD_COUNT", + "data_type": "NUMBER", + "character_maximum_length": null, + "numeric_precision": 18, + "numeric_scale": 0, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "METADATA", + "data_type": "VARIANT", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "TAGS", + "data_type": "ARRAY", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "CONFIG", + "data_type": "OBJECT", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "CREATED_AT", + "data_type": "TIMESTAMP_LTZ", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": "CURRENT_TIMESTAMP()", + "comment": null + }, + { + "column_name": "UPDATED_AT", + "data_type": "TIMESTAMP_LTZ", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": "CURRENT_TIMESTAMP()", + "comment": null + } + ], + "sample_data": [] +} +``` + +**Databricks Output:** +```sql +CREATE TABLE IF NOT EXISTS data_migration_db.data_migration_schema.data_migration_source ( + source_id BIGINT NOT NULL, + source_name VARCHAR(100), + source_type VARCHAR(100), + source_connection VARCHAR(255), + source_status VARCHAR(50), + migration_timestamp TIMESTAMP, + is_active BOOLEAN, + record_count BIGINT, + metadata STRING, + tags ARRAY, + config MAP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(), + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP() +) +COMMENT 'Data migration source table with various data types'; +``` + +--- + +**Snowflake Input (Second Table Example):** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "table_name": "DATA_MIGRATION_TARGET", + "table_type": "BASE TABLE", + "row_count": 0, + "bytes": 0, + "created": "2025-11-20 12:53:55.414000-08:00", + "last_altered": "2025-11-20 12:53:55.569000-08:00", + "comment": "Data migration target table with clustering and time travel", + "columns": [ + { + "column_name": "TARGET_ID", + "data_type": "NUMBER", + "numeric_precision": 38, + "numeric_scale": 0, + "is_nullable": "NO", + "column_default": null, + "comment": null + }, + { + "column_name": "SOURCE_ID", + "data_type": "NUMBER", + "numeric_precision": 38, + "numeric_scale": 0, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "TARGET_NAME", + "data_type": "TEXT", + "character_maximum_length": 200, + "is_nullable": "NO", + "column_default": null, + "comment": null + }, + { + "column_name": "TARGET_TYPE", + "data_type": "TEXT", + "character_maximum_length": 100, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "MIGRATION_DATE", + "data_type": "DATE", + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "MIGRATION_STATUS", + "data_type": "TEXT", + "character_maximum_length": 50, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "TOTAL_RECORDS", + "data_type": "NUMBER", + "numeric_precision": 18, + "numeric_scale": 0, + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "SUCCESS_COUNT", + "data_type": "NUMBER", + "numeric_precision": 10, + "numeric_scale": 0, + "is_nullable": "YES", + "column_default": "0", + "comment": null + }, + { + "column_name": "ERROR_COUNT", + "data_type": "NUMBER", + "numeric_precision": 10, + "numeric_scale": 0, + "is_nullable": "YES", + "column_default": "0", + "comment": null + }, + { + "column_name": "IS_ACTIVE", + "data_type": "BOOLEAN", + "is_nullable": "YES", + "column_default": "TRUE", + "comment": null + }, + { + "column_name": "MIGRATION_CONFIG", + "data_type": "VARIANT", + "is_nullable": "YES", + "column_default": null, + "comment": null + }, + { + "column_name": "CREATED_AT", + "data_type": "TIMESTAMP_LTZ", + "is_nullable": "YES", + "column_default": "CURRENT_TIMESTAMP()", + "comment": null + }, + { + "column_name": "UPDATED_AT", + "data_type": "TIMESTAMP_LTZ", + "is_nullable": "YES", + "column_default": "CURRENT_TIMESTAMP()", + "comment": null + } + ], + "sample_data": [] +} +``` + +**Databricks Output:** +```sql +CREATE TABLE IF NOT EXISTS data_migration_db.data_migration_schema.data_migration_target ( + target_id BIGINT NOT NULL, + source_id BIGINT, + target_name VARCHAR(200) NOT NULL, + target_type VARCHAR(100), + migration_date DATE, + migration_status VARCHAR(50), + total_records BIGINT, + success_count INT DEFAULT 0, + error_count INT DEFAULT 0, + is_active BOOLEAN DEFAULT TRUE, + migration_config STRING, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(), + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP() +) +COMMENT 'Data migration target table with clustering and time travel'; +``` + +**Data Type Mappings:** +- `NUMBER(38,0)` → `BIGINT` +- `NUMBER(18,0)` → `BIGINT` +- `NUMBER(10,0)` → `INT` +- `NUMBER(precision, scale)` → `DECIMAL(precision, scale)` +- `TEXT(length)` → `VARCHAR(length)` +- `TIMESTAMP_NTZ` → `TIMESTAMP` +- `TIMESTAMP_LTZ` → `TIMESTAMP` +- `DATE` → `DATE` +- `BOOLEAN` → `BOOLEAN` +- `VARIANT` → `STRING` +- `ARRAY` → `ARRAY` +- `OBJECT` → `MAP` + +**Mapping Rule:** Direct equivalent - Permanent tables (BASE TABLE) map to managed tables. + +--- + +### Views (Non-materialized) → Views + +**Snowflake Input:** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "view_name": "DATA_MIGRATION_MONTHLY_SUMMARY", + "view_definition": "CREATE OR REPLACE VIEW data_migration_monthly_summary AS\nSELECT \n DATE_TRUNC('MONTH', migration_date) AS month,\n COUNT(DISTINCT target_id) AS target_count,\n COUNT(DISTINCT source_id) AS source_count,\n SUM(total_records) AS total_records_migrated,\n AVG(total_records) AS avg_records_per_target\nFROM data_migration_target\nWHERE migration_status = 'COMPLETED'\nGROUP BY DATE_TRUNC('MONTH', migration_date);", + "created": "2025-11-20 12:57:08.768000-08:00", + "comment": null +} +``` + +**Databricks Output:** +```sql +CREATE OR REPLACE VIEW data_migration_db.data_migration_schema.data_migration_monthly_summary AS +SELECT + DATE_TRUNC('MONTH', migration_date) AS month, + COUNT(DISTINCT target_id) AS target_count, + COUNT(DISTINCT source_id) AS source_count, + SUM(total_records) AS total_records_migrated, + AVG(total_records) AS avg_records_per_target +FROM data_migration_db.data_migration_schema.data_migration_target +WHERE migration_status = 'COMPLETED' +GROUP BY DATE_TRUNC('MONTH', migration_date); +``` + +**Mapping Rule:** Direct equivalent - Views map directly. Note: ACL restrictions may need to be enforced via permissions. + +--- + +## Data Transportation & Streaming + +### Stage → Volume + +**Snowflake Input:** +```json +{ + "created_on": "2025-11-20 13:05:15.203000-08:00", + "name": "DATA_MIGRATION_SOURCE_STAGE", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "url": "", + "has_credentials": "N", + "has_encryption_key": "N", + "owner": "SYSADMIN", + "comment": "Stage for data migration source files", + "region": null, + "type": "INTERNAL", + "cloud": null, + "notification_channel": null, + "storage_integration": null, + "endpoint": null, + "owner_role_type": "ROLE", + "directory_enabled": "Y" +} +``` + +**Databricks Output:** +```sql +CREATE VOLUME IF NOT EXISTS data_migration_db.data_migration_schema.data_migration_source_stage +COMMENT 'Stage for data migration source files'; +-- Note: Schema scoping differences - Databricks volumes are scoped within catalogs/schemas +-- Internal stages map to volumes in Databricks +``` + +**Mapping Rule:** Imperfect match - Schema isn't automatically matched in Volumes. Schema scoping differences apply. + +--- + +### Stream → Delta Change Data Feed + +**Snowflake Input:** +```json +{ + "created_on": "2025-11-20 13:05:49.616000-08:00", + "name": "DATA_MIGRATION_SOURCE_CHANGES", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "owner": "SYSADMIN", + "comment": "Stream to track data migration source table changes", + "table_name": "DATA_MIGRATION_DB.DATA_MIGRATION_SCHEMA.DATA_MIGRATION_SOURCE", + "source_type": "Table", + "base_tables": "DATA_MIGRATION_DB.DATA_MIGRATION_SCHEMA.DATA_MIGRATION_SOURCE", + "type": "DELTA", + "stale": "false", + "mode": "DEFAULT", + "stale_after": "2025-12-05 08:22:43.058000-08:00", + "invalid_reason": "N/A", + "owner_role_type": "ROLE" +} +``` + +**Databricks Output:** +```sql +-- Enable Change Data Feed on the source table +ALTER TABLE data_migration_db.data_migration_schema.data_migration_source +SET TBLPROPERTIES (delta.enableChangeDataFeed = true); + +-- Create a view to read change data (equivalent to stream) +CREATE VIEW data_migration_db.data_migration_schema.data_migration_source_changes AS +SELECT * FROM table_changes('data_migration_db.data_migration_schema.data_migration_source', 0); +``` + +**Mapping Rule:** Direct equivalent - Streams map to Delta Change Data Feed. + +--- + +### Pipe (Snowpipe) → Auto Loader + +**Snowflake Input:** +```json +{ + "created_on": "2025-11-20 13:05:58.512000-08:00", + "name": "DATA_MIGRATION_SOURCE_PIPE", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "definition": "COPY INTO data_migration_source\n FROM @data_migration_source_stage\n FILE_FORMAT = data_migration_csv_format", + "owner": "SYSADMIN", + "notification_channel": "sfc-br-ds1-3-customer-stage/33za0000-s/", + "comment": "", + "integration": null, + "pattern": null, + "error_integration": null, + "owner_role_type": "ROLE", + "invalid_reason": null, + "kind": "STAGE" +} +``` + +**Databricks Output:** +```sql +-- Create streaming table with Auto Loader +CREATE OR REFRESH STREAMING TABLE data_migration_db.data_migration_schema.data_migration_source_streaming +AS SELECT * FROM cloud_files( + 's3://bucket/path/', + 'csv', + map( + 'header', 'true', + 'delimiter', ',', + 'skipRows', '1' + ) +); +-- Note: Maps Snowpipe COPY INTO from stage to Auto Loader cloud_files function +``` + +**Mapping Rule:** Direct equivalent - Snowpipe maps to Auto Loader. + +--- + +## Programmatic & Logical Objects + +### Stored Procedure (SQL) → Unity Catalog SQL Procedure + +**Snowflake Input (Simple Procedure):** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "procedure_name": "DATA_MIGRATION_GET_TARGETS", + "procedure_definition": "\n SELECT \n target_id,\n migration_date,\n migration_status,\n total_records\n FROM data_migration_target\n WHERE source_id = source_id_param\n ORDER BY migration_date DESC;\n", + "created": "2025-11-20 13:02:56.806000-08:00", + "last_altered": "2025-11-20 13:02:56.806000-08:00", + "comment": null +} +``` + +**Databricks Output:** +```sql +CREATE PROCEDURE data_migration_db.data_migration_schema.data_migration_get_targets( + source_id_param BIGINT +) +RETURNS TABLE ( + target_id BIGINT, + migration_date DATE, + migration_status STRING, + total_records BIGINT +) +LANGUAGE SQL +RETURN + SELECT + target_id, + migration_date, + migration_status, + total_records + FROM data_migration_db.data_migration_schema.data_migration_target + WHERE source_id = source_id_param + ORDER BY migration_date DESC; +``` + +--- + +**Snowflake Input (Complex Procedure with Variables):** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "procedure_name": "DATA_MIGRATION_UPDATE_STATUS", + "procedure_definition": "\nDECLARE\n current_success NUMBER;\n current_error NUMBER;\n new_success NUMBER;\n new_error NUMBER;\nBEGIN\n -- Get current counts\n SELECT success_count, error_count INTO current_success, current_error\n FROM data_migration_target\n WHERE target_id = target_id_param;\n \n -- Calculate new counts\n new_success := current_success + success_count_change;\n new_error := current_error + error_count_change;\n \n -- Update counts\n UPDATE data_migration_target\n SET success_count = new_success,\n error_count = new_error,\n total_records = new_success + new_error,\n updated_at = CURRENT_TIMESTAMP()\n WHERE target_id = target_id_param;\n \n RETURN 'Status updated: Success=' || new_success || ', Error=' || new_error;\nEND;\n", + "created": "2025-11-20 13:03:14.658000-08:00", + "last_altered": "2025-11-20 13:03:14.658000-08:00", + "comment": null +} +``` + +**Databricks Output:** +```sql +CREATE PROCEDURE data_migration_db.data_migration_schema.data_migration_update_status( + target_id_param BIGINT, + success_count_change INT, + error_count_change INT +) +RETURNS STRING +LANGUAGE SQL +BEGIN + DECLARE current_success INT; + DECLARE current_error INT; + DECLARE new_success INT; + DECLARE new_error INT; + + -- Get current counts + SELECT success_count, error_count INTO current_success, current_error + FROM data_migration_db.data_migration_schema.data_migration_target + WHERE target_id = target_id_param; + + -- Calculate new counts + SET new_success = current_success + success_count_change; + SET new_error = current_error + error_count_change; + + -- Update counts + UPDATE data_migration_db.data_migration_schema.data_migration_target + SET success_count = new_success, + error_count = new_error, + total_records = new_success + new_error, + updated_at = CURRENT_TIMESTAMP() + WHERE target_id = target_id_param; + + RETURN CONCAT('Status updated: Success=', CAST(new_success AS STRING), ', Error=', CAST(new_error AS STRING)); +END; +``` + +**Mapping Rule:** Direct equivalent - SQL procedures map directly to Unity Catalog SQL procedures. + +--- + +### UDF (SQL) → Unity Catalog SQL UDF + +**Snowflake Input:** +```json +{ + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "function_name": "DATA_MIGRATION_GET_TOP_SOURCES", + "function_definition": "\n SELECT \n source_id,\n source_name,\n total_records_migrated\n FROM (\n SELECT \n s.source_id,\n s.source_name,\n COALESCE(SUM(t.total_records), 0) AS total_records_migrated,\n ROW_NUMBER() OVER (ORDER BY COALESCE(SUM(t.total_records), 0) DESC) AS rn\n FROM data_migration_source s\n LEFT JOIN data_migration_target t ON s.source_id = t.source_id AND t.migration_status = 'COMPLETED'\n GROUP BY s.source_id, s.source_name\n )\n WHERE rn <= limit_count\n ORDER BY total_records_migrated DESC\n", + "created": "2025-11-20 13:04:39.533000-08:00", + "last_altered": "2025-11-20 13:04:39.533000-08:00", + "comment": null +} +``` + +**Databricks Output:** +```sql +CREATE FUNCTION data_migration_db.data_migration_schema.data_migration_get_top_sources(limit_count INT) +RETURNS TABLE ( + source_id BIGINT, + source_name STRING, + total_records_migrated BIGINT +) +RETURN + SELECT + source_id, + source_name, + total_records_migrated + FROM ( + SELECT + s.source_id, + s.source_name, + COALESCE(SUM(t.total_records), 0) AS total_records_migrated, + ROW_NUMBER() OVER (ORDER BY COALESCE(SUM(t.total_records), 0) DESC) AS rn + FROM data_migration_db.data_migration_schema.data_migration_source s + LEFT JOIN data_migration_db.data_migration_schema.data_migration_target t + ON s.source_id = t.source_id AND t.migration_status = 'COMPLETED' + GROUP BY s.source_id, s.source_name + ) + WHERE rn <= limit_count + ORDER BY total_records_migrated DESC; +``` + +**Mapping Rule:** Direct equivalent - SQL UDFs map directly to Unity Catalog SQL UDFs. + +--- + +### Sequence → Alternative (Identity Column) (REVIEW) + +**Snowflake Input:** +```json +{ + "name": "DATA_MIGRATION_TARGET_ID_SEQ", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "next_value": 1, + "interval": 1, + "created_on": "2025-11-20 13:04:50.173000-08:00", + "owner": "SYSADMIN", + "comment": "Sequence for data migration target IDs", + "owner_role_type": "ROLE", + "ordered": "N" +} +``` + +**Databricks Output:** +```sql +-- Databricks doesn't support sequences like Snowflake +-- Alternative: Use IDENTITY column in table definition +-- For existing table, sequence usage would need to be replaced with: +-- 1. IDENTITY columns (for new tables) +-- 2. Application-generated IDs/UUIDs +-- 3. Manual sequence management via a sequence table + +-- Example: If creating new table with sequence +CREATE TABLE data_migration_db.data_migration_schema.data_migration_target_new ( + target_id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1), + ... +); +``` + +**Mapping Rule:** Imperfect match - Sequences not directly supported. Use IDENTITY columns or application logic. + +--- + +### File Format → File Format + +**Snowflake Input (CSV Format):** +```json +{ + "created_on": "2025-11-20 13:05:27.056000-08:00", + "name": "DATA_MIGRATION_CSV_FORMAT", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "type": "CSV", + "owner": "SYSADMIN", + "comment": "CSV file format for data migration", + "format_options": "{\"TYPE\":\"CSV\",\"RECORD_DELIMITER\":\"\\n\",\"FIELD_DELIMITER\":\",\",\"FILE_EXTENSION\":null,\"SKIP_HEADER\":1,\"PARSE_HEADER\":false,\"DATE_FORMAT\":\"AUTO\",\"TIME_FORMAT\":\"AUTO\",\"TIMESTAMP_FORMAT\":\"AUTO\",\"BINARY_FORMAT\":\"HEX\",\"ESCAPE\":\"NONE\",\"ESCAPE_UNENCLOSED_FIELD\":\"\\\\\",\"TRIM_SPACE\":true,\"FIELD_OPTIONALLY_ENCLOSED_BY\":\"\\\"\",\"NULL_IF\":[\"\\\\N\"],\"COMPRESSION\":\"AUTO\",\"ERROR_ON_COLUMN_COUNT_MISMATCH\":false,\"VALIDATE_UTF8\":true,\"SKIP_BLANK_LINES\":false,\"REPLACE_INVALID_CHARACTERS\":false,\"EMPTY_FIELD_AS_NULL\":true,\"SKIP_BYTE_ORDER_MARK\":true,\"ENCODING\":\"UTF8\",\"MULTI_LINE\":true}", + "owner_role_type": "ROLE" +} +``` + +**Databricks Output:** +```sql +CREATE FILE FORMAT IF NOT EXISTS data_migration_db.data_migration_schema.data_migration_csv_format +TYPE = CSV +FIELD_DELIMITER = ',' +RECORD_DELIMITER = '\n' +SKIP_HEADER = 1 +TRIM_SPACE = true +FIELD_OPTIONALLY_ENCLOSED_BY = '"' +NULL_IF = ('\\N') +COMPRESSION = AUTO +ENCODING = UTF8 +COMMENT 'CSV file format for data migration'; +``` + +--- + +**Snowflake Input (JSON Format):** +```json +{ + "created_on": "2025-11-20 13:05:34.371000-08:00", + "name": "DATA_MIGRATION_JSON_FORMAT", + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "type": "JSON", + "owner": "SYSADMIN", + "comment": "JSON file format for data migration", + "format_options": "{\"TYPE\":\"JSON\",\"FILE_EXTENSION\":null,\"DATE_FORMAT\":\"AUTO\",\"TIME_FORMAT\":\"AUTO\",\"TIMESTAMP_FORMAT\":\"AUTO\",\"BINARY_FORMAT\":\"HEX\",\"TRIM_SPACE\":false,\"NULL_IF\":[],\"COMPRESSION\":\"AUTO\",\"ENABLE_OCTAL\":false,\"ALLOW_DUPLICATE\":false,\"STRIP_OUTER_ARRAY\":true,\"STRIP_NULL_VALUES\":false,\"IGNORE_UTF8_ERRORS\":false,\"REPLACE_INVALID_CHARACTERS\":false,\"SKIP_BYTE_ORDER_MARK\":true,\"MULTI_LINE\":true}", + "owner_role_type": "ROLE" +} +``` + +**Databricks Output:** +```sql +CREATE FILE FORMAT IF NOT EXISTS data_migration_db.data_migration_schema.data_migration_json_format +TYPE = JSON +DATE_FORMAT = 'AUTO' +TIME_FORMAT = 'AUTO' +TIMESTAMP_FORMAT = 'AUTO' +COMPRESSION = AUTO +STRIP_OUTER_ARRAY = true +MULTI_LINE = true +COMMENT 'JSON file format for data migration'; +``` + +**Mapping Rule:** Direct equivalent - File formats map directly. + +--- + +## Summary Table + +| Snowflake Object | Databricks Equivalent | Mapping Type | Notes | +|-----------------|----------------------|--------------|-------| +| Database | Catalog | Direct | None | +| Schema | Schema | Direct | None | +| Table (BASE TABLE) | Managed Table | Direct | None | +| View (Non-materialized) | View | Direct | ACL restrictions optional | +| Stage (INTERNAL) | Volume | Imperfect | Schema scoping differences | +| Stream (DELTA) | Delta Change Data Feed | Direct | None | +| Pipe (Snowpipe) | Auto Loader | Direct | None | +| Stored Procedure (SQL) | Unity Catalog SQL Procedure | Direct | None | +| UDF (SQL) | Unity Catalog SQL UDF | Direct | None | +| Sequence | IDENTITY Column / Alternative | Imperfect | Sequences not directly supported | +| File Format (CSV) | File Format | Direct | None | +| File Format (JSON) | File Format | Direct | None | + +--- + +## Notes + +- **Direct Equivalent**: One-to-one mapping with no special considerations +- **Imperfect Match**: Requires additional handling, limitations, or alternative approaches +- All examples use the actual metadata structure format extracted from Snowflake +- Generated SQL follows Databricks Unity Catalog syntax +- Data type conversions are handled automatically by the translation prompts +- Object names are converted to lowercase following Databricks naming conventions +- Full catalog.schema.object qualification is used in Databricks SQL for clarity diff --git a/requirements.txt b/requirements.txt index 1ddcc6c..a62d406 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,22 @@ +# Snowflake dependencies (for data extraction) snowflake-connector-python>=3.0.0 snowflake-snowpark-python>=1.0.0 python-dotenv>=1.0.0 +# Translation graph dependencies +langchain>=0.1.0 +langchain-core>=0.1.0 +langchain-community>=0.1.0 +langgraph>=0.1.0 + +# Databricks LLM support +databricks-langchain>=0.1.0 + +# Environment variable management +python-dotenv>=1.0.0 + +# Testing framework (optional) +pytest>=7.0.0 +pytest-cov>=4.0.0 +pytest-asyncio>=0.21.0 + diff --git a/translation_graph/README.md b/translation_graph/README.md index 2cbd6c8..4c2200f 100644 --- a/translation_graph/README.md +++ b/translation_graph/README.md @@ -16,11 +16,19 @@ translation_graph/ │ └── *_prompts.py # Prompt templates for each node ├── utils/ │ ├── types.py # Shared data types -│ └── llm_utils.py # LLM creation utilities +│ ├── llm_utils.py # LLM creation utilities +│ └── file_processor.py # File-based processing utilities +├── docs/ +│ ├── DATABRICKS.md # Databricks deployment guide +│ └── README.md # Documentation index ├── tests/ -│ └── test_graph.py # Test suite +│ └── integration/ +│ ├── run_example.py # Basic example usage +│ ├── databricks_job_notebook.py # Parameterized job notebook +│ └── databricks_notebook_example.py # Example Databricks notebook ├── graph_builder.py # LangGraph construction -├── run_example.py # Example usage +├── run_file_processor.py # File-based processor entry point +├── databricks_job.py # Databricks job entry point └── requirements.txt # Python dependencies ``` @@ -89,7 +97,62 @@ config = DDLConfig({ ## Usage -### Basic Example +### File-Based Processing (Recommended) + +Process JSON files where each file contains a specific artifact type. The artifact type is determined from the filename (e.g., `tables.json`, `views.json`). + +```bash +# Process a single file +python run_file_processor.py tables.json --batch-size 10 + +# Process multiple files +python run_file_processor.py tables.json views.json schemas.json --batch-size 10 + +# Save results to a file +python run_file_processor.py tables.json --batch-size 10 --output results.json +``` + +**JSON File Structure:** +Each JSON file should have a top-level key matching the artifact type: +```json +{ + "tables": [ + {"database_name": "...", "schema_name": "...", "table_name": "...", ...}, + ... + ] +} +``` + +**Filename conventions:** +- `tables.json`, `table.json` → tables +- `views.json`, `view.json` → views +- `schemas.json`, `schema.json` → schemas +- `procedures.json`, `procedure.json` → procedures +- `roles.json`, `role.json` → roles +- And similar patterns for other artifact types + +**Batching:** +- Large files are automatically split into batches +- Each batch is processed separately and results are aggregated +- Default batch size is 10 artifacts per batch +- Adjust with `--batch-size` parameter + +### Programmatic Usage + +```python +from graph_builder import build_translation_graph +from utils.file_processor import create_batches_from_file + +# Process a JSON file with batching +graph = build_translation_graph() +batches = create_batches_from_file("tables.json", batch_size=10) + +# Process all batches +result = graph.run_batches(batches) +print(result) +``` + +### Basic Example (Direct Batch) ```python from graph_builder import build_translation_graph @@ -144,14 +207,44 @@ The system uses a hierarchical configuration system with: See `config/ddl_config.py` for detailed configuration options. -## Testing +## Testing and Examples + +### Integration Tests -Run the test suite: +Run the comprehensive integration test with example data: ```bash -python -m pytest tests/test_graph.py -v +# Run full workflow test with example JSON files +python tests/integration/test_full_workflow.py ``` +This comprehensive test demonstrates: +- Loading example JSON files from `tests/integration/example_data/` +- Processing tables, views, and schemas through the translation graph +- Real LLM-powered SQL DDL generation +- Result validation and persistence + +### File Processor Example + +Process your own JSON files directly: + +```bash +# Process JSON files with the file processor +python run_file_processor.py tables.json --batch-size 10 + +# Generate SQL files instead of JSON +python databricks_job.py --input-files tables.json --output-format sql --output-path ./output/ +``` + +### Example Data + +Sample JSON files are provided in `tests/integration/example_data/`: +- `tables.json` - Sample table definitions (2 tables) +- `views.json` - Sample view definitions (2 views) +- `schemas.json` - Sample schema definitions (1 schema) + +See [tests/integration/README.md](tests/integration/README.md) for complete documentation. + ## Development To implement real functionality: @@ -170,8 +263,34 @@ The system includes comprehensive error handling: - Translation errors are captured and reported - Graceful degradation with error metadata +## Databricks Deployment + +For using this translation graph in Databricks jobs and pipelines, see the [Databricks Deployment Guide](docs/DATABRICKS.md). + +### Quick Start for Databricks + +```python +from translation_graph.databricks_job import process_translation_job + +result = process_translation_job( + input_files=["dbfs:/FileStore/data/tables.json"], + output_path="dbfs:/FileStore/results/results.json", + batch_size=10 +) +``` + +### Key Features for Databricks + +- **DBFS and Volume support**: Handles `dbfs:/` and `/Volumes/` paths automatically +- **Batch processing**: Processes large files in configurable batches +- **Job integration**: Ready-to-use entry points for Databricks jobs +- **Pipeline support**: Compatible with Databricks Pipelines +- **Unity Catalog**: Works with Unity Catalog Volumes + +See [docs/DATABRICKS.md](docs/DATABRICKS.md) for complete deployment instructions. + ## Requirements - Python 3.7+ - LangChain ecosystem -- OpenAI API or Databricks endpoint access +- Databricks endpoint access (for LLM calls) diff --git a/translation_graph/config/ddl_config.py b/translation_graph/config/ddl_config.py index c880139..bf845b3 100644 --- a/translation_graph/config/ddl_config.py +++ b/translation_graph/config/ddl_config.py @@ -39,6 +39,24 @@ class DDLConfig: "endpoint": "databricks-llama-4-maverick" } }, + "database_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "schemas_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, "tables_translator": { "provider": "databricks", "model": "databricks-llama-4-maverick", @@ -57,7 +75,7 @@ class DDLConfig: "endpoint": "databricks-llama-4-maverick" } }, - "schemas_translator": { + "stages_translator": { "provider": "databricks", "model": "databricks-llama-4-maverick", "temperature": 0.2, @@ -66,7 +84,16 @@ class DDLConfig: "endpoint": "databricks-llama-4-maverick" } }, - "procedures_translator": { + "streams_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "pipes_translator": { "provider": "databricks", "model": "databricks-llama-4-maverick", "temperature": 0.2, @@ -83,6 +110,87 @@ class DDLConfig: "additional_params": { "endpoint": "databricks-llama-4-maverick" } + }, + "grants_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "tags_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "comments_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "masking_policies_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "udfs_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "procedures_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "sequences_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "file_formats_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } + }, + "external_locations_translator": { + "provider": "databricks", + "model": "databricks-llama-4-maverick", + "temperature": 0.2, + "max_tokens": 4000, + "additional_params": { + "endpoint": "databricks-llama-4-maverick" + } } }, "processing": { diff --git a/translation_graph/databricks_job.py b/translation_graph/databricks_job.py new file mode 100644 index 0000000..929dc9d --- /dev/null +++ b/translation_graph/databricks_job.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +""" +Databricks Job Entry Point for Translation Graph + +This module provides a Databricks-compatible interface for running the translation graph +in Databricks jobs and pipelines. It handles DBFS paths, Volumes, and Databricks-specific +configuration. +""" + +import os +import json +import sys +from typing import List, Dict, Any, Optional +from pathlib import Path + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from graph_builder import build_translation_graph +from utils.file_processor import create_batches_from_file, process_files +from utils.types import ArtifactBatch + + +def get_dbfs_path(filepath: str) -> str: + """ + Convert a Databricks path (dbfs:/ or /Volumes/) to a local filesystem path. + + Args: + filepath: Path that may be in DBFS or Volumes format + + Returns: + Local filesystem path + """ + if filepath.startswith("dbfs:/"): + return filepath.replace("dbfs:/", "/dbfs/") + elif filepath.startswith("/Volumes/"): + return filepath + elif filepath.startswith("/dbfs/"): + return filepath + else: + return filepath + + +def process_translation_job( + input_files: List[str], + output_path: Optional[str] = None, + batch_size: int = 10, + context: Optional[Dict[str, Any]] = None, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Main entry point for Databricks job processing. + + Args: + input_files: List of JSON file paths (supports dbfs:/, /Volumes/, or local paths) + output_path: Optional output path for results (default: writes to job output) + batch_size: Number of artifacts per batch + context: Optional context dictionary + + Returns: + Translation results dictionary + """ + if context is None: + context = {} + + context["job_type"] = "databricks" + context["batch_size"] = batch_size + + print(f"Starting translation job") + print(f"Input files: {input_files}") + print(f"Batch size: {batch_size}") + print(f"Output path: {output_path}") + print(f"Output format: {output_format}") + print("-" * 50) + + graph = build_translation_graph() + + all_batches = [] + for filepath in input_files: + local_path = get_dbfs_path(filepath) + print(f"Processing file: {local_path}") + + if not os.path.exists(local_path): + raise FileNotFoundError(f"File not found: {local_path}") + + batches = create_batches_from_file(local_path, batch_size, context) + all_batches.extend(batches) + print(f" Created {len(batches)} batch(es) from {local_path}") + + print(f"\nTotal batches to process: {len(all_batches)}") + print("-" * 50) + + result = graph.run_batches(all_batches) + + if output_path: + if output_format == "json": + output_local_path = get_dbfs_path(output_path) + output_dir = os.path.dirname(output_local_path) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + + with open(output_local_path, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, default=str) + + print(f"\nJSON results saved to: {output_local_path}") + elif output_format == "sql": + save_sql_files(result, output_path) + + return result + + +def process_from_dbutils( + input_files: List[str], + output_path: Optional[str] = None, + batch_size: int = 10, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Process translation job using dbutils for file access. + + This is useful when files are in DBFS and you want to use dbutils.fs operations. + + Args: + input_files: List of file paths (dbfs:/ paths) + output_path: Optional output path + batch_size: Number of artifacts per batch + + Returns: + Translation results dictionary + """ + try: + import dbutils + except ImportError: + raise ImportError( + "dbutils not available. Use process_translation_job() instead or run in Databricks environment." + ) + + context = { + "job_type": "databricks", + "batch_size": batch_size, + "using_dbutils": True + } + + graph = build_translation_graph() + all_batches = [] + + for filepath in input_files: + print(f"Reading file: {filepath}") + + if filepath.startswith("dbfs:/"): + content = dbutils.fs.head(filepath) + local_temp_path = f"/tmp/{os.path.basename(filepath)}" + + with open(local_temp_path, 'w', encoding='utf-8') as f: + f.write(content) + + batches = create_batches_from_file(local_temp_path, batch_size, context) + all_batches.extend(batches) + + os.remove(local_temp_path) + else: + batches = create_batches_from_file(filepath, batch_size, context) + all_batches.extend(batches) + + result = graph.run_batches(all_batches) + + if output_path: + if output_format == "json": + output_json = json.dumps(result, indent=2, default=str) + dbutils.fs.put(output_path, output_json) + print(f"JSON results saved to: {output_path}") + elif output_format == "sql": + save_sql_files_dbutils(result, output_path) + + return result + + +def process_from_volume( + volume_path: str, + artifact_types: Optional[List[str]] = None, + output_path: Optional[str] = None, + batch_size: int = 10, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Process all JSON files from a Volume directory. + + Args: + volume_path: Path to Volume directory (e.g., /Volumes/catalog/schema/volume_name/) + artifact_types: Optional list of artifact types to process (if None, processes all) + output_path: Optional output path for results + batch_size: Number of artifacts per batch + + Returns: + Translation results dictionary + """ + if not volume_path.startswith("/Volumes/"): + raise ValueError(f"Volume path must start with /Volumes/, got: {volume_path}") + + if not os.path.exists(volume_path): + raise FileNotFoundError(f"Volume path not found: {volume_path}") + + json_files = [] + for file in os.listdir(volume_path): + if file.endswith('.json'): + if artifact_types is None: + json_files.append(os.path.join(volume_path, file)) + else: + file_lower = file.lower() + if any(artifact_type.lower() in file_lower for artifact_type in artifact_types): + json_files.append(os.path.join(volume_path, file)) + + if not json_files: + print(f"No JSON files found in {volume_path}") + return {"metadata": {"total_results": 0, "errors": []}} + + print(f"Found {len(json_files)} JSON file(s) to process") + return process_translation_job(json_files, output_path, batch_size) + + + +def save_sql_files(result: Dict[str, Any], output_base_path: str): + """ + Save SQL results as separate SQL files for each artifact type. + + Args: + result: Translation results dictionary + output_base_path: Base path where SQL files will be saved + """ + output_local_path = get_dbfs_path(output_base_path) + output_dir = os.path.dirname(output_local_path) if os.path.isfile(output_local_path) else output_local_path + + if not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + + artifact_types = ['tables', 'views', 'schemas', 'databases', 'sequences', 'stages', 'streams', 'pipes', 'roles', 'grants', 'tags', 'comments', 'masking_policies', 'udfs', 'procedures', 'external_locations', 'file_formats'] + + total_sql_files = 0 + total_sql_statements = 0 + + for artifact_type in artifact_types: + if artifact_type in result and result[artifact_type]: + sql_statements = result[artifact_type] + if sql_statements: + # Create SQL file name + sql_filename = f"{artifact_type}.sql" + sql_filepath = os.path.join(output_dir, sql_filename) + + with open(sql_filepath, 'w', encoding='utf-8') as f: + f.write(f"-- {artifact_type.upper()} DDL - Generated by Translation Graph\n") + f.write(f"-- Generated: {os.path.basename(output_base_path)}\n\n") + + for i, sql in enumerate(sql_statements, 1): + # Clean SQL (remove markdown code blocks if present) + clean_sql = sql + if clean_sql.startswith('```sql'): + clean_sql = clean_sql[6:] + if clean_sql.endswith('```'): + clean_sql = clean_sql[:-3] + clean_sql = clean_sql.strip() + + f.write(f"-- Statement {i}\n") + f.write(clean_sql) + f.write(";\n\n") + + total_sql_files += 1 + total_sql_statements += len(sql_statements) + print(f" ✓ Saved {len(sql_statements)} {artifact_type} SQL statements to {sql_filename}") + + print(f"\nSQL files saved to: {output_dir}") + print(f"Total SQL files: {total_sql_files}") + print(f"Total SQL statements: {total_sql_statements}") + + +def save_sql_files_dbutils(result: Dict[str, Any], output_base_path: str): + """ + Save SQL results as separate SQL files using dbutils. + + Args: + result: Translation results dictionary + output_base_path: Base path where SQL files will be saved (dbfs:/ paths) + """ + import dbutils + + # Ensure output_base_path ends with a directory separator + if not output_base_path.endswith('/'): + output_base_path += '/' + + artifact_types = ['tables', 'views', 'schemas', 'databases', 'sequences', 'stages', 'streams', 'pipes', 'roles', 'grants', 'tags', 'comments', 'masking_policies', 'udfs', 'procedures', 'external_locations', 'file_formats'] + + total_sql_files = 0 + total_sql_statements = 0 + + for artifact_type in artifact_types: + if artifact_type in result and result[artifact_type]: + sql_statements = result[artifact_type] + if sql_statements: + # Create SQL file path + sql_filename = f"{artifact_type}.sql" + sql_filepath = f"{output_base_path}{sql_filename}" + + sql_content = f"-- {artifact_type.upper()} DDL - Generated by Translation Graph\n" + sql_content += f"-- Generated: {output_base_path}\n\n" + + for i, sql in enumerate(sql_statements, 1): + # Clean SQL (remove markdown code blocks if present) + clean_sql = sql + if clean_sql.startswith('```sql'): + clean_sql = clean_sql[6:] + if clean_sql.endswith('```'): + clean_sql = clean_sql[:-3] + clean_sql = clean_sql.strip() + + sql_content += f"-- Statement {i}\n" + sql_content += clean_sql + sql_content += ";\n\n" + + dbutils.fs.put(sql_filepath, sql_content) + + total_sql_files += 1 + total_sql_statements += len(sql_statements) + print(f" ✓ Saved {len(sql_statements)} {artifact_type} SQL statements to {sql_filename}") + + print(f"\nSQL files saved to: {output_base_path}") + print(f"Total SQL files: {total_sql_files}") + print(f"Total SQL statements: {total_sql_statements}") + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser( + description="Databricks translation job entry point" + ) + parser.add_argument( + "--input-files", + nargs="+", + required=True, + help="Input JSON files (supports dbfs:/, /Volumes/, or local paths)" + ) + parser.add_argument( + "--output-path", + type=str, + help="Output path for results (JSON or directory for SQL files)" + ) + parser.add_argument( + "--batch-size", + type=int, + default=10, + help="Number of artifacts per batch (default: 10)" + ) + parser.add_argument( + "--volume-path", + type=str, + help="Process all JSON files from a Volume directory" + ) + parser.add_argument( + "--artifact-types", + nargs="+", + help="Filter artifact types when using --volume-path" + ) + parser.add_argument( + "--output-format", + choices=["json", "sql"], + default="json", + help="Output format: json or sql files (default: json)" + ) + + args = parser.parse_args() + + if args.volume_path: + result = process_from_volume( + args.volume_path, + args.artifact_types, + args.output_path, + args.batch_size, + args.output_format + ) + else: + result = process_translation_job( + args.input_files, + args.output_path, + args.batch_size, + output_format=args.output_format + ) + + print("\n" + "=" * 50) + print("Translation Job Completed") + print("=" * 50) + print(f"Total results: {result.get('metadata', {}).get('total_results', 0)}") + print(f"Errors: {len(result.get('metadata', {}).get('errors', []))}") + + diff --git a/translation_graph/databricks_job.py.backup b/translation_graph/databricks_job.py.backup new file mode 100644 index 0000000..76f1ec8 --- /dev/null +++ b/translation_graph/databricks_job.py.backup @@ -0,0 +1,391 @@ +#!/usr/bin/env python3 +""" +Databricks Job Entry Point for Translation Graph + +This module provides a Databricks-compatible interface for running the translation graph +in Databricks jobs and pipelines. It handles DBFS paths, Volumes, and Databricks-specific +configuration. +""" + +import os +import json +import sys +from typing import List, Dict, Any, Optional +from pathlib import Path + +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from graph_builder import build_translation_graph +from utils.file_processor import create_batches_from_file, process_files +from utils.types import ArtifactBatch + + +def get_dbfs_path(filepath: str) -> str: + """ + Convert a Databricks path (dbfs:/ or /Volumes/) to a local filesystem path. + + Args: + filepath: Path that may be in DBFS or Volumes format + + Returns: + Local filesystem path + """ + if filepath.startswith("dbfs:/"): + return filepath.replace("dbfs:/", "/dbfs/") + elif filepath.startswith("/Volumes/"): + return filepath + elif filepath.startswith("/dbfs/"): + return filepath + else: + return filepath + + +def process_translation_job( + input_files: List[str], + output_path: Optional[str] = None, + batch_size: int = 10, + context: Optional[Dict[str, Any]] = None, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Main entry point for Databricks job processing. + + Args: + input_files: List of JSON file paths (supports dbfs:/, /Volumes/, or local paths) + output_path: Optional output path for results (default: writes to job output) + batch_size: Number of artifacts per batch + context: Optional context dictionary + + Returns: + Translation results dictionary + """ + if context is None: + context = {} + + context["job_type"] = "databricks" + context["batch_size"] = batch_size + + print(f"Starting translation job") + print(f"Input files: {input_files}") + print(f"Batch size: {batch_size}") + print(f"Output path: {output_path}") + print(f"Output format: {output_format}") + print("-" * 50) + + graph = build_translation_graph() + + all_batches = [] + for filepath in input_files: + local_path = get_dbfs_path(filepath) + print(f"Processing file: {local_path}") + + if not os.path.exists(local_path): + raise FileNotFoundError(f"File not found: {local_path}") + + batches = create_batches_from_file(local_path, batch_size, context) + all_batches.extend(batches) + print(f" Created {len(batches)} batch(es) from {local_path}") + + print(f"\nTotal batches to process: {len(all_batches)}") + print("-" * 50) + + result = graph.run_batches(all_batches) + + if output_path: + if output_format == "json": + output_local_path = get_dbfs_path(output_path) + output_dir = os.path.dirname(output_local_path) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + + with open(output_local_path, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, default=str) + + print(f"\nJSON results saved to: {output_local_path}") + elif output_format == "sql": + save_sql_files(result, output_path) + + return result + + +def process_from_dbutils( + input_files: List[str], + output_path: Optional[str] = None, + batch_size: int = 10, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Process translation job using dbutils for file access. + + This is useful when files are in DBFS and you want to use dbutils.fs operations. + + Args: + input_files: List of file paths (dbfs:/ paths) + output_path: Optional output path + batch_size: Number of artifacts per batch + + Returns: + Translation results dictionary + """ + try: + import dbutils + except ImportError: + raise ImportError( + "dbutils not available. Use process_translation_job() instead or run in Databricks environment." + ) + + context = { + "job_type": "databricks", + "batch_size": batch_size, + "using_dbutils": True + } + + graph = build_translation_graph() + all_batches = [] + + for filepath in input_files: + print(f"Reading file: {filepath}") + + if filepath.startswith("dbfs:/"): + content = dbutils.fs.head(filepath) + local_temp_path = f"/tmp/{os.path.basename(filepath)}" + + with open(local_temp_path, 'w', encoding='utf-8') as f: + f.write(content) + + batches = create_batches_from_file(local_temp_path, batch_size, context) + all_batches.extend(batches) + + os.remove(local_temp_path) + else: + batches = create_batches_from_file(filepath, batch_size, context) + all_batches.extend(batches) + + result = graph.run_batches(all_batches) + + if output_path: + if output_format == "json": + output_json = json.dumps(result, indent=2, default=str) + dbutils.fs.put(output_path, output_json) + print(f"JSON results saved to: {output_path}") + elif output_format == "sql": + save_sql_files_dbutils(result, output_path) + + return result + + +def process_from_volume( + volume_path: str, + artifact_types: Optional[List[str]] = None, + output_path: Optional[str] = None, + batch_size: int = 10, + output_format: str = "json" +) -> Dict[str, Any]: + """ + Process all JSON files from a Volume directory. + + Args: + volume_path: Path to Volume directory (e.g., /Volumes/catalog/schema/volume_name/) + artifact_types: Optional list of artifact types to process (if None, processes all) + output_path: Optional output path for results + batch_size: Number of artifacts per batch + + Returns: + Translation results dictionary + """ + if not volume_path.startswith("/Volumes/"): + raise ValueError(f"Volume path must start with /Volumes/, got: {volume_path}") + + if not os.path.exists(volume_path): + raise FileNotFoundError(f"Volume path not found: {volume_path}") + + json_files = [] + for file in os.listdir(volume_path): + if file.endswith('.json'): + if artifact_types is None: + json_files.append(os.path.join(volume_path, file)) + else: + file_lower = file.lower() + if any(artifact_type.lower() in file_lower for artifact_type in artifact_types): + json_files.append(os.path.join(volume_path, file)) + + if not json_files: + print(f"No JSON files found in {volume_path}") + return {"metadata": {"total_results": 0, "errors": []}} + + print(f"Found {len(json_files)} JSON file(s) to process") + return process_translation_job(json_files, output_path, batch_size) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser( + description="Databricks translation job entry point" + ) + parser.add_argument( + "--input-files", + nargs="+", + required=True, + help="Input JSON files (supports dbfs:/, /Volumes/, or local paths)" + ) + parser.add_argument( + "--output-path", + type=str, + help="Output path for results (JSON or directory for SQL files)" + ) + parser.add_argument( + "--batch-size", + type=int, + default=10, + help="Number of artifacts per batch (default: 10)" + ) + parser.add_argument( + "--volume-path", + type=str, + help="Process all JSON files from a Volume directory" + ) + parser.add_argument( + "--artifact-types", + nargs="+", + help="Filter artifact types when using --volume-path" + ) + parser.add_argument( + "--output-format", + choices=["json", "sql"], + default="json", + help="Output format: json or sql files (default: json)" + ) + + args = parser.parse_args() + + if args.volume_path: + result = process_from_volume( + args.volume_path, + args.artifact_types, + args.output_path, + args.batch_size, + args.output_format + ) + else: + result = process_translation_job( + args.input_files, + args.output_path, + args.batch_size, + output_format=args.output_format + ) + + print("\n" + "=" * 50) + print("Translation Job Completed") + print("=" * 50) + print(f"Total results: {result.get('metadata', {}).get('total_results', 0)}") + print(f"Errors: {len(result.get('metadata', {}).get('errors', []))}") + + +def save_sql_files(result: Dict[str, Any], output_base_path: str): + """ + Save SQL results as separate SQL files for each artifact type. + + Args: + result: Translation results dictionary + output_base_path: Base path where SQL files will be saved + """ + output_local_path = get_dbfs_path(output_base_path) + output_dir = os.path.dirname(output_local_path) if os.path.isfile(output_local_path) else output_local_path + + if not os.path.exists(output_dir): + os.makedirs(output_dir, exist_ok=True) + + artifact_types = ['tables', 'views', 'schemas', 'databases', 'sequences', 'stages', 'streams', 'pipes', 'roles', 'grants', 'tags', 'comments', 'masking_policies', 'udfs', 'procedures', 'external_locations', 'file_formats'] + + total_sql_files = 0 + total_sql_statements = 0 + + for artifact_type in artifact_types: + if artifact_type in result and result[artifact_type]: + sql_statements = result[artifact_type] + if sql_statements: + # Create SQL file name + sql_filename = f"{artifact_type}.sql" + sql_filepath = os.path.join(output_dir, sql_filename) + + with open(sql_filepath, 'w', encoding='utf-8') as f: + f.write(f"-- {artifact_type.upper()} DDL - Generated by Translation Graph\n") + f.write(f"-- Generated: {os.path.basename(output_base_path)}\n\n") + + for i, sql in enumerate(sql_statements, 1): + # Clean SQL (remove markdown code blocks if present) + clean_sql = sql + if clean_sql.startswith('```sql'): + clean_sql = clean_sql[6:] + if clean_sql.endswith('```'): + clean_sql = clean_sql[:-3] + clean_sql = clean_sql.strip() + + f.write(f"-- Statement {i}\n") + f.write(clean_sql) + f.write(";\n\n") + + total_sql_files += 1 + total_sql_statements += len(sql_statements) + print(f" ✓ Saved {len(sql_statements)} {artifact_type} SQL statements to {sql_filename}") + + print(f"\nSQL files saved to: {output_dir}") + print(f"Total SQL files: {total_sql_files}") + print(f"Total SQL statements: {total_sql_statements}") + + +def save_sql_files_dbutils(result: Dict[str, Any], output_base_path: str): + """ + Save SQL results as separate SQL files using dbutils. + + Args: + result: Translation results dictionary + output_base_path: Base path where SQL files will be saved (dbfs:/ paths) + """ + import dbutils + + # Ensure output_base_path ends with a directory separator + if not output_base_path.endswith('/'): + output_base_path += '/' + + artifact_types = ['tables', 'views', 'schemas', 'databases', 'sequences', 'stages', 'streams', 'pipes', 'roles', 'grants', 'tags', 'comments', 'masking_policies', 'udfs', 'procedures', 'external_locations', 'file_formats'] + + total_sql_files = 0 + total_sql_statements = 0 + + for artifact_type in artifact_types: + if artifact_type in result and result[artifact_type]: + sql_statements = result[artifact_type] + if sql_statements: + # Create SQL file path + sql_filename = f"{artifact_type}.sql" + sql_filepath = f"{output_base_path}{sql_filename}" + + sql_content = f"-- {artifact_type.upper()} DDL - Generated by Translation Graph\n" + sql_content += f"-- Generated: {output_base_path}\n\n" + + for i, sql in enumerate(sql_statements, 1): + # Clean SQL (remove markdown code blocks if present) + clean_sql = sql + if clean_sql.startswith('```sql'): + clean_sql = clean_sql[6:] + if clean_sql.endswith('```'): + clean_sql = clean_sql[:-3] + clean_sql = clean_sql.strip() + + sql_content += f"-- Statement {i}\n" + sql_content += clean_sql + sql_content += ";\n\n" + + dbutils.fs.put(sql_filepath, sql_content) + + total_sql_files += 1 + total_sql_statements += len(sql_statements) + print(f" ✓ Saved {len(sql_statements)} {artifact_type} SQL statements to {sql_filename}") + + print(f"\nSQL files saved to: {output_base_path}") + print(f"Total SQL files: {total_sql_files}") + print(f"Total SQL statements: {total_sql_statements}") + diff --git a/translation_graph/docs/DATABRICKS.md b/translation_graph/docs/DATABRICKS.md new file mode 100644 index 0000000..f12a86c --- /dev/null +++ b/translation_graph/docs/DATABRICKS.md @@ -0,0 +1,360 @@ +# Databricks Deployment Guide + +Complete guide for deploying and using the Translation Graph in Databricks jobs and pipelines. + +## Quick Start + +### Installation + +**Option 1: Upload to DBFS (Quickest)** +```bash +cd translation_graph +zip -r translation_graph.zip . -x "*.pyc" "__pycache__/*" "*.git*" "*.md" +databricks fs cp translation_graph.zip dbfs:/FileStore/libraries/translation_graph.zip +``` +Then install in cluster: Libraries → Install New → DBFS → `translation_graph.zip` + +**Option 2: Use Repos (Recommended for CI/CD)** +1. Workspace → Repos → Add Repo → Connect Git repository +2. Files are automatically available in notebooks + +**Option 3: Direct Upload** +```python +# Upload translation_graph folder to DBFS or Volume, then: +import sys +sys.path.insert(0, '/dbfs/FileStore/translation_graph') +# or +sys.path.insert(0, '/Volumes/catalog/schema/volume_name/translation_graph') +``` + +### Basic Usage + +```python +from translation_graph.databricks_job import process_translation_job + +result = process_translation_job( + input_files=["dbfs:/FileStore/data/tables.json"], + output_path="dbfs:/FileStore/results/results.json", + batch_size=10 +) +``` + +## Deployment Options + +### In Databricks Notebooks + +```python +# Setup +import sys +sys.path.insert(0, '/dbfs/FileStore/translation_graph') + +from translation_graph.databricks_job import process_translation_job + +# Process files +input_files = [ + "dbfs:/FileStore/data/tables.json", + "dbfs:/FileStore/data/views.json" +] + +result = process_translation_job( + input_files=input_files, + output_path="dbfs:/FileStore/results/translation_results.json", + batch_size=10 +) + +# Display results +print(f"Total results: {result['metadata']['total_results']}") +``` + +### In Databricks Jobs + +1. **Create Job:** + - Jobs → Create Job → Notebook Task + - Notebook: `/Workspace/Users/your_email/translation_job_notebook` + +2. **Notebook Code:** + ```python + # Get parameters from job widgets + input_files = [f.strip() for f in dbutils.widgets.get("input_files").split(",")] + output_path = dbutils.widgets.get("output_path") + batch_size = int(dbutils.widgets.get("batch_size", "10")) + + from translation_graph.databricks_job import process_translation_job + + result = process_translation_job( + input_files=input_files, + output_path=output_path, + batch_size=batch_size + ) + + print(f"Translation completed: {result['metadata']['total_results']} results") + ``` + +3. **Job Parameters:** + - `input_files`: `dbfs:/FileStore/data/tables.json,dbfs:/FileStore/data/views.json` + - `output_path`: `dbfs:/FileStore/results/results.json` + - `batch_size`: `10` + +### In Databricks Pipelines + +```python +from translation_graph.databricks_job import process_translation_job + +# Get pipeline parameters +input_files = spark.conf.get("input_files", "").split(",") +output_path = spark.conf.get("output_path") +batch_size = int(spark.conf.get("batch_size", "10")) + +# Process translation +result = process_translation_job( + input_files=input_files, + output_path=output_path, + batch_size=batch_size +) + +# Save to Delta table (optional) +from pyspark.sql import Row +from pyspark.sql.functions import current_timestamp + +rows = [] +for artifact_type, artifacts in result.items(): + if artifact_type != "metadata" and isinstance(artifacts, list): + for artifact in artifacts: + rows.append(Row( + artifact_type=artifact_type, + artifact_ddl=artifact, + processed_at=current_timestamp() + )) + +df = spark.createDataFrame(rows) +df.write.format("delta").mode("overwrite").saveAsTable("translation_results") +``` + +## Common Usage Patterns + +### Process from Volume + +```python +from translation_graph.databricks_job import process_from_volume + +result = process_from_volume( + volume_path="/Volumes/main/default/migration_data/", + artifact_types=["tables", "views"], # Optional filter + output_path="dbfs:/FileStore/results/results.json", + batch_size=10 +) +``` + +### Process Multiple Files + +```python +input_files = [ + "dbfs:/FileStore/data/tables.json", + "dbfs:/FileStore/data/views.json", + "dbfs:/FileStore/data/schemas.json" +] + +result = process_translation_job( + input_files=input_files, + output_path="dbfs:/FileStore/results/all_results.json", + batch_size=10 +) +``` + +### Save Results to Delta Table + +```python +from pyspark.sql import Row +from pyspark.sql.functions import current_timestamp + +rows = [] +for artifact_type, artifacts in result.items(): + if artifact_type != "metadata" and isinstance(artifacts, list): + for artifact in artifacts: + rows.append(Row( + artifact_type=artifact_type, + artifact_ddl=artifact, + processed_at=current_timestamp() + )) + +df = spark.createDataFrame(rows) +df.write.format("delta").mode("overwrite").saveAsTable("translation_results") +``` + +## Configuration + +### Environment Variables + +Set in cluster environment variables or job configuration: + +```bash +# Required: Databricks LLM endpoint +DBX_ENDPOINT=databricks-llama-4-maverick + +# Optional configuration +DDL_BATCH_SIZE=10 +DDL_MAX_CONCURRENT=5 +DDL_TIMEOUT=300 +DDL_OUTPUT_DIR=/dbfs/FileStore/results +``` + +### Required Libraries + +Add to cluster: +- `langchain>=0.1.0` +- `langchain-core>=0.1.0` +- `langchain-community>=0.1.0` +- `databricks-langchain>=0.1.0` +- `python-dotenv>=1.0.0` + +### Cluster Configuration + +**Recommended settings:** +- **Runtime:** Databricks Runtime 13.3 LTS or later +- **Node Type:** Standard (i3.xlarge or larger for large files) +- **Workers:** 2-4 workers (for parallel processing) + +## File Path Formats + +- **DBFS:** `dbfs:/FileStore/data/tables.json` (automatically converted to `/dbfs/FileStore/data/tables.json`) +- **Volumes:** `/Volumes/catalog/schema/volume_name/tables.json` (direct access) +- **Local:** `/tmp/tables.json` (used as-is) + +### Reading from Different Sources + +**From DBFS:** +```python +input_files = ["dbfs:/FileStore/data/tables.json"] +``` + +**From Volume:** +```python +input_files = ["/Volumes/main/default/migration_data/tables.json"] +``` + +**From External Location:** +```python +# First copy to DBFS or Volume +dbutils.fs.cp("s3://bucket/data/tables.json", "dbfs:/FileStore/data/tables.json") +input_files = ["dbfs:/FileStore/data/tables.json"] +``` + +## Complete Workflow Example + +### Step 1: Prepare Input Files + +```python +# Upload JSON files to DBFS or Volume +dbutils.fs.cp( + "s3://your-bucket/snowflake_objects/tables.json", + "dbfs:/FileStore/migration/tables.json" +) +``` + +### Step 2: Run Translation + +```python +from translation_graph.databricks_job import process_translation_job + +result = process_translation_job( + input_files=[ + "dbfs:/FileStore/migration/tables.json", + "dbfs:/FileStore/migration/views.json" + ], + output_path="dbfs:/FileStore/migration/results/translation_results.json", + batch_size=10 +) +``` + +### Step 3: Save Results to Delta Table + +```python +import json +from pyspark.sql import Row +from pyspark.sql.functions import current_timestamp + +# Read results +with open("/dbfs/FileStore/migration/results/translation_results.json", "r") as f: + results = json.load(f) + +# Convert to DataFrame +rows = [] +for artifact_type, artifacts in results.items(): + if artifact_type != "metadata" and isinstance(artifacts, list): + for artifact in artifacts: + rows.append(Row( + artifact_type=artifact_type, + artifact_ddl=artifact, + processed_at=current_timestamp() + )) + +df = spark.createDataFrame(rows) +df.write.format("delta").mode("overwrite").saveAsTable("translation_results") +``` + +### Step 4: Execute Translated DDL (Optional) + +```python +# Execute translated DDL statements +for artifact_type, artifacts in result.items(): + if artifact_type != "metadata" and isinstance(artifacts, list): + for ddl in artifacts: + try: + spark.sql(ddl) + print(f"✓ Executed {artifact_type} DDL") + except Exception as e: + print(f"✗ Error executing {artifact_type}: {str(e)}") +``` + +## Troubleshooting + +### Common Issues + +1. **Import errors:** + ```python + # Make sure path is correct + import sys + sys.path.insert(0, '/dbfs/FileStore/translation_graph') + ``` + +2. **File not found:** + - Verify path format: `dbfs:/` vs `/dbfs/` + - Check file permissions + - Ensure Volume access is granted + +3. **LLM endpoint errors:** + ```python + # Set environment variable + import os + os.environ['DBX_ENDPOINT'] = 'your-endpoint-name' + ``` + +4. **Memory issues:** + - Reduce `batch_size` + - Use larger cluster nodes + - Process files sequentially + +## Best Practices + +1. **Use Volumes** for production data (better performance and access control) +2. **Set appropriate batch sizes** based on artifact complexity +3. **Monitor job logs** for translation errors +4. **Save intermediate results** for large migrations +5. **Use Delta tables** for storing translation results +6. **Implement retry logic** for transient failures +7. **Use job parameters** for flexible configuration + +## Unity Catalog Configuration + +If using Unity Catalog Volumes: + +```sql +-- Grant access to Volume +GRANT READ VOLUME ON VOLUME main.default.migration_data TO `your_user@example.com` +``` + +```python +# Use in code +volume_path = "/Volumes/main/default/migration_data/" +``` + diff --git a/translation_graph/docs/README.md b/translation_graph/docs/README.md new file mode 100644 index 0000000..b144610 --- /dev/null +++ b/translation_graph/docs/README.md @@ -0,0 +1,13 @@ +# Documentation + +This directory contains documentation for the Translation Graph. + +## Available Documentation + +- **[DATABRICKS.md](DATABRICKS.md)** - Complete guide for deploying and using the Translation Graph in Databricks jobs and pipelines. + +## Main Documentation + +For general project documentation, see the [main README](../README.md) in the root directory. + + diff --git a/translation_graph/graph_builder.py b/translation_graph/graph_builder.py index cfe49e1..60edcc9 100644 --- a/translation_graph/graph_builder.py +++ b/translation_graph/graph_builder.py @@ -1,23 +1,47 @@ -from typing import Any, Dict +from typing import Any, Dict, List, Optional from nodes.router import artifact_router from nodes.tables_translation import translate_tables from nodes.views_translation import translate_views from nodes.schemas_translation import translate_schemas from nodes.procedures_translation import translate_procedures from nodes.roles_translation import translate_roles +from nodes.database_translation import translate_databases +from nodes.stages_translation import translate_stages +from nodes.streams_translation import translate_streams +from nodes.pipes_translation import translate_pipes +from nodes.tags_translation import translate_tags +from nodes.comments_translation import translate_comments +from nodes.masking_policies_translation import translate_masking_policies +from nodes.grants_translation import translate_grants +from nodes.udfs_translation import translate_udfs +from nodes.sequences_translation import translate_sequences +from nodes.file_formats_translation import translate_file_formats +from nodes.external_locations_translation import translate_external_locations from nodes.aggregator import aggregate_translations -from utils.types import ArtifactBatch +from utils.types import ArtifactBatch, TranslationResult class TranslationGraph: def __init__(self): self.nodes = { "router": artifact_router, + "translate_databases": translate_databases, + "translate_schemas": translate_schemas, "translate_tables": translate_tables, "translate_views": translate_views, - "translate_schemas": translate_schemas, - "translate_procedures": translate_procedures, + "translate_stages": translate_stages, + "translate_streams": translate_streams, + "translate_pipes": translate_pipes, "translate_roles": translate_roles, + "translate_grants": translate_grants, + "translate_tags": translate_tags, + "translate_comments": translate_comments, + "translate_masking_policies": translate_masking_policies, + "translate_udfs": translate_udfs, + "translate_procedures": translate_procedures, + "translate_sequences": translate_sequences, + "translate_file_formats": translate_file_formats, + "translate_external_locations": translate_external_locations, "aggregate": aggregate_translations } @@ -25,11 +49,23 @@ def run(self, batch: ArtifactBatch) -> Dict[str, Any]: target_node = self.nodes["router"](batch) translation_functions = { + "databases": self.nodes["translate_databases"], + "schemas": self.nodes["translate_schemas"], "tables": self.nodes["translate_tables"], "views": self.nodes["translate_views"], - "schemas": self.nodes["translate_schemas"], + "stages": self.nodes["translate_stages"], + "external_locations": self.nodes["translate_external_locations"], + "streams": self.nodes["translate_streams"], + "pipes": self.nodes["translate_pipes"], + "roles": self.nodes["translate_roles"], + "grants": self.nodes["translate_grants"], + "tags": self.nodes["translate_tags"], + "comments": self.nodes["translate_comments"], + "masking_policies": self.nodes["translate_masking_policies"], + "udfs": self.nodes["translate_udfs"], "procedures": self.nodes["translate_procedures"], - "roles": self.nodes["translate_roles"] + "sequences": self.nodes["translate_sequences"], + "file_formats": self.nodes["translate_file_formats"] } if target_node not in translation_functions: @@ -40,6 +76,59 @@ def run(self, batch: ArtifactBatch) -> Dict[str, Any]: return final_result + def run_batches(self, batches: List[ArtifactBatch]) -> Dict[str, Any]: + """ + Process multiple batches and aggregate results. + + Args: + batches: List of ArtifactBatch objects to process + + Returns: + Aggregated translation results + """ + translation_results: List[TranslationResult] = [] + + for batch in batches: + target_node = self.nodes["router"](batch) + + translation_functions = { + "databases": self.nodes["translate_databases"], + "schemas": self.nodes["translate_schemas"], + "tables": self.nodes["translate_tables"], + "views": self.nodes["translate_views"], + "stages": self.nodes["translate_stages"], + "external_locations": self.nodes["translate_external_locations"], + "streams": self.nodes["translate_streams"], + "pipes": self.nodes["translate_pipes"], + "roles": self.nodes["translate_roles"], + "grants": self.nodes["translate_grants"], + "tags": self.nodes["translate_tags"], + "comments": self.nodes["translate_comments"], + "masking_policies": self.nodes["translate_masking_policies"], + "udfs": self.nodes["translate_udfs"], + "procedures": self.nodes["translate_procedures"], + "sequences": self.nodes["translate_sequences"], + "file_formats": self.nodes["translate_file_formats"] + } + + if target_node not in translation_functions: + raise ValueError(f"Unknown target node: {target_node}") + + translation_result = translation_functions[target_node](batch) + translation_results.append(translation_result) + + if translation_results: + final_result = self.nodes["aggregate"](*translation_results) + return final_result + + return { + "metadata": { + "total_results": 0, + "errors": [], + "processing_stats": {} + } + } + def build_translation_graph() -> TranslationGraph: return TranslationGraph() diff --git a/translation_graph/nodes/aggregator.py b/translation_graph/nodes/aggregator.py index 4c8f7b7..1c26261 100644 --- a/translation_graph/nodes/aggregator.py +++ b/translation_graph/nodes/aggregator.py @@ -20,25 +20,25 @@ def aggregate_translations(*results: TranslationResult) -> Dict[str, Any]: "metadata": {...} } """ - # Initialize the merged structure - merged = { - "tables": [], - "views": [], - "schemas": [], - "procedures": [], - "roles": [], - "metadata": { - "total_results": 0, - "errors": [], - "processing_stats": {} - } + all_artifact_types = { + "databases", "schemas", "tables", "views", "stages", "external_locations", + "streams", "pipes", "roles", "grants", "tags", "comments", + "masking_policies", "udfs", "procedures", "sequences", "file_formats" + } + + merged = {artifact_type: [] for artifact_type in all_artifact_types} + merged["metadata"] = { + "total_results": 0, + "errors": [], + "processing_stats": {} } - # Aggregate results by artifact type for result in results: artifact_type = result.artifact_type if artifact_type in merged: merged[artifact_type].extend(result.results) + else: + merged[artifact_type] = result.results # Collect errors if result.errors: diff --git a/translation_graph/nodes/comments_translation.py b/translation_graph/nodes/comments_translation.py new file mode 100644 index 0000000..dc51dfb --- /dev/null +++ b/translation_graph/nodes/comments_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.comments_prompts import CommentsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_comments(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake comment artifacts to Databricks comments. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated comment DDL + """ + config = get_config() + llm = config.get_llm_for_node("comments_translator") + prompt = CommentsPrompts.create_prompt() + + return TranslationResult( + artifact_type="comments", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/database_translation.py b/translation_graph/nodes/database_translation.py new file mode 100644 index 0000000..b24c039 --- /dev/null +++ b/translation_graph/nodes/database_translation.py @@ -0,0 +1,56 @@ +from prompts.database_prompts import DatabasePrompts +from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node + + +def translate_databases(batch: ArtifactBatch) -> TranslationResult: + """ + Translate database artifacts to Databricks catalogs. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated database DDL + """ + llm = create_llm_for_node("database_translator") + + # Process each database in the batch + results = [] + errors = [] + + for database_json in batch.items: + try: + # Parse the database JSON + import json + database_metadata = json.loads(database_json) + + # Create prompt with context and database metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = DatabasePrompts.create_prompt( + context=context, + ddl=json.dumps(database_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for database {database_metadata.get('database_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for database {database_metadata.get('database_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing database: {str(e)}") + + return TranslationResult( + artifact_type="databases", + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} + ) diff --git a/translation_graph/nodes/external_locations_translation.py b/translation_graph/nodes/external_locations_translation.py new file mode 100644 index 0000000..b753bb3 --- /dev/null +++ b/translation_graph/nodes/external_locations_translation.py @@ -0,0 +1,26 @@ +from config.ddl_config import get_config +from prompts.external_locations_prompts import ExternalLocationsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_external_locations(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake external location artifacts to Databricks external locations. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated external location DDL + """ + config = get_config() + llm = config.get_llm_for_node("external_locations_translator") + prompt = ExternalLocationsPrompts.create_prompt() + + return TranslationResult( + artifact_type="external_locations", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) + diff --git a/translation_graph/nodes/file_formats_translation.py b/translation_graph/nodes/file_formats_translation.py new file mode 100644 index 0000000..7430035 --- /dev/null +++ b/translation_graph/nodes/file_formats_translation.py @@ -0,0 +1,26 @@ +from config.ddl_config import get_config +from prompts.file_formats_prompts import FileFormatsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_file_formats(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake file format artifacts to Databricks file formats. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated file format DDL + """ + config = get_config() + llm = config.get_llm_for_node("file_formats_translator") + prompt = FileFormatsPrompts.create_prompt() + + return TranslationResult( + artifact_type="file_formats", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) + diff --git a/translation_graph/nodes/grants_translation.py b/translation_graph/nodes/grants_translation.py new file mode 100644 index 0000000..06e17e6 --- /dev/null +++ b/translation_graph/nodes/grants_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.grants_prompts import GrantsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_grants(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake grant artifacts to Databricks Unity Catalog privileges. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated grant DDL + """ + config = get_config() + llm = config.get_llm_for_node("grants_translator") + prompt = GrantsPrompts.create_prompt() + + return TranslationResult( + artifact_type="grants", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/masking_policies_translation.py b/translation_graph/nodes/masking_policies_translation.py new file mode 100644 index 0000000..92d4f18 --- /dev/null +++ b/translation_graph/nodes/masking_policies_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.masking_policies_prompts import MaskingPoliciesPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_masking_policies(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake masking policy artifacts to Databricks Unity Catalog column masks (UDFs). + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated masking policy DDL + """ + config = get_config() + llm = config.get_llm_for_node("masking_policies_translator") + prompt = MaskingPoliciesPrompts.create_prompt() + + return TranslationResult( + artifact_type="masking_policies", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/pipes_translation.py b/translation_graph/nodes/pipes_translation.py new file mode 100644 index 0000000..a2ab22d --- /dev/null +++ b/translation_graph/nodes/pipes_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.pipes_prompts import PipesPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_pipes(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake pipe (Snowpipe) artifacts to Databricks Auto Loader. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated pipe DDL + """ + config = get_config() + llm = config.get_llm_for_node("pipes_translator") + prompt = PipesPrompts.create_prompt() + + return TranslationResult( + artifact_type="pipes", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/procedures_translation.py b/translation_graph/nodes/procedures_translation.py index 2eae232..49d42bc 100644 --- a/translation_graph/nodes/procedures_translation.py +++ b/translation_graph/nodes/procedures_translation.py @@ -1,6 +1,6 @@ -from config.ddl_config import get_config from prompts.procedures_prompts import ProceduresPrompts from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node def translate_procedures(batch: ArtifactBatch) -> TranslationResult: @@ -13,13 +13,44 @@ def translate_procedures(batch: ArtifactBatch) -> TranslationResult: Returns: TranslationResult with translated procedure DDL """ - config = get_config() - llm = config.get_llm_for_node("procedures_translator") - prompt = ProceduresPrompts.create_prompt() + llm = create_llm_for_node("procedures_translator") + + # Process each procedure in the batch + results = [] + errors = [] + + for procedure_json in batch.items: + try: + # Parse the procedure JSON + import json + procedure_metadata = json.loads(procedure_json) + + # Create prompt with context and procedure metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = ProceduresPrompts.create_prompt( + context=context, + procedure_metadata=json.dumps(procedure_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for procedure {procedure_metadata.get('procedure_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for procedure {procedure_metadata.get('procedure_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing procedure: {str(e)}") return TranslationResult( artifact_type="procedures", - results=[""], - errors=[], - metadata={"count": len(batch.items)} + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} ) diff --git a/translation_graph/nodes/router.py b/translation_graph/nodes/router.py index 5893150..790d260 100644 --- a/translation_graph/nodes/router.py +++ b/translation_graph/nodes/router.py @@ -6,15 +6,46 @@ def artifact_router(batch: ArtifactBatch) -> str: """ Route artifacts to the appropriate translation node. + + If artifact_type is already set in the batch, returns it directly without LLM routing. + Otherwise, uses LLM to determine the artifact type from DDL content. Args: batch: The artifact batch to route Returns: - String indicating the target node: "tables", "views", "schemas", "procedures", or "roles" + String indicating the target node: "databases", "schemas", "tables", "views", + "stages", "external_locations", "streams", "pipes", "roles", "grants", "tags", + "comments", "masking_policies", "udfs", "procedures", "sequences", "file_formats" """ + if batch.artifact_type: + valid_nodes = { + "databases", "schemas", "tables", "views", "stages", "external_locations", + "streams", "pipes", "roles", "grants", "tags", "comments", + "masking_policies", "udfs", "procedures", "sequences", "file_formats" + } + if batch.artifact_type in valid_nodes: + return batch.artifact_type + config = get_config() llm = config.get_llm_for_node("smart_router") prompt = RouterPrompts.create_prompt() + ddl_content = "\n".join(batch.items) if batch.items else "" + routing_prompt = f"{prompt}\n\nDDL Content:\n{ddl_content}" + + response = llm.invoke(routing_prompt) + + response_text = response.content if hasattr(response, 'content') else str(response) + + valid_nodes = { + "databases", "schemas", "tables", "views", "stages", "external_locations", + "streams", "pipes", "roles", "grants", "tags", "comments", + "masking_policies", "udfs", "procedures", "sequences", "file_formats" + } + + for node in valid_nodes: + if node.lower() in response_text.lower(): + return node + return "tables" diff --git a/translation_graph/nodes/schemas_translation.py b/translation_graph/nodes/schemas_translation.py index 086c1a9..24778d8 100644 --- a/translation_graph/nodes/schemas_translation.py +++ b/translation_graph/nodes/schemas_translation.py @@ -1,6 +1,6 @@ -from config.ddl_config import get_config from prompts.schemas_prompts import SchemasPrompts from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node def translate_schemas(batch: ArtifactBatch) -> TranslationResult: @@ -13,13 +13,44 @@ def translate_schemas(batch: ArtifactBatch) -> TranslationResult: Returns: TranslationResult with translated schema DDL """ - config = get_config() - llm = config.get_llm_for_node("schemas_translator") - prompt = SchemasPrompts.create_prompt() + llm = create_llm_for_node("schemas_translator") + + # Process each schema in the batch + results = [] + errors = [] + + for schema_json in batch.items: + try: + # Parse the schema JSON + import json + schema_metadata = json.loads(schema_json) + + # Create prompt with context and schema metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = SchemasPrompts.create_prompt( + context=context, + schema_metadata=json.dumps(schema_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for schema {schema_metadata.get('schema_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for schema {schema_metadata.get('schema_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing schema: {str(e)}") return TranslationResult( artifact_type="schemas", - results=[""], - errors=[], - metadata={"count": len(batch.items)} + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} ) diff --git a/translation_graph/nodes/sequences_translation.py b/translation_graph/nodes/sequences_translation.py new file mode 100644 index 0000000..a95e934 --- /dev/null +++ b/translation_graph/nodes/sequences_translation.py @@ -0,0 +1,60 @@ +from prompts.sequences_prompts import SequencesPrompts +from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node + + +def translate_sequences(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake sequence artifacts to Databricks equivalents. + + Note: Databricks doesn't have sequences like Snowflake. This translator + provides guidance on alternative approaches. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with sequence migration guidance + """ + llm = create_llm_for_node("sequences_translator") + + # Process each sequence in the batch + results = [] + errors = [] + + for sequence_json in batch.items: + try: + # Parse the sequence JSON + import json + sequence_metadata = json.loads(sequence_json) + + # Create prompt with context and sequence metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = SequencesPrompts.create_prompt( + context=context, + sequence_metadata=json.dumps(sequence_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for sequence {sequence_metadata.get('sequence_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for sequence {sequence_metadata.get('sequence_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing sequence: {str(e)}") + + return TranslationResult( + artifact_type="sequences", + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} + ) + diff --git a/translation_graph/nodes/stages_translation.py b/translation_graph/nodes/stages_translation.py new file mode 100644 index 0000000..c8f6146 --- /dev/null +++ b/translation_graph/nodes/stages_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.stages_prompts import StagesPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_stages(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake stage artifacts to Databricks volumes. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated stage DDL + """ + config = get_config() + llm = config.get_llm_for_node("stages_translator") + prompt = StagesPrompts.create_prompt() + + return TranslationResult( + artifact_type="stages", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/streams_translation.py b/translation_graph/nodes/streams_translation.py new file mode 100644 index 0000000..cd9c044 --- /dev/null +++ b/translation_graph/nodes/streams_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.streams_prompts import StreamsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_streams(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake stream artifacts to Databricks Delta Change Data Feed. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated stream DDL + """ + config = get_config() + llm = config.get_llm_for_node("streams_translator") + prompt = StreamsPrompts.create_prompt() + + return TranslationResult( + artifact_type="streams", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/tables_translation.py b/translation_graph/nodes/tables_translation.py index d7a76ed..89e8a6c 100644 --- a/translation_graph/nodes/tables_translation.py +++ b/translation_graph/nodes/tables_translation.py @@ -14,11 +14,43 @@ def translate_tables(batch: ArtifactBatch) -> TranslationResult: TranslationResult with translated table DDL """ llm = create_llm_for_node("tables_translator") - prompt = TablesPrompts.create_prompt() + + # Process each table in the batch + results = [] + errors = [] + + for table_json in batch.items: + try: + # Parse the table JSON + import json + table_metadata = json.loads(table_json) + + # Create prompt with context and table metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = TablesPrompts.create_prompt( + context=context, + table_metadata=json.dumps(table_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for table {table_metadata.get('table_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for table {table_metadata.get('table_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing table: {str(e)}") return TranslationResult( artifact_type="tables", - results=[""], - errors=[], - metadata={"count": len(batch.items)} + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} ) diff --git a/translation_graph/nodes/tags_translation.py b/translation_graph/nodes/tags_translation.py new file mode 100644 index 0000000..a545bef --- /dev/null +++ b/translation_graph/nodes/tags_translation.py @@ -0,0 +1,25 @@ +from config.ddl_config import get_config +from prompts.tags_prompts import TagsPrompts +from utils.types import ArtifactBatch, TranslationResult + + +def translate_tags(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake tag artifacts to Databricks Unity Catalog tags. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated tag DDL + """ + config = get_config() + llm = config.get_llm_for_node("tags_translator") + prompt = TagsPrompts.create_prompt() + + return TranslationResult( + artifact_type="tags", + results=[""], + errors=[], + metadata={"count": len(batch.items)} + ) diff --git a/translation_graph/nodes/udfs_translation.py b/translation_graph/nodes/udfs_translation.py new file mode 100644 index 0000000..569183d --- /dev/null +++ b/translation_graph/nodes/udfs_translation.py @@ -0,0 +1,56 @@ +from prompts.udfs_prompts import UDFsPrompts +from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node + + +def translate_udfs(batch: ArtifactBatch) -> TranslationResult: + """ + Translate Snowflake SQL UDF artifacts to Databricks Unity Catalog SQL UDFs. + + Args: + batch: The artifact batch to translate + + Returns: + TranslationResult with translated UDF DDL + """ + llm = create_llm_for_node("udfs_translator") + + # Process each UDF in the batch + results = [] + errors = [] + + for udf_json in batch.items: + try: + # Parse the UDF JSON + import json + udf_metadata = json.loads(udf_json) + + # Create prompt with context and UDF metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = UDFsPrompts.create_prompt( + context=context, + function_metadata=json.dumps(udf_metadata, indent=2) + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for UDF {udf_metadata.get('function_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for UDF {udf_metadata.get('function_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing UDF: {str(e)}") + + return TranslationResult( + artifact_type="udfs", + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} + ) diff --git a/translation_graph/nodes/views_translation.py b/translation_graph/nodes/views_translation.py index 086499e..9787524 100644 --- a/translation_graph/nodes/views_translation.py +++ b/translation_graph/nodes/views_translation.py @@ -1,6 +1,6 @@ -from config.ddl_config import get_config from prompts.views_prompts import ViewsPrompts from utils.types import ArtifactBatch, TranslationResult +from utils.llm_utils import create_llm_for_node def translate_views(batch: ArtifactBatch) -> TranslationResult: @@ -13,13 +13,44 @@ def translate_views(batch: ArtifactBatch) -> TranslationResult: Returns: TranslationResult with translated view DDL """ - config = get_config() - llm = config.get_llm_for_node("views_translator") - prompt = ViewsPrompts.create_prompt() + llm = create_llm_for_node("views_translator") + + # Process each view in the batch + results = [] + errors = [] + + for view_json in batch.items: + try: + # Parse the view JSON + import json + view_metadata = json.loads(view_json) + + # Create prompt with context and view metadata + context = { + "source_db": batch.context.get("source_db", "snowflake"), + "target_db": batch.context.get("target_db", "databricks") + } + + prompt = ViewsPrompts.create_prompt( + context=context, + ddl=view_metadata.get('view_definition', '') + ) + + # Call the LLM to generate DDL + try: + response = llm.invoke(prompt) + ddl_result = response.content if hasattr(response, 'content') else str(response) + results.append(ddl_result.strip()) + except Exception as e: + results.append(f"-- Error generating DDL for view {view_metadata.get('view_name', 'unknown')}: {str(e)}") + errors.append(f"LLM error for view {view_metadata.get('view_name', 'unknown')}: {str(e)}") + + except Exception as e: + errors.append(f"Error processing view: {str(e)}") return TranslationResult( artifact_type="views", - results=[""], - errors=[], - metadata={"count": len(batch.items)} + results=results, + errors=errors, + metadata={"count": len(batch.items), "processed": len(results)} ) diff --git a/translation_graph/prompts/comments_prompts.py b/translation_graph/prompts/comments_prompts.py new file mode 100644 index 0000000..1242b23 --- /dev/null +++ b/translation_graph/prompts/comments_prompts.py @@ -0,0 +1,26 @@ +from . import PromptBase + + +class CommentsPrompts(PromptBase): + """Prompts for comment translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake comments to Databricks comments. + +Your task is to translate Snowflake comment DDL statements to equivalent Databricks comment statements. + +Key mappings: +- Snowflake COMMENT → Databricks COMMENT +- Direct equivalent mapping +- Comments are used for documentation and metadata + +For each comment DDL statement, provide the equivalent Databricks SQL that adds comments to objects. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements for adding comments.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create comment translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/database_prompts.py b/translation_graph/prompts/database_prompts.py new file mode 100644 index 0000000..cd1a9ff --- /dev/null +++ b/translation_graph/prompts/database_prompts.py @@ -0,0 +1,25 @@ +from . import PromptBase + + +class DatabasePrompts(PromptBase): + """Prompts for database to catalog translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake databases to Databricks Unity Catalog. + +Your task is to translate Snowflake database DDL statements to equivalent Databricks catalog creation statements. + +Key mappings: +- Snowflake DATABASE → Databricks CATALOG +- Direct equivalent mapping with no special considerations + +For each database DDL statement, provide the equivalent Databricks SQL that creates the catalog. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements, no explanations.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create database translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/external_locations_prompts.py b/translation_graph/prompts/external_locations_prompts.py new file mode 100644 index 0000000..9255125 --- /dev/null +++ b/translation_graph/prompts/external_locations_prompts.py @@ -0,0 +1,34 @@ +from . import PromptBase + + +class ExternalLocationsPrompts(PromptBase): + """Prompts for external location translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake external locations to Databricks external locations. + +Your task is to generate Databricks external location DDL from Snowflake external location metadata structures. + +Key mappings: +- Snowflake EXTERNAL LOCATION → Databricks EXTERNAL LOCATION (Direct Equivalent) +- Storage integrations map to storage credentials in Databricks +- External locations define storage paths that can be referenced by external tables, stages, and volumes + +Important considerations for DDL generation: +- Convert URL paths to Databricks external location syntax +- Map storage integrations to storage credentials using WITH (STORAGE CREDENTIAL ...) +- Handle different cloud storage types (S3, Azure, GCS) +- Include location comments using COMMENT clause +- Generate proper CREATE EXTERNAL LOCATION syntax for Databricks Unity Catalog + +For each external location metadata object, generate the equivalent Databricks CREATE EXTERNAL LOCATION statement. + +Context: {context} +External Location Metadata: {external_location_metadata} + +Provide only the generated CREATE EXTERNAL LOCATION SQL statement, no explanations.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create external location translation system prompt.""" + return cls.system_prompt(**kwargs) + diff --git a/translation_graph/prompts/file_formats_prompts.py b/translation_graph/prompts/file_formats_prompts.py new file mode 100644 index 0000000..fdc240c --- /dev/null +++ b/translation_graph/prompts/file_formats_prompts.py @@ -0,0 +1,35 @@ +from . import PromptBase + + +class FileFormatsPrompts(PromptBase): + """Prompts for file format translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake file formats to Databricks file formats. + +Your task is to generate Databricks file format DDL from Snowflake file format metadata structures. + +Key mappings: +- Snowflake FILE FORMAT → Databricks FILE FORMAT (Direct Equivalent) +- File format definitions are directly compatible + +Important considerations for DDL generation: +- Convert format_options JSON to Databricks file format syntax +- Handle different format types: CSV, JSON, PARQUET, etc. +- Map format options to Databricks equivalents: + * CSV options: DELIMITER, HEADER, etc. + * JSON options: various parsing options +- Include format comments using COMMENT clause +- Generate proper CREATE FILE FORMAT syntax for Databricks + +For each file format metadata object, generate the equivalent Databricks CREATE FILE FORMAT statement. + +Context: {context} +File Format Metadata: {file_format_metadata} + +Provide only the generated CREATE FILE FORMAT SQL statement, no explanations.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create file format translation system prompt.""" + return cls.system_prompt(**kwargs) + diff --git a/translation_graph/prompts/grants_prompts.py b/translation_graph/prompts/grants_prompts.py new file mode 100644 index 0000000..e26410e --- /dev/null +++ b/translation_graph/prompts/grants_prompts.py @@ -0,0 +1,29 @@ +from . import PromptBase + + +class GrantsPrompts(PromptBase): + """Prompts for grant to Unity Catalog privilege translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake grants to Databricks Unity Catalog privileges. + +Your task is to translate Snowflake grant DDL statements to equivalent Databricks Unity Catalog privilege assignments. + +Key mappings and considerations: +- Snowflake GRANT → Databricks UNITY CATALOG PRIVILEGE on Catalog/Schema/Table/Volume +- IMPERFECT MATCH: Future grants unsupported +- Unity Catalog uses different privilege model and scoping +- Map role-based grants to appropriate UC securable objects + +For each grant DDL statement, provide the equivalent Databricks SQL that assigns privileges in Unity Catalog. + +Note: Future grants (grants on objects not yet created) are not supported in Unity Catalog. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated privilege assignment statements, noting any unsupported future grant scenarios.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create grant translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/masking_policies_prompts.py b/translation_graph/prompts/masking_policies_prompts.py new file mode 100644 index 0000000..9e3ad65 --- /dev/null +++ b/translation_graph/prompts/masking_policies_prompts.py @@ -0,0 +1,31 @@ +from . import PromptBase + + +class MaskingPoliciesPrompts(PromptBase): + """Prompts for masking policy to column mask (UDF) translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake masking policies to Databricks Unity Catalog column masks. + +Your task is to translate Snowflake masking policy DDL statements to equivalent Databricks Unity Catalog column mask UDFs. + +Key mappings and considerations: +- Snowflake MASKING POLICY → Databricks UNITY CATALOG COLUMN MASK (UDF) +- IMPERFECT MATCH: Supports dynamic masking; requires UDF creation +- Column masks in Databricks are implemented as UDFs +- Handle conditional masking logic and user/role-based access control + +For each masking policy DDL statement, provide: +1. The equivalent UDF creation statement +2. The column mask application statement + +Note: Databricks column masks require UDF creation and explicit application. + +Context: {context} +Input DDL: {ddl} + +Provide the translated UDF creation and column mask application statements.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create masking policy translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/pipes_prompts.py b/translation_graph/prompts/pipes_prompts.py new file mode 100644 index 0000000..aa20e0f --- /dev/null +++ b/translation_graph/prompts/pipes_prompts.py @@ -0,0 +1,30 @@ +from . import PromptBase + + +class PipesPrompts(PromptBase): + """Prompts for pipe (Snowpipe) to Auto Loader translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake Snowpipe to Databricks Auto Loader. + +Your task is to translate Snowflake pipe DDL statements to equivalent Databricks Auto Loader configurations. + +Key mappings: +- Snowflake PIPE (Snowpipe) → Databricks AUTO LOADER +- Direct equivalent mapping +- Pipes continuously load data from stages into tables +- Auto Loader provides similar continuous ingestion capabilities + +For each pipe DDL statement, provide the equivalent Databricks SQL statements for Auto Loader setup, including: +- SQL commands for creating streaming tables with Auto Loader +- SQL statements for configuring continuous ingestion +- Any necessary table and streaming configurations + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements for Auto Loader setup.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create pipe translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/procedures_prompts.py b/translation_graph/prompts/procedures_prompts.py index fd6505d..89bcf72 100644 --- a/translation_graph/prompts/procedures_prompts.py +++ b/translation_graph/prompts/procedures_prompts.py @@ -4,7 +4,28 @@ class ProceduresPrompts(PromptBase): """Prompts for procedure translation.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake SQL stored procedures to Databricks Unity Catalog SQL procedures. + +Your task is to generate Databricks stored procedure DDL from Snowflake procedure metadata structures. + +Key mappings: +- Snowflake STORED PROCEDURE (SQL) → Databricks UNITY CATALOG SQL PROCEDURE (Direct equivalent) +- Procedure structure and SQL logic are directly compatible + +Important considerations for DDL generation: +- Generate CREATE PROCEDURE statement with proper syntax +- Convert Snowflake SQL syntax to Databricks equivalents where needed +- Handle parameter definitions (infer from procedure body if not explicitly provided) +- Convert procedure body SQL logic and syntax differences +- Add procedure comments using COMMENT clause +- Generate proper procedure signature and body structure + +For each procedure metadata object, generate the complete Databricks CREATE PROCEDURE statement. + +Context: {context} +Procedure Metadata: {procedure_metadata} + +Provide only the generated CREATE PROCEDURE SQL statement, no explanations.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/prompts/roles_prompts.py b/translation_graph/prompts/roles_prompts.py index b3ac207..7aa048c 100644 --- a/translation_graph/prompts/roles_prompts.py +++ b/translation_graph/prompts/roles_prompts.py @@ -4,7 +4,27 @@ class RolesPrompts(PromptBase): """Prompts for role translation.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake roles to Databricks Unity Catalog groups. + +Your task is to translate Snowflake role DDL statements to equivalent Databricks Unity Catalog group creation statements. + +Key mappings: +- Snowflake ROLE → Databricks UNITY CATALOG GROUP (Direct Equivalent) +- Role inheritance becomes group membership hierarchy +- Roles map directly to Groups in Unity Catalog + +Important considerations: +- Convert role creation statements to group creation +- Handle role hierarchies by mapping to group membership relationships +- Preserve role comments and descriptions +- Map role ownership appropriately + +For each role DDL statement, provide the equivalent Databricks SQL that creates the Unity Catalog group. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements for group creation and membership hierarchy.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/prompts/router_prompts.py b/translation_graph/prompts/router_prompts.py index bb08780..f9d65e3 100644 --- a/translation_graph/prompts/router_prompts.py +++ b/translation_graph/prompts/router_prompts.py @@ -4,7 +4,42 @@ class RouterPrompts(PromptBase): """Prompts for the smart router node.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert at analyzing Snowflake DDL statements and routing them to the appropriate translation node. + +Your task is to examine the provided DDL content and determine which type of Snowflake object it represents. + +Available routing targets and their characteristics: + +Data Structures: +- databases: CREATE DATABASE statements +- schemas: CREATE SCHEMA statements +- tables: CREATE TABLE, CREATE EXTERNAL TABLE, CREATE TRANSIENT TABLE statements +- views: CREATE VIEW statements (including materialized views) + +Data Transportation & Streaming: +- stages: CREATE STAGE statements +- external_locations: CREATE EXTERNAL LOCATION statements +- streams: CREATE STREAM statements +- pipes: CREATE PIPE statements + +Governance & Security: +- roles: CREATE ROLE statements +- grants: GRANT statements + +Metadata & Object Properties: +- tags: CREATE TAG statements and tag assignments +- comments: COMMENT statements +- masking_policies: CREATE MASKING POLICY statements + +Programmatic & Logical Objects: +- udfs: CREATE FUNCTION/UDF statements +- procedures: CREATE PROCEDURE statements +- sequences: CREATE SEQUENCE statements +- file_formats: CREATE FILE FORMAT statements + +Analyze the DDL content and return ONLY the routing target name from the list above that best matches the object type being created or modified. + +Return only the target name, no explanation.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/prompts/schemas_prompts.py b/translation_graph/prompts/schemas_prompts.py index aaaf3a3..f876ce0 100644 --- a/translation_graph/prompts/schemas_prompts.py +++ b/translation_graph/prompts/schemas_prompts.py @@ -4,7 +4,26 @@ class SchemasPrompts(PromptBase): """Prompts for schema translation.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake schemas to Databricks schemas. + +Your task is to translate Snowflake schema DDL statements to equivalent Databricks schema creation statements. + +Key mappings: +- Snowflake SCHEMA → Databricks SCHEMA (Direct Equivalent) +- Schema structure and naming conventions are directly compatible + +Important considerations: +- Handle schema properties and metadata +- Preserve schema comments +- Map schema ownership and permissions appropriately +- Ensure proper catalog/schema hierarchy in Unity Catalog + +For each schema DDL statement, provide the equivalent Databricks SQL that creates the schema in Unity Catalog. + +Context: {context} +Schema Metadata: {schema_metadata} + +Provide only the translated SQL statements, no explanations.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/prompts/sequences_prompts.py b/translation_graph/prompts/sequences_prompts.py new file mode 100644 index 0000000..8c6f993 --- /dev/null +++ b/translation_graph/prompts/sequences_prompts.py @@ -0,0 +1,36 @@ +from . import PromptBase + + +class SequencesPrompts(PromptBase): + """Prompts for sequence to alternative translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake sequences to Databricks alternatives. + +IMPORTANT: Databricks does not have sequences like Snowflake. Sequences are typically replaced with: +- Identity columns (GENERATED ALWAYS AS IDENTITY) +- Application-generated IDs +- Other auto-increment approaches + +Your task is to analyze Snowflake sequence metadata and provide guidance on equivalent Databricks approaches. + +Key considerations: +- Snowflake SEQUENCE → Databricks identity columns or alternative ID generation +- Sequences are not directly supported in Databricks SQL +- Common alternatives: IDENTITY columns, UUIDs, or application logic + +For each sequence metadata object, provide: +1. Analysis of the sequence usage and requirements +2. Recommended Databricks alternative implementation +3. Sample SQL for identity columns or other approaches +4. Migration notes about any behavioral differences + +Context: {context} +Sequence Metadata: {sequence_metadata} + +Provide migration guidance and alternative implementation approaches.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create sequence translation system prompt.""" + return cls.system_prompt(**kwargs) + diff --git a/translation_graph/prompts/stages_prompts.py b/translation_graph/prompts/stages_prompts.py new file mode 100644 index 0000000..6333678 --- /dev/null +++ b/translation_graph/prompts/stages_prompts.py @@ -0,0 +1,27 @@ +from . import PromptBase + + +class StagesPrompts(PromptBase): + """Prompts for stage to volume translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake stages to Databricks volumes. + +Your task is to translate Snowflake stage DDL statements to equivalent Databricks volume creation statements. + +Key mappings and considerations: +- Snowflake STAGE → Databricks VOLUME +- IMPERFECT MATCH: Schema isn't automatically matched in Volumes +- Handle schema scoping differences - Databricks volumes are scoped within catalogs/schemas +- Consider access patterns and security implications + +For each stage DDL statement, provide the equivalent Databricks SQL that creates the volume, noting any schema mapping considerations. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements with appropriate comments about schema handling.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create stage translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/streams_prompts.py b/translation_graph/prompts/streams_prompts.py new file mode 100644 index 0000000..09e6c5e --- /dev/null +++ b/translation_graph/prompts/streams_prompts.py @@ -0,0 +1,27 @@ +from . import PromptBase + + +class StreamsPrompts(PromptBase): + """Prompts for stream to Delta Change Data Feed translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake streams to Databricks Delta Change Data Feed. + +Your task is to translate Snowflake stream DDL statements to equivalent Databricks Delta Change Data Feed setup. + +Key mappings: +- Snowflake STREAM → Databricks DELTA CHANGE DATA FEED +- Direct equivalent mapping +- Streams capture change data from tables/views +- Delta CDF provides similar change tracking capabilities + +For each stream DDL statement, provide the equivalent Databricks SQL that enables change data feed on the corresponding table. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements for enabling change data feed.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create stream translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/tables_prompts.py b/translation_graph/prompts/tables_prompts.py index 42618b7..bc04b13 100644 --- a/translation_graph/prompts/tables_prompts.py +++ b/translation_graph/prompts/tables_prompts.py @@ -4,7 +4,37 @@ class TablesPrompts(PromptBase): """Prompts for table translation.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake tables to Databricks tables. + +Your task is to generate Databricks table creation DDL from Snowflake table metadata structures. + +Key mappings: +- Snowflake PERMANENT TABLE → Databricks MANAGED TABLE (Direct Equivalent) +- Snowflake TEMPORARY TABLE → Databricks TEMPORARY VIEW (Direct Equivalent) +- Snowflake TRANSIENT TABLE → Databricks MANAGED TABLE with reduced Time Travel (Imperfect match - optional) +- Snowflake EXTERNAL TABLE → Databricks EXTERNAL TABLE (Direct Equivalent) + +Important considerations for DDL generation: +- Convert Snowflake data types to Databricks equivalents: + * NUMBER(precision, scale) → DECIMAL(precision, scale) or BIGINT/INT + * TEXT(length) → STRING or VARCHAR(length) + * BOOLEAN → BOOLEAN + * TIMESTAMP_NTZ → TIMESTAMP + * TIMESTAMP_LTZ → TIMESTAMP + * VARIANT → STRING (or MAP/ARRAY for complex data) + * ARRAY → ARRAY + * OBJECT → MAP +- Handle nullability: "YES"/"NO" → NULL/NOT NULL +- Include column defaults where applicable +- Add table comments using COMMENT clause +- Generate proper CREATE TABLE syntax for Databricks + +For each table metadata object, generate the equivalent Databricks CREATE TABLE statement. + +Context: {context} +Table Metadata: {table_metadata} + +Provide only the generated CREATE TABLE SQL statement, no explanations.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/prompts/tags_prompts.py b/translation_graph/prompts/tags_prompts.py new file mode 100644 index 0000000..017c20c --- /dev/null +++ b/translation_graph/prompts/tags_prompts.py @@ -0,0 +1,26 @@ +from . import PromptBase + + +class TagsPrompts(PromptBase): + """Prompts for tag translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake tags to Databricks Unity Catalog tags. + +Your task is to translate Snowflake tag DDL statements to equivalent Databricks Unity Catalog tag creation and assignment statements. + +Key mappings: +- Snowflake TAG → Databricks UNITY CATALOG TAG +- Direct equivalent mapping +- Tags are used for metadata classification and governance + +For each tag DDL statement, provide the equivalent Databricks SQL that creates and assigns tags. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements for tag creation and assignment.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create tag translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/udfs_prompts.py b/translation_graph/prompts/udfs_prompts.py new file mode 100644 index 0000000..8bde515 --- /dev/null +++ b/translation_graph/prompts/udfs_prompts.py @@ -0,0 +1,35 @@ +from . import PromptBase + + +class UDFsPrompts(PromptBase): + """Prompts for UDF translation.""" + + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake SQL UDFs to Databricks Unity Catalog SQL UDFs. + +Your task is to generate Databricks UDF DDL from Snowflake function metadata structures. + +Key mappings: +- Snowflake UDFs (SQL) → Databricks UNITY CATALOG SQL UDF +- Direct equivalent mapping +- Handle function signatures, return types, and SQL logic +- Consider any syntax differences between platforms + +Important considerations for DDL generation: +- Generate CREATE FUNCTION statement with proper syntax +- Convert Snowflake SQL syntax to Databricks equivalents +- Infer function parameters and return types from the SQL body +- Handle function body SQL logic and syntax differences +- Add function comments using COMMENT clause +- Generate proper function signature and body structure + +For each function metadata object, generate the complete Databricks CREATE FUNCTION statement. + +Context: {context} +Function Metadata: {function_metadata} + +Provide only the generated CREATE FUNCTION SQL statement, no explanations.""" + + @classmethod + def create_prompt(cls, **kwargs): + """Create UDF translation system prompt.""" + return cls.system_prompt(**kwargs) diff --git a/translation_graph/prompts/views_prompts.py b/translation_graph/prompts/views_prompts.py index a859480..24f30c8 100644 --- a/translation_graph/prompts/views_prompts.py +++ b/translation_graph/prompts/views_prompts.py @@ -4,7 +4,29 @@ class ViewsPrompts(PromptBase): """Prompts for view translation.""" - SYSTEM_TEMPLATE = "" + SYSTEM_TEMPLATE = """You are an expert in migrating Snowflake views to Databricks views. + +Your task is to translate Snowflake view DDL statements to equivalent Databricks view creation statements. + +Key mappings: +- Snowflake NON-MATERIALIZED VIEW → Databricks VIEW (Direct Equivalent) +- Snowflake NON-MATERIALIZED VIEW → Databricks VIEW with ACL restrictions (Imperfect match - optional, security via permissions) +- Snowflake MATERIALIZED VIEW → Databricks MATERIALIZED VIEW (Direct Equivalent) +- Snowflake RECURSIVE VIEW → Databricks VIEW with recursive CTE (Imperfect match - all recursive views have equivalent with CTEs) + +Important considerations: +- Convert view definitions and SQL logic to Databricks syntax +- Handle recursive views by converting to recursive CTEs +- For non-materialized views, note that ACL restrictions may need to be enforced via permissions +- Preserve view comments and metadata +- Handle dependencies and referenced objects + +For each view DDL statement, provide the equivalent Databricks SQL that creates the view. + +Context: {context} +Input DDL: {ddl} + +Provide only the translated SQL statements, no explanations.""" @classmethod def create_prompt(cls, **kwargs): diff --git a/translation_graph/requirements.txt b/translation_graph/requirements.txt deleted file mode 100644 index 90957c6..0000000 --- a/translation_graph/requirements.txt +++ /dev/null @@ -1,22 +0,0 @@ -# Core dependencies for the DDL translation graph -langchain>=0.1.0 -langchain-core>=0.1.0 -langchain-community>=0.1.0 -langgraph>=0.1.0 - -# Databricks LLM support -databricks-langchain>=0.1.0 - -# Environment variable management -python-dotenv>=1.0.0 - -# Testing framework -pytest>=7.0.0 - -# Optional: Additional testing utilities -pytest-cov>=4.0.0 - -# Optional: For more advanced testing -pytest-asyncio>=0.21.0 - -# Note: dataclasses, typing, os, json, sys, unittest are built into Python 3.7+ diff --git a/translation_graph/run_example.py b/translation_graph/run_example.py deleted file mode 100644 index 2d0f746..0000000 --- a/translation_graph/run_example.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python3 -""" -Example runner for the data migration accelerator. -""" - -import sys -import os - -# Add the current directory to Python path -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from graph_builder import build_translation_graph -from utils.types import ArtifactBatch - - -def main(): - """Run the example translation workflow.""" - print("Data Migration Accelerator - Example Run") - print("=" * 50) - - # Create a fake ArtifactBatch - batch = ArtifactBatch( - artifact_type="tables", - items=["CREATE TABLE example (id INT, name VARCHAR(255))"], - context={"source_db": "snowflake", "target_db": "postgres"} - ) - - print(f"Processing batch: {batch.artifact_type}") - print(f"Items count: {len(batch.items)}") - print(f"Context: {batch.context}") - print() - - # Build the translation graph - graph = build_translation_graph() - - # Run the graph - print("Running translation graph...") - result = graph.run(batch) - - # Print the result - print("Translation Results:") - print("-" * 20) - for key, value in result.items(): - if key == "metadata": - print(f"{key}:") - for meta_key, meta_value in value.items(): - print(f" {meta_key}: {meta_value}") - else: - print(f"{key}: {value}") - - print("\nExample completed successfully!") - - -if __name__ == "__main__": - main() diff --git a/translation_graph/run_file_processor.py b/translation_graph/run_file_processor.py new file mode 100755 index 0000000..c53c024 --- /dev/null +++ b/translation_graph/run_file_processor.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +File-based processor for the data migration accelerator. +Processes JSON files containing artifact definitions, determining artifact type from filename +and processing in batches. +""" + +import sys +import os +import argparse +from typing import List, Dict, Any + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from graph_builder import build_translation_graph +from utils.file_processor import process_files, create_batches_from_file + + +def process_single_file( + filepath: str, + batch_size: int = 10, + context: Dict[str, Any] = None +) -> Dict[str, Any]: + """ + Process a single file and return translation results. + + Args: + filepath: Path to the DDL file + batch_size: Number of DDL statements per batch + context: Optional context dictionary + + Returns: + Translation results dictionary + """ + print(f"Processing file: {filepath}") + print(f"Batch size: {batch_size}") + print("-" * 50) + + batches = create_batches_from_file(filepath, batch_size, context) + + print(f"Created {len(batches)} batch(es) from file") + print(f"Total artifacts: {sum(len(batch.items) for batch in batches)}") + print() + + graph = build_translation_graph() + + results = [] + for i, batch in enumerate(batches): + print(f"Processing batch {i + 1}/{len(batches)} ({len(batch.items)} items)...") + result = graph.run(batch) + results.append(result) + print(f"Batch {i + 1} completed") + print() + + if len(results) == 1: + return results[0] + + from nodes.aggregator import aggregate_translations + from utils.types import TranslationResult + + translation_results = [] + for i, (result, batch) in enumerate(zip(results, batches)): + translation_result = TranslationResult( + artifact_type=batch.artifact_type, + results=result.get(batch.artifact_type, []), + errors=result.get("metadata", {}).get("errors", []), + metadata={**result.get("metadata", {}), "batch_index": i} + ) + translation_results.append(translation_result) + + final_result = aggregate_translations(*translation_results) + return final_result + + +def process_multiple_files( + filepaths: List[str], + batch_size: int = 10, + context: Dict[str, Any] = None +) -> Dict[str, Any]: + """ + Process multiple files and aggregate results. + + Args: + filepaths: List of file paths to process + batch_size: Number of DDL statements per batch + context: Optional context dictionary + + Returns: + Aggregated translation results + """ + print(f"Processing {len(filepaths)} file(s)") + print(f"Batch size: {batch_size}") + print("=" * 50) + print() + + graph = build_translation_graph() + all_batches = process_files(filepaths, batch_size, context) + + print(f"Total batches created: {len(all_batches)}") + print() + + return graph.run_batches(all_batches) + + +def main(): + """Main entry point for file-based processing.""" + parser = argparse.ArgumentParser( + description="Process DDL files and translate them using the translation graph" + ) + parser.add_argument( + "files", + nargs="+", + help="One or more JSON files to process. Artifact type is determined from filename." + ) + parser.add_argument( + "--batch-size", + type=int, + default=10, + help="Number of artifacts per batch (default: 10)" + ) + parser.add_argument( + "--output", + type=str, + help="Output file path to save results (optional)" + ) + + args = parser.parse_args() + + print("Data Migration Accelerator - File Processor") + print("=" * 50) + print() + + if len(args.files) == 1: + result = process_single_file(args.files[0], args.batch_size) + else: + result = process_multiple_files(args.files, args.batch_size) + + print("Translation Results:") + print("-" * 50) + + for key, value in result.items(): + if key == "metadata": + print(f"{key}:") + for meta_key, meta_value in value.items(): + if isinstance(meta_value, dict): + print(f" {meta_key}:") + for sub_key, sub_value in meta_value.items(): + print(f" {sub_key}: {sub_value}") + else: + print(f" {meta_key}: {meta_value}") + elif isinstance(value, list): + print(f"{key}: {len(value)} items") + if value and len(value) <= 5: + for item in value: + print(f" - {item[:100]}..." if len(str(item)) > 100 else f" - {item}") + else: + print(f"{key}: {value}") + + if args.output: + import json + with open(args.output, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, default=str) + print(f"\nResults saved to: {args.output}") + + print("\nProcessing completed successfully!") + + +if __name__ == "__main__": + main() + diff --git a/translation_graph/tests/__init__.py b/translation_graph/tests/__init__.py index e69de29..ba76eea 100644 --- a/translation_graph/tests/__init__.py +++ b/translation_graph/tests/__init__.py @@ -0,0 +1,3 @@ +# Tests package + + diff --git a/translation_graph/tests/integration/README.md b/translation_graph/tests/integration/README.md new file mode 100644 index 0000000..2c023dd --- /dev/null +++ b/translation_graph/tests/integration/README.md @@ -0,0 +1,80 @@ +# Integration Tests and Examples + +This directory contains integration tests and example notebooks that demonstrate how to use the Translation Graph. + +## Files + +- **`test_integration_example.py`** - Complete integration test that runs the translation graph with example JSON files +- **`run_example.py`** - Basic example showing how to use the graph programmatically +- **`databricks_job_notebook.py`** - Databricks notebook for parameterized jobs +- **`databricks_notebook_example.py`** - Complete Databricks notebook example + +## Example Data + +The `example_data/` directory contains sample JSON files: +- `tables.json` - Sample table definitions +- `views.json` - Sample view definitions +- `schemas.json` - Sample schema definitions + +## Running Integration Tests + +### Run the full integration test suite: + +```bash +cd translation_graph +python tests/integration/test_integration_example.py +``` + +This will: +1. Load example JSON files from `example_data/` +2. Process them through the translation graph +3. Validate the results +4. Save output files (if configured) + +### Run individual examples: + +```bash +# Basic example +python tests/integration/run_example.py + +# Integration test with example files +python tests/integration/test_integration_example.py +``` + +## Example Data Structure + +The example JSON files follow the same structure as real Snowflake object exports: + +```json +{ + "tables": [ + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "table_name": "EXAMPLE_TABLE_1", + "table_type": "BASE TABLE", + "columns": [...] + } + ] +} +``` + +## Output + +When running `test_integration_example.py`, results are saved to: +- `example_data/output/translation_results.json` + +## Requirements + +Make sure you have: +1. All dependencies installed (`pip install -r requirements.txt`) +2. Databricks LLM endpoint configured (set `DBX_ENDPOINT` environment variable) +3. Example data files in `example_data/` directory + +## Notes + +- These tests use example data and don't require actual database connections +- The tests demonstrate the full workflow from JSON files to translated DDL +- Results are validated but actual DDL execution is optional + + diff --git a/translation_graph/tests/integration/__init__.py b/translation_graph/tests/integration/__init__.py new file mode 100644 index 0000000..9bc1544 --- /dev/null +++ b/translation_graph/tests/integration/__init__.py @@ -0,0 +1,3 @@ +# Integration tests and examples + + diff --git a/translation_graph/tests/integration/example_data/schemas.json b/translation_graph/tests/integration/example_data/schemas.json new file mode 100644 index 0000000..1db97b7 --- /dev/null +++ b/translation_graph/tests/integration/example_data/schemas.json @@ -0,0 +1,30 @@ +{ + "schemas": [ + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "BRONZE_LAYER", + "created": "2025-01-01 08:00:00.000000-08:00", + "last_altered": "2025-01-15 10:00:00.000000-08:00", + "comment": "Bronze layer schema for raw data ingestion", + "owner": "DATA_ENGINEER_ROLE" + }, + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "SILVER_LAYER", + "created": "2025-01-01 08:15:00.000000-08:00", + "last_altered": "2025-01-20 14:30:00.000000-08:00", + "comment": "Silver layer schema for cleaned and transformed data", + "owner": "DATA_ENGINEER_ROLE" + }, + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "GOLD_LAYER", + "created": "2025-01-01 08:30:00.000000-08:00", + "last_altered": "2025-01-25 16:45:00.000000-08:00", + "comment": "Gold layer schema for business-ready aggregated data", + "owner": "DATA_ANALYST_ROLE" + } + ] +} + + diff --git a/translation_graph/tests/integration/example_data/tables.json b/translation_graph/tests/integration/example_data/tables.json new file mode 100644 index 0000000..70a749b --- /dev/null +++ b/translation_graph/tests/integration/example_data/tables.json @@ -0,0 +1,82 @@ +{ + "tables": [ + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "table_name": "EXAMPLE_TABLE_1", + "table_type": "BASE TABLE", + "row_count": 0, + "bytes": 0, + "created": "2025-01-01 12:00:00.000000-08:00", + "last_altered": "2025-01-01 12:00:00.000000-08:00", + "comment": "Example table for testing", + "columns": [ + { + "column_name": "ID", + "data_type": "NUMBER", + "character_maximum_length": null, + "numeric_precision": 38, + "numeric_scale": 0, + "is_nullable": "NO", + "column_default": null, + "comment": "Primary key" + }, + { + "column_name": "NAME", + "data_type": "VARCHAR", + "character_maximum_length": 255, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": null, + "comment": "Name field" + }, + { + "column_name": "CREATED_AT", + "data_type": "TIMESTAMP_NTZ", + "character_maximum_length": null, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "YES", + "column_default": "CURRENT_TIMESTAMP()", + "comment": "Creation timestamp" + } + ] + }, + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "table_name": "EXAMPLE_TABLE_2", + "table_type": "BASE TABLE", + "row_count": 0, + "bytes": 0, + "created": "2025-01-01 12:00:00.000000-08:00", + "last_altered": "2025-01-01 12:00:00.000000-08:00", + "comment": "Second example table", + "columns": [ + { + "column_name": "USER_ID", + "data_type": "NUMBER", + "character_maximum_length": null, + "numeric_precision": 38, + "numeric_scale": 0, + "is_nullable": "NO", + "column_default": null, + "comment": null + }, + { + "column_name": "EMAIL", + "data_type": "VARCHAR", + "character_maximum_length": 100, + "numeric_precision": null, + "numeric_scale": null, + "is_nullable": "NO", + "column_default": null, + "comment": "User email address" + } + ] + } + ] +} + + diff --git a/translation_graph/tests/integration/example_data/views.json b/translation_graph/tests/integration/example_data/views.json new file mode 100644 index 0000000..524b9f4 --- /dev/null +++ b/translation_graph/tests/integration/example_data/views.json @@ -0,0 +1,33 @@ +{ + "views": [ + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "view_name": "ACTIVE_USERS_VIEW", + "view_definition": "SELECT u.user_id, u.email, u.created_at, p.profile_status FROM users u LEFT JOIN user_profiles p ON u.user_id = p.user_id WHERE u.is_active = true", + "created": "2025-01-15 10:30:00.000000-08:00", + "last_altered": "2025-01-20 14:45:00.000000-08:00", + "comment": "View showing active users with their profile status" + }, + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "view_name": "SALES_SUMMARY_VIEW", + "view_definition": "SELECT DATE_TRUNC('month', order_date) as month, product_category, SUM(order_amount) as total_sales, COUNT(*) as order_count FROM sales_orders WHERE order_status = 'completed' GROUP BY DATE_TRUNC('month', order_date), product_category", + "created": "2025-01-10 09:15:00.000000-08:00", + "last_altered": "2025-01-25 16:20:00.000000-08:00", + "comment": "Monthly sales summary by product category" + }, + { + "database_name": "DATA_MIGRATION_DB", + "schema_name": "DATA_MIGRATION_SCHEMA", + "view_name": "INVENTORY_STATUS_VIEW", + "view_definition": "SELECT p.product_id, p.product_name, p.category, i.quantity_available, i.last_inventory_update, CASE WHEN i.quantity_available < 10 THEN 'LOW_STOCK' WHEN i.quantity_available = 0 THEN 'OUT_OF_STOCK' ELSE 'IN_STOCK' END as stock_status FROM products p INNER JOIN inventory i ON p.product_id = i.product_id", + "created": "2025-01-05 11:00:00.000000-08:00", + "last_altered": "2025-01-18 13:30:00.000000-08:00", + "comment": "Current inventory status with stock level indicators" + } + ] +} + + diff --git a/translation_graph/tests/integration/test_full_workflow.py b/translation_graph/tests/integration/test_full_workflow.py new file mode 100755 index 0000000..0b1e667 --- /dev/null +++ b/translation_graph/tests/integration/test_full_workflow.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 +""" +Full Workflow Integration Test + +This comprehensive test demonstrates the complete Translation Graph workflow: +1. Loads example JSON files containing Snowflake object definitions +2. Processes them through the translation graph with real LLM calls +3. Validates the generated Databricks SQL DDL results +4. Saves complete results for inspection + +Test scenarios: +- Table translation with real LLM-generated DDL +- Multiple file processing (tables, views, schemas) +- Result persistence and validation +""" + +import sys +import os +import json +from pathlib import Path + +# Add the translation_graph directory to Python path +current_dir = os.path.dirname(os.path.abspath(__file__)) +translation_graph_dir = os.path.dirname(os.path.dirname(current_dir)) +sys.path.insert(0, translation_graph_dir) + +from graph_builder import build_translation_graph +from utils.file_processor import create_batches_from_file, process_files + + +def get_example_data_dir(): + """Get the path to the example data directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(current_dir, "example_data") + + +def test_translate_tables(): + """Test translating tables from example JSON file.""" + print("=" * 70) + print("Integration Test: Translating Tables") + print("=" * 70) + + example_dir = get_example_data_dir() + tables_file = os.path.join(example_dir, "tables.json") + + if not os.path.exists(tables_file): + print(f"ERROR: Example file not found: {tables_file}") + return False + + print(f"\n1. Loading example file: {tables_file}") + + try: + batches = create_batches_from_file(tables_file, batch_size=10) + print(f" ✓ Created {len(batches)} batch(es)") + print(f" ✓ Total artifacts: {sum(len(batch.items) for batch in batches)}") + + print(f"\n2. Building translation graph...") + graph = build_translation_graph() + print(" ✓ Graph built successfully") + + print(f"\n3. Processing batches through translation graph...") + result = graph.run_batches(batches) + + print(f"\n4. Results:") + metadata = result.get('metadata', {}) + print(f" ✓ Total SQL statements: {metadata.get('total_results', 0)}") + print(f" ✓ Errors: {len(metadata.get('errors', []))}") + + if result.get('tables'): + print(f" ✓ Table SQL statements: {len(result['tables'])}") + for i, table_sql in enumerate(result['tables'][:2], 1): + # Clean up the SQL for display (remove code blocks if present) + if table_sql.startswith('```sql'): + table_sql = table_sql[6:] + if table_sql.endswith('```'): + table_sql = table_sql[:-3] + preview = table_sql.strip()[:100] + "..." if len(table_sql.strip()) > 100 else table_sql.strip() + print(f" Table {i}: {preview}") + + if result.get('metadata', {}).get('errors'): + print(f"\n ⚠ Errors encountered:") + for error in result['metadata']['errors'][:3]: + print(f" - {error}") + + print(f"\n✓ Test completed successfully!") + return True + + except Exception as e: + print(f"\n✗ Test failed with error: {str(e)}") + import traceback + traceback.print_exc() + return False + + +def test_translate_multiple_files(): + """Test translating multiple files (tables, views, schemas).""" + print("\n" + "=" * 70) + print("Integration Test: Translating Multiple Files") + print("=" * 70) + + example_dir = get_example_data_dir() + files = [ + os.path.join(example_dir, "tables.json"), + os.path.join(example_dir, "views.json"), + os.path.join(example_dir, "schemas.json"), + os.path.join(example_dir, "databases.json"), + os.path.join(example_dir, "procedures.json"), + os.path.join(example_dir, "udfs.json"), + os.path.join(example_dir, "sequences.json") + ] + + existing_files = [f for f in files if os.path.exists(f)] + + if not existing_files: + print(f"ERROR: No example files found in {example_dir}") + return False + + print(f"\n1. Processing {len(existing_files)} file(s)...") + for f in existing_files: + print(f" - {os.path.basename(f)}") + + try: + batches = process_files(existing_files, batch_size=5) + print(f"\n2. Created {len(batches)} batch(es) from all files") + + print(f"\n3. Building translation graph...") + graph = build_translation_graph() + + print(f"\n4. Processing all batches...") + result = graph.run_batches(batches) + + print(f"\n5. Results Summary:") + metadata = result.get('metadata', {}) + print(f" ✓ Total results: {metadata.get('total_results', 0)}") + print(f" ✓ Errors: {len(metadata.get('errors', []))}") + + print(f"\n6. Generated SQL by artifact type:") + total_sql_statements = 0 + for artifact_type in ['tables', 'views', 'schemas', 'databases', 'sequences', 'stages', 'streams', 'pipes', 'roles', 'grants', 'tags', 'comments', 'masking_policies', 'udfs', 'procedures', 'external_locations', 'file_formats']: + if artifact_type in result and result[artifact_type]: + sql_count = len(result[artifact_type]) + total_sql_statements += sql_count + print(f" ✓ {artifact_type}: {sql_count} SQL statement(s)") + + # Show first SQL statement as preview for each type + if sql_count > 0: + first_sql = result[artifact_type][0] + # Clean up the SQL for display (remove code blocks if present) + if first_sql.startswith('```sql'): + first_sql = first_sql[6:] + if first_sql.endswith('```'): + first_sql = first_sql[:-3] + preview = first_sql.strip()[:80] + "..." if len(first_sql.strip()) > 80 else first_sql.strip() + print(f" Preview: {preview}") + + print(f"\n7. Total SQL statements generated: {total_sql_statements}") + + if metadata.get('errors'): + print(f"\n8. Errors encountered:") + for error in metadata['errors'][:3]: # Show first 3 errors + print(f" - {error}") + + print(f"\n✓ Test completed successfully!") + return True + + except Exception as e: + print(f"\n✗ Test failed with error: {str(e)}") + import traceback + traceback.print_exc() + return False + + +def test_save_sql_files(): + """Test saving results as separate SQL files for each artifact type.""" + print("\n" + "=" * 70) + print("Integration Test: Saving SQL Files") + print("=" * 70) + + example_dir = get_example_data_dir() + files = [ + os.path.join(example_dir, "tables.json"), + os.path.join(example_dir, "views.json"), + os.path.join(example_dir, "schemas.json"), + os.path.join(example_dir, "databases.json"), + os.path.join(example_dir, "procedures.json"), + os.path.join(example_dir, "udfs.json"), + os.path.join(example_dir, "sequences.json") + ] + output_dir = os.path.join(example_dir, "output", "sql_files") + + # Check that files exist + missing_files = [f for f in files if not os.path.exists(f)] + if missing_files: + print(f"ERROR: Example files not found: {missing_files}") + return False + + try: + print(f"\n1. Processing {len(files)} file(s)...") + for file_path in files: + print(f" - {os.path.basename(file_path)}") + + batches = [] + for file_path in files: + file_batches = create_batches_from_file(file_path, batch_size=10) + batches.extend(file_batches) + + print(f"\n2. Running translation...") + graph = build_translation_graph() + result = graph.run_batches(batches) + + print(f"\n3. Saving SQL files to: {output_dir}") + + # Import the save function + import sys + current_dir = os.path.dirname(os.path.abspath(__file__)) + translation_graph_dir = os.path.dirname(os.path.dirname(current_dir)) + sys.path.insert(0, translation_graph_dir) + from databricks_job import save_sql_files + + save_sql_files(result, output_dir) + + # Verify SQL files were created + sql_files_created = [] + if os.path.exists(output_dir): + for file in os.listdir(output_dir): + if file.endswith('.sql'): + sql_files_created.append(file) + + print(f"\n4. Verification:") + print(f" ✓ SQL files created: {len(sql_files_created)}") + total_sql_statements = 0 + + for sql_file in sorted(sql_files_created): + file_path = os.path.join(output_dir, sql_file) + file_size = os.path.getsize(file_path) + + # Count SQL statements (lines starting with non-comment content) + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + lines = content.split('\n') + sql_statements = [line for line in lines if line.strip() and not line.strip().startswith('--')] + sql_count = len(sql_statements) + + total_sql_statements += sql_count + print(f" - {sql_file}: {file_size} bytes ({sql_count} SQL statements)") + + # Show first few lines as preview + preview_lines = [] + for line in lines[:3]: + if line.strip() and not line.strip().startswith('--'): + preview_lines.append(line.strip()) + if len(preview_lines) >= 2: + break + + if preview_lines: + preview = ' '.join(preview_lines) + if len(preview) > 80: + preview = preview[:80] + "..." + print(f" Preview: {preview}") + + print(f"\n ✓ Total SQL statements saved: {total_sql_statements}") + if sql_files_created: + types = [f.split('.')[0] for f in sql_files_created] + print(f" ✓ Types saved: {', '.join(sorted(types))}") + + print(f"\n✓ Test completed successfully!") + return True + + except Exception as e: + print(f"\n✗ Test failed with error: {str(e)}") + import traceback + traceback.print_exc() + return False + + +def main(): + """Run all integration tests.""" + print("\n" + "=" * 70) + print("Translation Graph Integration Tests") + print("=" * 70) + print("\nRunning integration tests with example data files...") + + results = [] + + # Test 1: Translate tables + results.append(("Translate Tables", test_translate_tables())) + + # Test 2: Translate multiple files + results.append(("Translate Multiple Files", test_translate_multiple_files())) + + # Test 3: Save SQL files + results.append(("Save SQL Files", test_save_sql_files())) + + # Summary + print("\n" + "=" * 70) + print("Test Summary") + print("=" * 70) + + passed = sum(1 for _, result in results if result) + total = len(results) + + for test_name, result in results: + status = "✓ PASSED" if result else "✗ FAILED" + print(f"{status}: {test_name}") + + print(f"\nTotal: {passed}/{total} tests passed") + + if passed == total: + print("\n🎉 All tests passed!") + return 0 + else: + print(f"\n⚠️ {total - passed} test(s) failed") + return 1 + + +if __name__ == "__main__": + exit_code = main() + sys.exit(exit_code) + + diff --git a/translation_graph/tests/test_graph.py b/translation_graph/tests/test_graph.py deleted file mode 100644 index 29cad21..0000000 --- a/translation_graph/tests/test_graph.py +++ /dev/null @@ -1,126 +0,0 @@ -import unittest -import sys -import os - -# Add the parent directory to Python path -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from utils.types import ArtifactBatch, TranslationResult -from utils.llm_utils import validate_node_requirements -from nodes.router import artifact_router -from nodes.tables_translation import translate_tables -from nodes.views_translation import translate_views -from nodes.schemas_translation import translate_schemas -from nodes.procedures_translation import translate_procedures -from nodes.roles_translation import translate_roles -from nodes.aggregator import aggregate_translations -from graph_builder import build_translation_graph - - -class TestTranslationGraph(unittest.TestCase): - """Basic tests for the translation graph components.""" - - def setUp(self): - """Set up test fixtures.""" - self.test_batch = ArtifactBatch( - artifact_type="tables", - items=["CREATE TABLE test (id INT)"], - context={"source": "snowflake"} - ) - - def test_router_returns_valid_string(self): - """Test that router returns a valid routing string.""" - result = artifact_router(self.test_batch) - - valid_routes = ["tables", "views", "schemas", "procedures", "roles"] - self.assertIn(result, valid_routes) - self.assertIsInstance(result, str) - - def test_translation_nodes_return_results(self): - """Test that all translation nodes return proper TranslationResult objects.""" - translation_functions = [ - (translate_tables, "tables"), - (translate_views, "views"), - (translate_schemas, "schemas"), - (translate_procedures, "procedures"), - (translate_roles, "roles") - ] - - for translate_func, expected_type in translation_functions: - with self.subTest(node=expected_type): - result = translate_func(self.test_batch) - self.assertIsInstance(result, TranslationResult) - self.assertEqual(result.artifact_type, expected_type) - self.assertIsInstance(result.results, list) - self.assertIsInstance(result.errors, list) - self.assertIsInstance(result.metadata, dict) - - def test_aggregator_merges_results(self): - """Test that aggregator correctly merges multiple results.""" - result1 = TranslationResult( - artifact_type="tables", - results=["table1", "table2"], - errors=["error1"], - metadata={"count": 2} - ) - - result2 = TranslationResult( - artifact_type="views", - results=["view1"], - errors=[], - metadata={"count": 1} - ) - - merged = aggregate_translations(result1, result2) - - # Check structure - expected_keys = {"tables", "views", "schemas", "procedures", "roles", "metadata"} - self.assertEqual(set(merged.keys()), expected_keys) - - # Check merged results - self.assertEqual(merged["tables"], ["table1", "table2"]) - self.assertEqual(merged["views"], ["view1"]) - self.assertEqual(merged["schemas"], []) - self.assertEqual(merged["procedures"], []) - self.assertEqual(merged["roles"], []) - - # Check metadata - self.assertEqual(merged["metadata"]["total_results"], 3) - self.assertEqual(merged["metadata"]["errors"], ["error1"]) - - def test_graph_runs_end_to_end(self): - """Test that the full graph runs end-to-end.""" - graph = build_translation_graph() - - result = graph.run(self.test_batch) - - # Check that we get a result dictionary - self.assertIsInstance(result, dict) - - # Check that all expected keys are present - expected_keys = {"tables", "views", "schemas", "procedures", "roles", "metadata"} - self.assertEqual(set(result.keys()), expected_keys) - - # Check metadata structure - self.assertIn("total_results", result["metadata"]) - self.assertIn("errors", result["metadata"]) - self.assertIn("processing_stats", result["metadata"]) - - def test_node_validation_structure(self): - """Test that node validation returns expected structure.""" - validation_result = validate_node_requirements() - - # Check that we get expected keys - expected_keys = {"llm_available", "environment_ready", "errors"} - self.assertEqual(set(validation_result.keys()), expected_keys) - - # Check that errors is a list - self.assertIsInstance(validation_result["errors"], list) - - # Check that boolean flags are present - for key in ["llm_available", "environment_ready"]: - self.assertIsInstance(validation_result[key], bool) - - -if __name__ == '__main__': - unittest.main() diff --git a/translation_graph/utils/file_processor.py b/translation_graph/utils/file_processor.py new file mode 100644 index 0000000..d46ca3a --- /dev/null +++ b/translation_graph/utils/file_processor.py @@ -0,0 +1,201 @@ +import os +import json +from typing import List, Optional, Dict, Any +from utils.types import ArtifactBatch + + +def determine_artifact_type_from_filename(filename: str) -> Optional[str]: + """ + Determine artifact type from filename. + + Args: + filename: The name of the file (with or without path) + + Returns: + Artifact type string or None if cannot be determined + """ + basename = os.path.basename(filename).lower() + + # Map filename keywords to artifact types + artifact_type_mapping = { + "tables": ["table", "tables"], + "views": ["view", "views"], + "schemas": ["schema", "schemas"], + "databases": ["database", "databases", "db"], + "procedures": ["procedure", "procedures", "proc", "procs"], + "roles": ["role", "roles"], + "stages": ["stage", "stages"], + "streams": ["stream", "streams"], + "pipes": ["pipe", "pipes"], + "grants": ["grant", "grants"], + "tags": ["tag", "tags"], + "comments": ["comment", "comments"], + "masking_policies": ["masking_policy", "masking_policies", "masking", "policy"], + "udfs": ["udf", "udfs", "function", "functions"], + "sequences": ["sequence", "sequences"], + "file_formats": ["file_format", "file_formats", "format", "formats"], + "external_locations": ["external_location", "external_locations", "external"] + } + + for artifact_type, keywords in artifact_type_mapping.items(): + for keyword in keywords: + if keyword in basename: + return artifact_type + + return None + + +def get_json_key_mapping(): + """Get mapping from artifact types to JSON keys in files.""" + return { + "udfs": "functions" # Snowflake uses "functions" but we call them "udfs" + } + + for artifact_type, keywords in artifact_type_mapping.items(): + for keyword in keywords: + if keyword in basename: + return artifact_type + + return None + + +def load_json_file(filepath: str) -> Dict[str, Any]: + """ + Load and parse a JSON file. + + Args: + filepath: Path to the JSON file + + Returns: + Parsed JSON data as a dictionary + + Raises: + json.JSONDecodeError: If the file is not valid JSON + FileNotFoundError: If the file doesn't exist + """ + with open(filepath, 'r', encoding='utf-8') as f: + return json.load(f) + + +def extract_artifacts_from_json( + json_data: Dict[str, Any], + artifact_type: str +) -> List[str]: + """ + Extract artifacts from JSON data based on artifact type. + + Args: + json_data: Parsed JSON data + artifact_type: Type of artifact to extract (e.g., "tables", "views") + + Returns: + List of JSON strings, each representing one artifact object + + Raises: + KeyError: If the artifact type key is not found in JSON + """ + if artifact_type not in json_data: + raise KeyError( + f"Artifact type '{artifact_type}' not found in JSON. " + f"Available keys: {list(json_data.keys())}" + ) + + artifacts = json_data[artifact_type] + + if not isinstance(artifacts, list): + raise ValueError( + f"Expected '{artifact_type}' to be a list, got {type(artifacts).__name__}" + ) + + return [json.dumps(artifact) for artifact in artifacts] + + +def create_batches_from_file( + filepath: str, + batch_size: int = 10, + context: Optional[dict] = None +) -> List[ArtifactBatch]: + """ + Read a JSON file, determine artifact type from filename, and create batches. + + Args: + filepath: Path to the JSON file + batch_size: Number of artifacts per batch + context: Optional context dictionary to include in batches + + Returns: + List of ArtifactBatch objects + + Raises: + ValueError: If artifact type cannot be determined from filename + KeyError: If the artifact type key is not found in JSON + json.JSONDecodeError: If the file is not valid JSON + """ + artifact_type = determine_artifact_type_from_filename(filepath) + + if artifact_type is None: + raise ValueError( + f"Cannot determine artifact type from filename: {filepath}. " + f"Filename should contain one of: tables, views, schemas, databases, " + f"procedures, roles, stages, streams, pipes, grants, tags, comments, " + f"masking_policies, udfs, sequences, file_formats, external_locations" + ) + + if not os.path.exists(filepath): + raise FileNotFoundError(f"File not found: {filepath}") + + json_data = load_json_file(filepath) + + # Use the correct JSON key for this artifact type + json_key_mapping = get_json_key_mapping() + json_key = json_key_mapping.get(artifact_type, artifact_type) + artifact_items = extract_artifacts_from_json(json_data, json_key) + + if context is None: + context = {} + + context["source_file"] = filepath + + batches = [] + total_batches = (len(artifact_items) + batch_size - 1) // batch_size + + for i in range(0, len(artifact_items), batch_size): + batch_items = artifact_items[i:i + batch_size] + batch = ArtifactBatch( + artifact_type=artifact_type, + items=batch_items, + context={ + **context, + "batch_index": i // batch_size, + "total_batches": total_batches + } + ) + batches.append(batch) + + return batches + + +def process_files( + filepaths: List[str], + batch_size: int = 10, + context: Optional[dict] = None +) -> List[ArtifactBatch]: + """ + Process multiple JSON files and create batches for each. + + Args: + filepaths: List of JSON file paths to process + batch_size: Number of artifacts per batch + context: Optional context dictionary to include in batches + + Returns: + List of ArtifactBatch objects from all files + """ + all_batches = [] + + for filepath in filepaths: + batches = create_batches_from_file(filepath, batch_size, context) + all_batches.extend(batches) + + return all_batches + diff --git a/translation_graph/utils/llm_utils.py b/translation_graph/utils/llm_utils.py index 0120627..d6d89e0 100644 --- a/translation_graph/utils/llm_utils.py +++ b/translation_graph/utils/llm_utils.py @@ -1,5 +1,6 @@ import os from typing import Dict, Any +import time from config.ddl_config import create_node_llm, get_config @@ -39,7 +40,6 @@ def test_llm_connection() -> Dict[str, Any]: } try: - import time start_time = time.time() llm = create_llm_for_node("smart_router")