Skip to content
5 changes: 3 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ uv run pytest tests/job1/unit_test.py::test_enrich_orders
The detailed specs live in [`specs/`](specs/) — read the relevant one **before** working in that area:

- [`specs/architecture.md`](specs/architecture.md) — execution flow, CLI surface, key classes, jobs DAG, job **generation** (`scripts/sdk_generate_template_job.py` → `resources/jobs.yml`; never hand-edit), CI/CD, job-level params, deploy-time env vars, logging, production guardrails, adding a new job.
- [`specs/data-model.md`](specs/data-model.md) — catalog/schema isolation, medallion flow, table schemas, price freeze, liquid clustering, DQX/quarantine, lineage.
- [`specs/data-model.md`](specs/data-model.md) — plain-words pipeline overview, catalog/schema isolation, medallion flow, table schemas, **field naming conventions**, product-name freeze, liquid clustering, DQX/quarantine, lineage.
- [`specs/test-plan.md`](specs/test-plan.md) — unit / integration / load tests.
- [`specs/tooling.md`](specs/tooling.md) — MCP servers (Databricks, AWS billing/docs, context7), CLI, and skills: what to reach for and when.
- The AI/BI dashboard (`resources/orders_dashboard.lvdash.json`) is committed and edited directly; the catalog is `${var.catalog}`, resolved at deploy time.

### Load-bearing invariants (keep in mind; full detail in specs)

- **Catalog-level isolation** — env separation is at the *catalog* level (`dev_<user>` / `staging` / `prod`); the same medallion schemas (`external_source`/`raw`/`curated`/`report`/`ops`) exist in each. Staging/prod catalogs+schemas are owned by `make init`, not the runtime wheel.
- **Price freeze** — silver freezes `line_revenue` at sale time: batch via an insert-only `MERGE`, SDP via a **streaming table** (a materialized view would *restate* the price; a streaming table appends once and *freezes*). `total_value` in gold is `SUM(line_revenue)`.
- **Product-name freeze** — silver freezes `product_name` onto each order line at sale time: batch via an insert-only `MERGE`, SDP via a **streaming table** (a materialized view would *restate* the name; a streaming table appends once and *freezes*). A later rename never relabels booked orders. `unit_price` is static; `total_value` in gold is `SUM(item_total)`.
- **Generated job config** — `resources/jobs.yml` is generated by `scripts/sdk_generate_template_job.py`; never hand-edit it (it's gitignored).
- **Schema-drift guard** — all medallion writes use `overwriteSchema=false` (the only exception is `ops._health`).

Expand Down Expand Up @@ -81,4 +81,5 @@ Favor solutions with less code, fewer classes, and fewer abstractions. When two
- Don't add `CREATE CATALOG` or `CREATE SCHEMA` calls outside the `args.env == "dev"` branch in `config.py`. Staging/prod catalogs and schemas are owned by `make init`; runtime jobs run without those privileges.
- Don't commit `resources/jobs.yml` (gitignored — regenerated on every deploy).
- Don't commit `.databricks-resources.json` (gitignored — local provisioning state, diverges per developer).
- Don't commit or hand-edit `resources/orders_dashboard_deploy.lvdash.json` (gitignored — regenerated from `orders_dashboard.lvdash.json` on every deploy with `${var.catalog}` resolved, since DABs can't substitute bundle vars inside `.lvdash.json` content). Edit the source `orders_dashboard.lvdash.json` instead.
- Don't hand-edit `resources/jobs.yml` — it is overwritten on every deploy. Change `scripts/sdk_generate_template_job.py` instead (it generates jobs, the SDP pipeline, and the dashboard resource stanza into `resources/jobs.yml`). `resources/orders_dashboard.lvdash.json` is committed and editable directly.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ This project template demonstrates how to:
Deep technical detail lives in [`specs/`](specs/) (the README stays a landing page):

- [**Architecture**](specs/architecture.md) — wheel/CLI surface, jobs DAG, job generation, CI/CD, job-level params, deploy-time env vars, logging, production guardrails, folder structure.
- [**Data model**](specs/data-model.md) — catalog/schema isolation, medallion data flow (diagram), table schemas, price-freeze semantics, liquid clustering, DQX/quarantine, lineage.
- [**Data model**](specs/data-model.md) — catalog/schema isolation, medallion data flow (diagram), table schemas, product-name freeze semantics, liquid clustering, DQX/quarantine, lineage.
- [**Test plan**](specs/test-plan.md) — unit, integration, and load tests.
- [**Tooling**](specs/tooling.md) — MCP servers (Databricks, AWS billing/docs, context7), the Databricks CLI, and the bundled skills.

