| 1 | +Add a new feature sample and its validations to the Lakeflow Framework samples. |
| 2 | + |
| 3 | +## Context |
| 4 | + |
| 5 | +You are helping add a new feature sample to the Lakeflow Framework. A feature sample demonstrates a specific framework capability end-to-end — from a dataflowspec JSON/YAML file through to validated output tables. |
| 6 | + |
| 7 | +The framework root is the current working directory (the repo root). |
| 8 | + |
| 9 | +## Your Task |
| 10 | + |
| 11 | +The user will describe what feature they want to add a sample for (e.g. "add a sample for the new append_dedup flow type" or "add a sample for custom Python aggregation transforms"). Follow all steps below. |
| 12 | + |
| 13 | +--- |
| 14 | + |
| 15 | +## Step 1 — Understand the Feature |
| 16 | + |
| 17 | +1. Read the relevant framework source code in `src/` to understand what the feature does, its configuration keys, and what output tables it produces. |
| 18 | +2. Read at least 2 similar existing dataflowspecs in `samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/` to understand the naming convention (`{feature_name}_main.json`) and structure. |
| 19 | +3. Decide which pipeline group the new sample belongs to: |
| 20 | + - `feature_samples_general` → goes in the General Pipeline |
| 21 | + - `feature_samples_data_quality` → goes in the Data Quality Pipeline |
| 22 | + - `feature_samples_snapshots` → goes in the Snapshots Pipeline |
| 23 | + - `feature_samples_table_migration` → goes in the Table Migration Pipeline |
| 24 | + - `feature_samples_python` → goes in the Python Pipeline |
| 25 | + - If it's a new group, you'll also need to create a new pipeline resource YAML (see Step 4). |
| 26 | + |
| 27 | +--- |
| 28 | + |
| 29 | +## Step 2 — Create the Dataflowspec |
| 30 | + |
| 31 | +Create `samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/{feature_name}_main.json` (or `.yaml`). |
| 32 | + |
| 33 | +Follow the exact naming pattern: `{feature_name}_main.json`. |
| 34 | + |
| 35 | +Rules: |
| 36 | +- Set `dataFlowGroup` to the correct pipeline group from Step 1 |
| 37 | +- Use `{staging_schema}`, `{bronze_schema}`, `{silver_schema}` etc. as placeholders (these are substituted at runtime) |
| 38 | +- Use a descriptive `dataFlowId` that starts with `feature_` to distinguish it as a sample |
| 39 | +- Set `targetDetails.table` to `feature_{feature_name}` or a similarly descriptive name |
| 40 | +- Enable `delta.enableChangeDataFeed` in tableProperties where applicable |
| 41 | +- Validate the spec using: `python scripts/validate_dataflows.py samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/{feature_name}_main.json` |
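As a rough illustration of the rules above, the following Python sketch assembles only the fields those rules name. It is not a complete or schema-valid dataflowspec: the helper `build_spec_skeleton` is hypothetical, the nesting of `tableProperties` under `targetDetails` and the string value `"true"` are assumptions, and the existing samples in the `dataflowspec/` folder remain the source of truth for the real structure.

```python
import json


def build_spec_skeleton(feature_name: str, data_flow_group: str) -> dict:
    """Return only the fields named in the Step 2 rules; a real spec needs more."""
    return {
        # dataFlowId starts with "feature_" to mark the spec as a sample.
        "dataFlowId": f"feature_{feature_name}",
        # Must match one of the pipeline groups chosen in Step 1.
        "dataFlowGroup": data_flow_group,
        "targetDetails": {
            "table": f"feature_{feature_name}",
            # ASSUMPTION: tableProperties nests here and takes string values.
            "tableProperties": {"delta.enableChangeDataFeed": "true"},
        },
    }


spec = build_spec_skeleton("append_dedup", "feature_samples_general")
print(json.dumps(spec, indent=2))
```

The remaining required keys (source details, flow type, schema placeholders) should be copied from a similar existing sample rather than guessed, and the result run through `validate_dataflows.py`.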
| 42 | + |
| 43 | +--- |
| 44 | + |
| 45 | +## Step 3 — Add Test Data (if needed) |
| 46 | + |
| 47 | +If the feature requires specific source data that doesn't already exist: |
| 48 | +1. Check `samples/test_data_and_orchestrator/src/create_schemas_and_tables.ipynb` — add DDL if a new staging table is needed |
| 49 | +2. Check `samples/test_data_and_orchestrator/src/run_1_staging_load.ipynb` — add initial data inserts |
| 50 | +3. Check `run_2_staging_load.ipynb`, `run_3_staging_load.ipynb`, `run_4_staging_load.ipynb` — add incremental data to exercise SCD2 history, updates, deletes, snapshot changes, etc. |
| 51 | + |
| 52 | +Data loading conventions: |
| 53 | +- Run 1: initial state (all active, no history) |
| 54 | +- Run 2: first set of changes (updates, new records, deletions via flag) |
| 55 | +- Run 3: second set of changes (more updates, snapshot overwrites) |
| 56 | +- Run 4: final changes (additional updates to test multiple SCD2 versions) |
| 57 | + |
| 58 | +--- |
| 59 | + |
| 60 | +## Step 4 — Register in a Pipeline (only if new pipeline group) |
| 61 | + |
| 62 | +If the feature uses a new `dataFlowGroup` that doesn't map to an existing pipeline, create pipeline resource YAMLs: |
| 63 | +- `samples/bronze_sample/resources/serverless/{pipeline_name}.yml` |
| 64 | +- `samples/bronze_sample/resources/classic/{pipeline_name}.yml` |
| 65 | + |
| 66 | +Copy the structure from an existing pipeline YAML (e.g. `bronze_feature_samples_pipeline_general.yml`) and set `pipeline.dataFlowGroupFilter` to the new group. |
| 67 | + |
| 68 | +Then add the pipeline lookup variable to `samples/test_data_and_orchestrator/databricks.yml`: |
| 69 | +```yaml |
| 70 | +lakeflow_samples_{pipeline_key}_id: |
| 71 | + lookup: |
| 72 | + pipeline: "${var.name_prefix}Lakeflow Framework - {Pipeline Display Name} (${var.logical_env})" |
| 73 | +``` |
| 74 | + |
| 75 | +--- |
| 76 | + |
| 77 | +## Step 5 — Add Pipeline Tasks to Orchestrator Jobs |
| 78 | + |
| 79 | +Add the pipeline as a task in all 8 run job YAMLs (4 runs, each in both serverless and classic): |
| 80 | +- `samples/test_data_and_orchestrator/resources/serverless/run_1_load_and_schema_initialization_job.yml` |
| 81 | +- `samples/test_data_and_orchestrator/resources/serverless/run_2_load_job.yml` |
| 82 | +- `samples/test_data_and_orchestrator/resources/serverless/run_3_load_job.yml` |
| 83 | +- `samples/test_data_and_orchestrator/resources/serverless/run_4_load_job.yml` |
| 84 | +- Same 4 files under `resources/classic/` |
| 85 | + |
| 86 | +Use `full_refresh: true` in run_1 and `full_refresh: false` in run_2/3/4. |
| 87 | + |
| 88 | +Also add the new pipeline task key to the `depends_on` list of the `validate_run_N` task in all 8 files. |
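To keep the eight edits straight, the step above can be laid out as a small plan. This Python sketch only enumerates the file paths, the `full_refresh` value, and the validate task named in this step; the helper name `job_edit_plan` is hypothetical, and it assumes the classic files share the serverless filenames, as the list above states.

```python
from pathlib import PurePosixPath

RESOURCES = PurePosixPath("samples/test_data_and_orchestrator/resources")
JOB_FILES = {
    1: "run_1_load_and_schema_initialization_job.yml",
    2: "run_2_load_job.yml",
    3: "run_3_load_job.yml",
    4: "run_4_load_job.yml",
}


def job_edit_plan() -> list:
    """Return (path, full_refresh, validate_task) for each of the 8 job files."""
    plan = []
    for compute in ("serverless", "classic"):
        for run, filename in JOB_FILES.items():
            path = str(RESOURCES / compute / filename)
            # Only run_1 does a full refresh; run_2/3/4 are incremental.
            plan.append((path, run == 1, f"validate_run_{run}"))
    return plan


for path, full_refresh, validate_task in job_edit_plan():
    flag = str(full_refresh).lower()
    print(f"{path}: full_refresh: {flag}; add task key to depends_on of {validate_task}")
```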
| 89 | + |
| 90 | +--- |
| 91 | + |
| 92 | +## Step 6 — Write Validations |
| 93 | + |
| 94 | +Add validation cells to all 4 validate notebooks: |
| 95 | +`samples/test_data_and_orchestrator/src/validate_run_{1,2,3,4}.ipynb` |
| 96 | + |
| 97 | +For each notebook, insert new markdown + code cells **before the final `v.print_summary()` cell**. |
| 98 | + |
| 99 | +**Key principles for good validations:** |
| 100 | + |
| 101 | +1. **Exact counts for deterministic tables**: Use `v.validate_row_count(table, N, description)` when you know exactly how many rows should be present after each run (e.g. append-only, SCD1 batch-overwrite, small deterministic CDC streams). |
| 102 | + |
| 103 | +2. **Min counts for complex tables**: Use `v.validate_min_row_count(table, N, description)` for tables where exact counts are harder to predict (e.g. file-based historical snapshots, complex SCD2 with many interacting sources). |
| 104 | + |
| 105 | +3. **SCD2 active/closed counts**: Use these validators for CDC/SCD2 tables: |
| 106 | + - `v.validate_active_scd2_count(table, N, end_at_col)` — records where `end_at_col IS NULL` |
| 107 | + - `v.validate_closed_scd2_count(table, N, end_at_col)` — records where `end_at_col IS NOT NULL` |
| 108 | + - `v.validate_min_closed_scd2_count(table, N, end_at_col)` — at least N closed records |
| 109 | + |
| 110 | +4. **Value checks**: Use `v.validate_column_value(table, where_clause, column, expected_value, description)` to verify specific field values (e.g. check that John's email updated correctly across runs). |
| 111 | + |
| 112 | +5. **Existence checks**: Use `v.validate_values_exist(table, column, [list_of_values], description)` to verify that specific IDs or keys exist. |
| 113 | + |
| 114 | +6. **Null checks**: Use `v.validate_column_not_null(table, column_expr, description)` for operational metadata or required columns. |
| 115 | + |
| 116 | +**Per-run validation logic:** |
| 117 | +- **Run 1**: Initial state. All streaming tables show data from the first load. SCD2 tables show all-active records (0 closed). SCD1 tables show the first snapshot. Historical snapshot tables show SCD2 history from all pre-loaded files. |
| 118 | +- **Run 2**: First incremental. CDC tables grow by the Run 2 inserts. SCD2 tables gain closed records for updated keys. SCD1 tables update in-place. Snapshot sources are overwritten. |
| 119 | +- **Run 3**: Second incremental. Similar pattern; note which snapshot sources are overwritten and which tables are unchanged. |
| 120 | +- **Run 4**: Final incremental. Verify the last expected state, including checking exact final values (e.g. latest email address). |
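The per-run logic above might translate into validation cells along these lines. This is a sketch under stated assumptions: `RecordingValidator` is a stand-in for the notebook's real `v` object so the example runs outside Databricks, and the table name, the `__END_AT` column, the row counts, and the email value are all hypothetical. Only the method names and argument order come from the validator list above.

```python
class RecordingValidator:
    """Stand-in for the notebook's `v`; it records calls instead of querying tables."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Accept any validate_* method and log its arguments.
        if not name.startswith("validate_"):
            raise AttributeError(name)

        def record(*args):
            self.calls.append((name, args))

        return record


TABLE = "feature_my_feature"  # hypothetical target table
END_AT = "__END_AT"           # ASSUMPTION: name of the SCD2 end-timestamp column


def validate_run_1(v):
    # Run 1: initial load of 3 keys, every SCD2 record still active.
    v.validate_row_count(TABLE, 3, "my_feature: 3 rows after initial load")
    v.validate_active_scd2_count(TABLE, 3, END_AT)
    v.validate_closed_scd2_count(TABLE, 0, END_AT)


def validate_run_2(v):
    # Run 2: one key updated, so one version closes and a new one opens.
    v.validate_row_count(TABLE, 4, "my_feature: 4 rows after first update")
    v.validate_active_scd2_count(TABLE, 3, END_AT)
    v.validate_closed_scd2_count(TABLE, 1, END_AT)
    v.validate_column_value(
        TABLE, f"id = 1 AND {END_AT} IS NULL", "email",
        "john.new@example.com", "my_feature: updated email is the active version",
    )


v = RecordingValidator()
validate_run_1(v)
validate_run_2(v)
print(f"{len(v.calls)} validations recorded")
```

In the real notebooks, the body of each function would go into the matching `validate_run_N.ipynb` as a code cell using the existing `v`, placed before the final `v.print_summary()` cell.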
| 121 | + |
| 122 | +**Add a markdown cell** before each code cell explaining what tables are being validated and the expected state. |
| 123 | + |
| 124 | +--- |
| 125 | + |
| 126 | +## Step 7 — Verify |
| 127 | + |
| 128 | +1. Run the dataflowspec validator to check the spec is valid: |
| 129 | + ```bash |
| 130 | + python scripts/validate_dataflows.py samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/{feature_name}_main.json -v |
| 131 | + ``` |
| 132 | + |
| 133 | +2. If the Databricks CLI is configured, deploy and test under a logical environment alias: |
| 134 | + ```bash |
| 135 | + cd samples |
| 136 | + ./deploy_and_test.sh -l "_test_{feature_name}" -u "<your-databricks-email>" -h "<your-workspace-url>" -p "<your-cli-profile>" --runs 4 |
| 137 | + ``` |
| 138 | + The `logical_env` parameter (e.g. `_test_my_feature`) namespaces all schemas so you don't affect the main deployment. |
| 139 | + |
| 140 | +3. After a successful test run, review the validation notebook output to confirm all assertions pass. |
| 141 | + |
| 142 | +--- |
| 143 | + |
| 144 | +## Checklist |
| 145 | + |
| 146 | +Before finishing, confirm: |
| 147 | +- [ ] Dataflowspec file created and validates cleanly with `validate_dataflows.py` |
| 148 | +- [ ] Test data added (or confirmed that existing data covers the feature) |
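| 149 | +- [ ] Pipeline resource YAMLs created (serverless + classic) and lookup variable added to the orchestrator `databricks.yml` (if new group) |
| 150 | +- [ ] Pipeline tasks added to all 8 run_N job YAMLs (classic + serverless) with `full_refresh: true` in run_1 and `full_refresh: false` in run_2/3/4 |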
| 151 | +- [ ] Validation cells added to all 4 validate_run_N notebooks covering: Run 1 (initial), Run 2 (first incremental), Run 3 (second incremental), Run 4 (final state) |
| 152 | +- [ ] Each validation uses the most appropriate validator method (exact count vs min count vs SCD2 active/closed) |
| 153 | +- [ ] All validate task `depends_on` lists in the job YAMLs include the new pipeline task key |