Expand Down
2 changes: 1 addition & 1 deletion resources/orders_dashboard.lvdash.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"name": "ds_orders",
"displayName": "Orders",
"queryLines": [
"SELECT order_date, country, customer_name AS customer, product_name AS product, category_name AS category, SUM(total_value) AS total_value, SUM(total_orders) AS total_orders FROM ${var.catalog}.report.order_agg WHERE order_date BETWEEN :date_range.min AND :date_range.max GROUP BY 1, 2, 3, 4, 5"
"SELECT a.order_date, a.country, a.customer_name AS customer, l.product_name AS product, a.product_name AS product_name, a.category_name AS category, SUM(a.total_value) AS total_value, SUM(a.total_orders) AS total_orders FROM ${var.catalog}.report.order_agg a JOIN (SELECT product_id, product_name FROM (SELECT product_id, product_name, ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_date DESC, product_name DESC) AS rn FROM ${var.catalog}.report.order_agg) WHERE rn = 1) l ON a.product_id = l.product_id WHERE a.order_date BETWEEN :date_range.min AND :date_range.max GROUP BY 1, 2, 3, 4, 5, 6"
],
"parameters": [
{
Expand Down
37 changes: 31 additions & 6 deletions scripts/sdk_drop_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
from _sdk_sql import get_warehouse_id, run_sql
from template.config import MEDALLION_SCHEMAS

# DROP must match the object kind: a materialized view / streaming table / view cannot be
# removed with DROP TABLE (it raises a type-mismatch error that IF EXISTS does NOT suppress).
# The SDP pipeline materializes MVs and streaming tables in the raw/curated/report schemas,
# so a blind DROP TABLE loop aborts mid-way and leaves the catalog half-dropped.
# DROP must match the object kind: a materialized view / view cannot be removed with
# DROP TABLE (it raises a type-mismatch error that IF EXISTS does NOT suppress). The SDP
# pipeline materializes MVs and streaming tables in the raw/curated/report schemas, so a
# blind DROP TABLE loop aborts mid-way and leaves the catalog half-dropped.
#
# Some SQL warehouse channels also reject the `DROP STREAMING TABLE` grammar with a
# PARSE_SYNTAX_ERROR (while accepting a plain DROP TABLE on the streaming table). So we try
# the kind-specific statement first and fall back to DROP TABLE — _drop_candidates() always
# appends DROP TABLE as the last resort.
_DROP_STMT = {
TableType.MATERIALIZED_VIEW: "DROP MATERIALIZED VIEW IF EXISTS",
TableType.STREAMING_TABLE: "DROP STREAMING TABLE IF EXISTS",
Expand All @@ -19,6 +24,17 @@
}


def _drop_candidates(table_type) -> list[str]:
"""DROP statements to try in order. The kind-specific form first (when there is one),
then plain DROP TABLE as a fallback for warehouses whose parser rejects the
kind-specific grammar (notably DROP STREAMING TABLE)."""
specific = _DROP_STMT.get(table_type)
candidates = [specific] if specific else []
if "DROP TABLE IF EXISTS" not in candidates:
candidates.append("DROP TABLE IF EXISTS")
return candidates


def _resolve_catalog(workspace: WorkspaceClient, env: str) -> str:
if env == "dev":
local_part = workspace.current_user.me().user_name.split("@")[0]
Expand Down Expand Up @@ -56,8 +72,17 @@ def main():
count = 0
for schema in MEDALLION_SCHEMAS:
for table in workspace.tables.list(catalog_name=catalog, schema_name=schema):
stmt = _DROP_STMT.get(table.table_type, "DROP TABLE IF EXISTS")
run_sql(workspace, warehouse_id, f"{stmt} `{catalog}`.`{schema}`.`{table.name}`")
fq = f"`{catalog}`.`{schema}`.`{table.name}`"
last_err: Exception | None = None
for stmt in _drop_candidates(table.table_type):
try:
run_sql(workspace, warehouse_id, f"{stmt} {fq}")
last_err = None
break
except Exception as e: # try the next candidate (e.g. parser rejects DROP STREAMING TABLE)
last_err = e
if last_err is not None:
raise last_err
count += 1

print(f"Done. {count} object(s) dropped.")
Expand Down
10 changes: 5 additions & 5 deletions scripts/sdk_generate_template_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,12 @@ def _build_job(environment: str, sp_id: str | None) -> dict:
]
)

schedule = None
# No standalone schedule on the batch job: in prod it is triggered solely by
# job1_prod_integration (via its `run` RunJobTask), so the orchestrator is the
# single entry point. Prod still gets failure/duration alerting below.
email_notifications = None
health = None
if environment == "prod":
schedule = CronSchedule(quartz_cron_expression="0 0 5 * * ?", timezone_id="UTC")
email_notifications = JobEmailNotifications(
on_failure=_alert_emails(),
on_duration_warning_threshold_exceeded=_alert_emails(),
Expand All @@ -264,8 +265,8 @@ def _build_job(environment: str, sp_id: str | None) -> dict:
name=f"{JOB_NAME}_${{bundle.target}}",
timeout_seconds=3600,
# max_concurrent_runs=1 + queue.enabled=True: if a run is already in flight
# when the next scheduled tick arrives, queue it rather than silently
# skipping. Skipping a scheduled prod run is almost never what you want.
# when job1_prod_integration triggers this job again, queue it rather than
# silently skipping. Skipping a prod run is almost never what you want.
max_concurrent_runs=1,
queue=QueueSettings(enabled=True),
# Suppress alerts for runs that were manually cancelled or skipped by an
Expand All @@ -286,7 +287,6 @@ def _build_job(environment: str, sp_id: str | None) -> dict:
],
tags=_tags(environment),
environments=_environments(),
schedule=schedule,
email_notifications=email_notifications,
health=health,
tasks=tasks,
Expand Down
6 changes: 6 additions & 0 deletions specs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

---

## [#43] · feat/freeze-product-name · 2026-06-23 · feat: freeze product_name instead of line_revenue

Removed the synthetic `line_revenue`/`unit_price_at_sale` columns — gold `total_value` is now `SUM(item_total)` (the line value the source already freezes on the order) — and re-pointed the silver insert-only-MERGE / streaming-table freeze at the mutable `product_name`, which the seed now changes by renaming 2 products per run (`Product N` → `Product N.k`); `unit_price` stays as a static attribute. The AI/BI "by product" chart and Product filter both identify a product by its latest name (consolidating by `product_id`, one line per physical product across renames, and filtering shows the full pre-/post-rename history); the frozen historical names remain in `report.order_agg` for audit. Also removed the batch `job1`'s standalone prod schedule (the SDP pipeline already had none) so `job1_prod_integration` is the single prod trigger orchestrating seed → batch + SDP, fixed `sdk_drop_tables.py` to fall back to `DROP TABLE` when a warehouse's parser rejects `DROP STREAMING TABLE`, and updated unit/integration tests, schemas, the SDP pipeline, and docs.

---

## [#42] · docs/reorg-specs-tooling · 2026-06-23 · docs: rename docs/→assets/, consolidate tooling into specs/tooling.md, slim CLAUDE.md

Renamed the image-only `docs/` folder to `assets/` (it held no prose, only screenshots and the CI/CD draw.io export) and repointed every `<img>` reference in the README and specs. Added `specs/tooling.md` consolidating all four MCP servers (Databricks, aws-billing-cost, aws-documentation, context7), the Databricks CLI, and the bundled skills, then trimmed `CLAUDE.md`'s tooling section to a short every-session decision list that points at it. Moved the repo folder-structure tree out of `architecture.md` into the `specs/README.md` routing hub and flagged `CHANGELOG.md` there as append-only (don't read for context).
Expand Down
2 changes: 1 addition & 1 deletion specs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ reference. Read the relevant spec before working in that area.
| Spec | Read it when you're touching… |
|---|---|
| [architecture.md](architecture.md) | the wheel/CLI surface, jobs DAG, job generation, CI/CD, job-level params, deploy-time env vars, logging, or production guardrails. |
| [data-model.md](data-model.md) | the catalog/schema model, medallion flow, table schemas, the price-freeze semantics, liquid clustering, DQX/quarantine, or lineage. |
| [data-model.md](data-model.md) | the catalog/schema model, medallion flow, table schemas, the product-name freeze semantics, liquid clustering, DQX/quarantine, or lineage. |
| [test-plan.md](test-plan.md) | unit, integration, or load tests. |
| [tooling.md](tooling.md) | MCP servers (Databricks, AWS, context7), the Databricks CLI, and the bundled skills — what to reach for and when. |
| [CHANGELOG.md](CHANGELOG.md) | the per-PR change history. **Append-only — add an entry before every merge; don't read it for context.** |
Expand Down
4 changes: 3 additions & 1 deletion specs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ does not expose custom env vars to the process).
## Jobs DAG

The batch job (`job1`) runs as a Lakeflow Job DAG; the declarative path (`job1_sdp`) runs the same
ETL as a Spark Declarative Pipeline.
ETL as a Spark Declarative Pipeline. In **prod**, neither has its own schedule: `job1_prod_integration`
is the sole scheduled entry point (daily, 06:00 America/Sao_Paulo) — it runs `seed_sources`, then
triggers the batch `job1` (`RunJobTask`) and the `job1_sdp` pipeline (`PipelineTask`) in parallel.

<img src="../assets/dag.png">

Expand Down
Loading
Loading